Java 中的线程池简介

更新于 2025-12-29

Eugen Paraschiv 2024-06-11

1. 概述

本教程将介绍 Java 中的线程池。我们将首先了解标准 Java 库中提供的不同实现,然后探讨 Google 的 Guava 库中的相关功能。

2. 线程池

在 Java 中,线程映射到操作系统级别的线程,而这些线程是操作系统的资源。如果我们无限制地创建线程,很快就会耗尽这些资源。

操作系统还需要在线程之间进行上下文切换,以模拟并行执行。一个简化的观点是:我们创建的线程越多,每个线程实际用于工作的有效时间就越少。

线程池模式有助于在多线程应用程序中节省资源,并将并行度控制在预定义的范围内。

使用线程池时,我们将并发代码编写为并行任务,并将其提交给线程池实例执行。该实例会管理一组可重用的线程来执行这些任务。

该模式使我们能够控制应用程序创建的线程数量及其生命周期。我们还可以调度任务的执行,并将传入的任务放入队列中等待处理。

3. Java 中的线程池

3.1. Executors、Executor 和 ExecutorService

Executors 工具类包含多个用于创建预配置线程池实例的方法。这些方法是入门的好选择,尤其当我们不需要进行任何自定义微调时。

我们通常使用 ExecutorExecutorService 接口来操作 Java 中不同的线程池实现。一般建议将代码与具体的线程池实现解耦,在整个应用程序中始终使用这些接口。

3.1.1. Executor

Executor 接口只有一个 execute 方法,用于提交 Runnable 实例以供执行。

下面是一个快速示例,展示如何使用 Executors API 获取一个由单线程池和无界队列支持的 Executor 实例,以顺序执行任务:

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));

在此示例中,我们运行一个仅在屏幕上打印 “Hello World” 的简单任务。我们以 Lambda 表达式(Java 8 特性)的形式提交任务,编译器会将其推断为 Runnable

3.1.2. ExecutorService

ExecutorService 接口提供了大量方法,用于控制任务进度和管理服务终止。通过该接口,我们可以提交任务执行,并通过返回的 Future 实例控制其执行。

下面的例子中,我们创建一个 ExecutorService,提交一个任务,然后使用返回的 Futureget() 方法等待任务完成并获取返回值:

ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");
// 执行其他操作
String result = future.get();

当然,在实际场景中,我们通常不会立即调用 future.get(),而是等到真正需要计算结果时再调用。

这里 submit 方法被重载,可以接收 RunnableCallable。这两个都是函数式接口,从 Java 8 开始,我们可以将它们作为 Lambda 表达式传递。

  • Runnable 的单一方法不抛出异常,也不返回值。
  • Callable 接口可能更方便,因为它允许抛出异常并返回值。

为了让编译器推断出 Callable 类型,只需在 Lambda 表达式中返回一个值即可。

有关 ExecutorService 接口和 Future 的更多使用示例,请参阅《Java ExecutorService 指南》。

3.2. ThreadPoolExecutor

ThreadPoolExecutor 是一个可扩展的线程池实现,具有大量参数和钩子(hooks),可用于精细调优。

本文主要讨论的核心配置参数包括:corePoolSizemaximumPoolSizekeepAliveTime

线程池由一定数量的核心线程组成,这些线程会一直保留在池中。此外,还可能临时创建一些额外线程,当不再需要时会被终止。

  • corePoolSize:池中始终保持的核心线程数。当新任务到来时,如果所有核心线程都处于忙碌状态且内部队列已满,则允许池扩展至 maximumPoolSize
  • maximumPoolSize:线程池允许的最大线程数。
  • keepAliveTime:超出 corePoolSize 的额外线程在空闲状态下可存活的时间。默认情况下,ThreadPoolExecutor 仅考虑非核心线程进行回收。若希望对核心线程也应用相同的超时策略,可调用 allowCoreThreadTimeOut(true) 方法。

这些参数覆盖了广泛的使用场景,但最常见的配置已在 Executors 的静态方法中预定义。

3.2.1. newFixedThreadPool

下面是一个示例。newFixedThreadPool 方法创建一个 ThreadPoolExecutor,其 corePoolSizemaximumPoolSize 相等,keepAliveTime 为 0。这意味着线程池中的线程数量始终固定:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());

