baeldung 2016-02-23
1. 概述
ExecutorService 是 JDK 提供的一个 API,用于简化异步任务的执行。通常来说,ExecutorService 会自动提供一个线程池,并提供一组 API 来将任务提交给该线程池。
2. 创建 ExecutorService 实例
2.1 Executors 类的工厂方法
创建 ExecutorService 最简单的方式是使用 Executors 类提供的工厂方法。
例如,下面这行代码会创建一个包含 10 个线程的线程池:
ExecutorService executor = Executors.newFixedThreadPool(10);
此外还有其他多种工厂方法,可用于创建满足特定使用场景的预定义 ExecutorService。要选择最适合你需求的方法,请参考 Oracle 的官方文档。
2.2 直接创建 ExecutorService
由于 ExecutorService 是一个接口,因此可以使用其任意实现类的实例。java.util.concurrent 包中提供了多个实现类,你也可以自行实现。
例如,ThreadPoolExecutor 类提供了多个构造函数,可用于配置执行器服务及其内部线程池:
ExecutorService executorService =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
你会发现上述代码与 newSingleThreadExecutor() 工厂方法的源码非常相似。在大多数情况下,并不需要进行如此详细的自定义配置。
3. 向 ExecutorService 提交任务
ExecutorService 可以执行 Runnable 和 Callable 类型的任务。为简化说明,本文使用两个简单的任务示例。注意,这里我们使用 Lambda 表达式而非匿名内部类:
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
我们可以使用多种方法将任务提交给 ExecutorService,包括从 Executor 接口继承的 execute() 方法,以及 submit()、invokeAny() 和 invokeAll() 方法。
execute()方法返回类型为void,无法获取任务执行结果,也无法检查任务状态(例如是否正在运行):
executorService.execute(runnableTask);
submit()方法可提交Callable或Runnable任务,并返回一个Future类型的结果:
Future<String> future = executorService.submit(callableTask);
invokeAny()方法将一组任务提交给ExecutorService并行执行,并返回任意一个成功完成任务的结果(如果有成功执行的任务):
String result = executorService.invokeAny(callableTasks);
invokeAll()方法将一组任务提交给ExecutorService并行执行,并返回所有任务执行结果,以Future对象列表的形式返回:
List<Future<String>> futures = executorService.invokeAll(callableTasks);
在继续深入之前,我们需要讨论另外两个重要主题:如何关闭 ExecutorService,以及如何处理 Future 返回类型。
4. 关闭 ExecutorService
通常情况下,当没有待处理任务时,ExecutorService 不会自动销毁。它会保持运行状态,等待新的任务到来。
在某些场景下,这种行为非常有用,比如应用程序需要处理不定期出现的任务,或者编译时无法确定任务数量。
但另一方面,如果应用程序已到达终点却仍未终止,可能是因为仍在等待的 ExecutorService 导致 JVM 无法退出。
为了正确关闭 ExecutorService,JDK 提供了 shutdown() 和 shutdownNow() 两种方法:
shutdown()方法不会立即销毁ExecutorService。它会使ExecutorService停止接收新任务,并在所有正在运行的线程完成当前工作后关闭:
executorService.shutdown();
shutdownNow()方法尝试立即销毁ExecutorService,但不能保证所有正在运行的线程都会立刻停止:
List<Runnable> notExecutedTasks = executorService.shutdownNow();
该方法会返回一个尚未执行的任务列表,开发者需自行决定如何处理这些任务。
Oracle 官方推荐的一种良好实践是结合使用 shutdown()、shutdownNow() 与 awaitTermination() 方法:
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
采用这种方式,ExecutorService 首先停止接收新任务,然后最多等待指定时间(本例中为 800 毫秒)让所有任务完成。若超时,则立即强制终止。
5. Future 接口
submit() 和 invokeAll() 方法会返回一个或一组 Future 类型的对象,通过它们可以获取任务执行结果或检查任务状态(例如是否仍在运行)。
Future 接口提供了一个特殊的阻塞方法 get(),用于获取 Callable 任务的实际执行结果;对于 Runnable 任务则返回 null:
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
如果在任务仍在运行时调用 get() 方法,程序会阻塞,直到任务完成并返回结果。
长时间阻塞可能导致应用程序性能下降。如果结果并非关键数据,可以通过设置超时来避免此问题:
String result = future.get(200, TimeUnit.MILLISECONDS);
如果任务执行时间超过指定时限(本例为 200 毫秒),将抛出 TimeoutException。
我们还可以使用 isDone() 方法检查任务是否已完成。
Future 接口还支持通过 cancel() 方法取消任务执行,并通过 isCancelled() 方法检查任务是否已被取消:
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
6. ScheduledExecutorService 接口
ScheduledExecutorService 可用于在指定延迟后执行任务,或周期性地重复执行任务。
实例化 ScheduledExecutorService 的最佳方式仍是使用 Executors 类的工厂方法。
本节中,我们使用一个单线程的 ScheduledExecutorService:
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- 要在固定延迟后执行一次任务,可使用
ScheduledExecutorService的schedule()方法。该方法有两个重载版本,分别支持Runnable和Callable任务:
Future<String> resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
上述代码会在 1 秒后执行 callableTask。
scheduleAtFixedRate()方法允许我们以固定频率周期性地执行任务。以下代码将在初始延迟 100 毫秒后首次执行任务,之后每隔 450 毫秒重复执行一次:
executorService.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);
需要注意的是,如果处理器执行任务所需的时间超过了 scheduleAtFixedRate() 中指定的周期时间,ScheduledExecutorService 会等待当前任务完成后才开始下一次执行。
- 如果希望确保每次任务执行之间有固定的间隔(即从前一次任务结束到下一次任务开始之间的延迟),应使用
scheduleWithFixedDelay()方法。
例如,以下代码保证在当前任务结束后,再等待 150 毫秒才开始下一次执行:
executorService.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);
根据 scheduleAtFixedRate() 和 scheduleWithFixedDelay() 方法的契约,任务的周期性执行将在 ExecutorService 终止或任务执行过程中抛出异常时结束。
7. ExecutorService 与 Fork/Join 框架对比
Java 7 发布后,许多开发者选择用 Fork/Join 框架替代 ExecutorService。
然而,这种做法并不总是正确的。尽管 Fork/Join 框架更简洁且在某些场景下性能更优,但它减少了开发者对并发执行过程的控制能力。
ExecutorService 允许开发者精确控制生成的线程数量,以及分配给各线程的任务粒度。它最适合用于处理彼此独立的任务,例如按照“一个任务对应一个线程”的模式处理事务或请求。
相比之下,根据 Oracle 官方文档,Fork/Join 框架专为可递归拆分为更小子任务的工作负载而设计,旨在加速这类任务的执行。
8. 总结
尽管 ExecutorService 相对简单易用,但仍存在一些常见陷阱。以下是关键注意事项总结:
- 未使用的 ExecutorService 保持运行:参见第 4 节,了解如何正确关闭
ExecutorService。 - 固定大小线程池容量设置不当:合理评估应用程序所需的线程数量至关重要。线程池过大将导致不必要的线程创建开销(多数线程处于等待状态);过小则可能导致任务排队时间过长,使应用显得无响应。
- 在任务被取消后调用 Future 的 get() 方法:尝试获取已取消任务的结果会抛出
CancellationException。 - Future 的 get() 方法意外长时间阻塞:应始终使用带超时的
get()方法,以避免不可预期的等待。