baeldung 2024-05-11
1. 概述
ExecutorService 框架使我们在多个线程中处理任务变得非常简单。本文将通过一些示例场景,展示如何等待线程完成其执行。
此外,我们还将演示如何优雅地关闭 ExecutorService,并等待正在运行的线程完成其任务。
2. 在 Executor 关闭之后
使用 Executor 时,我们可以通过调用 shutdown() 或 shutdownNow() 方法来关闭它。然而,这些方法并不会等待所有线程停止执行。
要等待现有线程完成其执行,可以使用 awaitTermination() 方法。该方法会阻塞当前线程,直到所有任务完成执行,或达到指定的超时时间:
public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException ex) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
3. 使用 CountDownLatch
接下来,我们来看另一种解决此问题的方法——使用 CountDownLatch 来通知任务已完成。
我们可以将其初始化为一个数值,该数值表示在所有调用了 await() 方法的线程被唤醒之前,可以调用 countDown() 的次数。
例如,如果我们希望当前线程等待另外 N 个线程完成执行,就可以用 N 初始化这个计数器:
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// ...
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 等待两个线程将计数器减至 0
latch.await();
4. 使用 invokeAll()
运行多线程任务的第一种方法是使用 invokeAll() 方法。该方法会在所有任务完成或超时后返回一个 Future 对象列表。
需要注意的是,返回的 Future 对象顺序与所提供的 Callable 对象列表顺序一致:
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000)
);
long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue(totalProcessingTime >= 3000);
String firstThreadResponse = futures.get(0).get();
assertTrue("fast thread".equals(firstThreadResponse));
String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));
5. 使用 ExecutorCompletionService
另一种运行多个线程的方式是使用 ExecutorCompletionService。它使用提供的 ExecutorService 来执行任务。
与 invokeAll() 的一个主要区别在于:ExecutorCompletionService 返回的 Future(代表已执行任务的结果)顺序是按任务完成的先后顺序排列的;而 invokeAll() 返回的列表顺序则与传入的任务列表顺序一致:
CompletionService<String> service
= new ExecutorCompletionService<>(WORKER_THREAD_POOL);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000)
);
for (Callable<String> callable : callables) {
service.submit(callable);
}
可以通过 take() 方法获取结果:
long startProcessingTime = System.currentTimeMillis();
Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue("First response should be from the fast thread",
"fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100 && totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds");
future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue("Last response should be from the slow thread",
"slow thread".equals(secondThreadResponse));
assertTrue(totalProcessingTime >= 3000 && totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds");
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
6. 结论
根据不同的使用场景,我们有多种方式可以等待线程完成其执行:
CountDownLatch适用于需要通知一个或多个线程:其他线程所执行的一组操作已经完成。ExecutorCompletionService适用于需要尽快获取任务结果的场景。- 其他方法(如
invokeAll()和awaitTermination())则更适合需要等待所有正在运行的任务全部完成的情况。