这里我们创建了一个固定大小为 2 的线程池。这意味着,如果同时运行的任务数不超过 2 个,它们会立即执行;否则,部分任务将被放入队列等待。

我们创建了三个 Callable 任务,每个任务通过休眠 1000 毫秒来模拟繁重工作。前两个任务会立即执行,第三个任务则需排队等待。提交任务后立即调用 getPoolSize()getQueue().size() 即可验证这一点。

3.2.2. Executors.newCachedThreadPool()

我们还可以通过 Executors.newCachedThreadPool() 方法创建另一种预配置的 ThreadPoolExecutor。该方法不接收线程数量参数,而是将 corePoolSize 设为 0,maximumPoolSize 设为 Integer.MAX_VALUEkeepAliveTime 为 60 秒:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());

这些参数意味着缓存线程池可以根据需要无限制地增长以容纳任意数量的任务。但当线程不再需要时,会在空闲 60 秒后被回收。典型使用场景是应用程序中有大量短生命周期的任务。

队列大小始终为 0,因为内部使用的是 SynchronousQueue。在 SynchronousQueue 中,插入和移除操作总是成对同步发生,因此队列实际上从不包含任何元素。

3.2.3. Executors.newSingleThreadExecutor()

Executors.newSingleThreadExecutor() API 创建另一种典型的 ThreadPoolExecutor,只包含一个线程。单线程执行器非常适合创建事件循环。其 corePoolSizemaximumPoolSize 均为 1,keepAliveTime 为 0。

上述示例中的任务将按顺序执行,因此任务完成后 counter 的值将为 2:

AtomicInteger counter = new AtomicInteger();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    counter.set(1);
});
executor.submit(() -> {
    counter.compareAndSet(1, 2);
});

此外,此 ThreadPoolExecutor 被一个不可变包装器装饰,因此创建后无法重新配置。这也是我们无法将其强制转换为 ThreadPoolExecutor 的原因。

3.3. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口,提供了若干额外方法:

  • schedule:允许我们在指定延迟后一次性执行任务。
  • scheduleAtFixedRate:在指定初始延迟后,以固定周期重复执行任务。周期是指任务开始时间之间的间隔,因此执行频率是固定的。
  • scheduleWithFixedDelay:与 scheduleAtFixedRate 类似,但指定的延迟是从上一个任务结束下一个任务开始之间的时间。因此,执行频率可能因任务执行时间不同而变化。

通常,我们使用 Executors.newScheduledThreadPool() 方法创建 ScheduledThreadPoolExecutor,其 corePoolSize 由用户指定,maximumPoolSize 为无界,keepAliveTime 为 0。

以下代码演示如何在 500 毫秒后调度一个任务执行:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
    System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);

以下代码展示如何在 500 毫秒延迟后启动任务,并每隔 100 毫秒重复执行一次。调度任务后,我们使用 CountDownLatch 等待任务触发三次,然后通过 Future.cancel() 取消它:

CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
    System.out.println("Hello World");
    lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);

3.4. ForkJoinPool

ForkJoinPool 是 Java 7 引入的 fork/join 框架的核心组件,用于解决递归算法中频繁生成多个子任务导致线程迅速耗尽的问题。若使用普通 ThreadPoolExecutor,每个任务或子任务都需要自己的线程,极易耗尽系统资源。

在 fork/join 框架中,任何任务都可以派生(fork)多个子任务,并通过 join 方法等待它们完成。该框架的优势在于不会为每个任务或子任务创建新线程,而是采用工作窃取算法(work-stealing algorithm)。关于该框架的详细说明,请参阅《Java Fork/Join 框架指南》。

下面是一个使用 ForkJoinPool 遍历节点树并计算所有叶节点值之和的简单示例。首先定义一个简单的树结构,包含一个整数值和一组子节点:

static class TreeNode {
    int value;
    Set<TreeNode> children;

    TreeNode(int value, TreeNode... children) {
        this.value = value;
        this.children = Sets.newHashSet(children);
    }
}

若要并行计算树中所有节点值的总和,需实现 RecursiveTask<Integer> 接口。每个任务接收一个节点,并将其值加上所有子节点值的总和。具体实现如下:

  • 对子节点集合进行流式处理;
  • 为每个子节点创建一个新的 CountingTask
  • 通过 fork() 并行执行每个子任务;
  • 通过 join() 收集各子任务的结果;
  • 使用 Collectors.summingInt 汇总结果。
public static class CountingTask extends RecursiveTask<Integer> {
    private final TreeNode node;

    public CountingTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Integer compute() {
        return node.value + node.children.stream()
          .map(childNode -> new CountingTask(childNode).fork())
          .collect(Collectors.summingInt(ForkJoinTask::join));
    }
}

在实际树上运行该计算的代码非常简洁:

TreeNode tree = new TreeNode(5,
  new TreeNode(3), new TreeNode(2,
    new TreeNode(2), new TreeNode(8)));

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Guava 库中的线程池实现

Guava 是 Google 提供的一个广受欢迎的工具库,包含许多实用的并发类,其中包括几种便捷的 ExecutorService 实现。这些实现类不允许直接实例化或继承,因此创建其实例的唯一入口是 MoreExecutors 工具类。

4.1. 将 Guava 添加为 Maven 依赖

在项目的 pom.xml 文件中添加以下依赖项即可引入 Guava 库。最新版本请参考 Maven Central Repository

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.2.1-jre</version>
</dependency>

4.2. Direct Executor 和 Direct Executor Service

有时我们希望根据某些条件,决定在当前线程还是在线程池中执行任务。理想情况下,我们希望使用统一的 Executor 接口,仅切换其实现。虽然自己实现一个在当前线程中执行任务的 ExecutorExecutorService 并不困难,但仍需编写一些样板代码。

幸运的是,Guava 提供了预定义的实例。

以下示例演示了在当前线程中执行任务。尽管任务休眠了 500 毫秒,但它会阻塞当前线程,因此 execute 调用结束后结果立即可用:

Executor executor = MoreExecutors.directExecutor();

AtomicBoolean executed = new AtomicBoolean();

executor.execute(() -> {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executed.set(true);
});

assertTrue(executed.get());

directExecutor() 返回的实例实际上是静态单例,因此调用该方法不会产生任何对象创建开销。

我们应优先使用 MoreExecutors.directExecutor(),而不是 MoreExecutors.newDirectExecutorService(),因为后者每次调用都会创建一个完整的 ExecutorService 实现。

4.3. 退出型执行器服务(Exiting Executor Services)

另一个常见问题是:当线程池仍在运行任务时关闭虚拟机。即使有取消机制,也无法保证任务会“友好”地在执行器关闭时停止工作,这可能导致 JVM 无限期挂起。

为解决此问题,Guava 引入了一类退出型执行器服务(exiting executor services)。它们基于守护线程(daemon threads),会随 JVM 一同终止。

这些服务还会通过 Runtime.getRuntime().addShutdownHook() 添加一个关闭钩子(shutdown hook),并在 VM 终止前等待指定时间,之后放弃仍在运行的任务。

以下示例中,我们提交一个包含无限循环的任务,但使用一个配置了 100 毫秒等待时间的退出型执行器服务:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService = 
  MoreExecutors.getExitingExecutorService(executor, 
    100, TimeUnit.MILLISECONDS);

executorService.submit(() -> {
    while (true) {
    }
});

如果没有使用 exitingExecutorService,该任务将导致 VM 无限期挂起。

4.4. 可监听装饰器(Listening Decorators)

可监听装饰器允许我们包装 ExecutorService,在提交任务时返回 ListenableFuture 实例,而非普通的 Future

ListenableFuture 接口扩展了 Future,并增加了一个 addListener 方法,允许注册在 Future 完成时被调用的监听器。

我们很少直接使用 ListenableFuture.addListener(),但它对 Futures 工具类中的大多数辅助方法至关重要。

例如,使用 Futures.allAsList() 方法,我们可以将多个 ListenableFuture 合并为一个 ListenableFuture,该 Future 在所有输入 Future 成功完成后才完成:

ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService = 
  MoreExecutors.listeningDecorator(executorService);

ListenableFuture<String> future1 = 
  listeningExecutorService.submit(() -> "Hello");
ListenableFuture<String> future2 = 
  listeningExecutorService.submit(() -> "World");

String greeting = Futures.allAsList(future1, future2).get()
  .stream()
  .collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);

5. 结论

本文讨论了线程池模式及其在标准 Java 库和 Google Guava 库中的实现。