baeldung 2024-01-09
1. 概述
在本教程中,我们将学习 Future 接口。该接口自 Java 1.5 起就已存在,在处理异步调用和并发处理时非常有用。
2. 创建 Future
简而言之,Future 类表示一个异步计算的未来结果。当计算完成后,结果最终会出现在 Future 中。
接下来,我们来看看如何编写创建并返回 Future 实例的方法。
长时间运行的方法非常适合进行异步处理,并使用 Future 接口,因为我们可以一边等待 Future 中封装的任务完成,一边执行其他操作。
以下是一些适合利用 Future 异步特性的操作示例:
- 计算密集型任务(数学和科学计算)
- 大数据结构操作(大数据处理)
- 远程方法调用(下载文件、网页抓取、Web 服务调用)
2.1 使用 FutureTask 实现 Future
在我们的示例中,将创建一个非常简单的类,用于计算一个整数的平方。这个任务显然不属于“长时间运行”的范畴,但我们会加入 Thread.sleep() 调用,使其在完成前暂停 1 秒钟:
public class SquareCalculator {
private ExecutorService executor
= Executors.newSingleThreadExecutor();
public Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
}
实际执行计算的代码位于 call() 方法中,并以 Lambda 表达式的形式提供。除了前面提到的 sleep() 调用外,这段代码本身并无特别之处。
真正有趣的部分在于对 Callable 和 ExecutorService 的使用。
Callable 是一个表示有返回值任务的接口,它只有一个 call() 方法。这里我们通过 Lambda 表达式创建了一个 Callable 实例。
仅创建 Callable 实例并不会启动任务;我们仍需将其传递给一个执行器(executor),由执行器负责在新线程中启动任务,并返回宝贵的 Future 对象。这就是 ExecutorService 的作用所在。
获取 ExecutorService 实例有多种方式,其中大多数由工具类 Executors 的静态工厂方法提供。在本例中,我们使用了最基本的 newSingleThreadExecutor(),它返回一个一次只能处理一个线程的 ExecutorService。
一旦有了 ExecutorService 对象,只需调用其 submit() 方法并传入 Callable 实例即可。submit() 会启动任务并返回一个 FutureTask 对象——这是 Future 接口的一个实现类。
3. 使用 Future
到目前为止,我们已经学会了如何创建 Future 实例。
在本节中,我们将学习如何使用该实例,具体通过探索 Future API 中的所有方法。
3.1 使用 isDone() 和 get() 获取结果
现在我们需要调用 calculate() 方法,并使用返回的 Future 来获取计算结果。Future API 中有两个方法可帮助我们完成此任务。
Future.isDone() 用于判断执行器是否已完成任务处理。如果任务已完成,则返回 true;否则返回 false。
用于获取实际计算结果的方法是 Future.get()。需要注意的是,该方法会阻塞当前线程,直到任务完成。不过在我们的示例中这不会成为问题,因为我们会在调用 get() 前先通过 isDone() 检查任务是否已完成。
通过结合使用这两个方法,我们可以在等待主任务完成的同时运行其他代码:
Future<Integer> future = new SquareCalculator().calculate(10);
while(!future.isDone()) {
System.out.println("Calculating...");
Thread.sleep(300);
}
Integer result = future.get();
在此示例中,我们会向控制台输出一条简单消息,告知用户程序正在进行计算。
get() 方法会阻塞执行,直到任务完成。但在本例中,由于我们确保只在任务完成后才调用 get(),因此 future.get() 总是能立即返回。
值得一提的是,get() 还有一个重载版本,接受超时时间和时间单位作为参数:
Integer result = future.get(500, TimeUnit.MILLISECONDS);
get(long, TimeUnit) 与无参 get() 的区别在于:前者若在指定超时时间内任务未完成,将抛出 TimeoutException。
3.2 使用 cancel() 取消 Future
假设我们启动了一个任务,但出于某种原因不再关心其结果。此时可以使用 Future.cancel(boolean) 告诉执行器停止操作并中断其底层线程:
Future<Integer> future = new SquareCalculator().calculate(4);
boolean canceled = future.cancel(true);
上述代码中的 Future 实例将永远不会完成其操作。事实上,如果在调用 cancel() 后再尝试调用 get(),将会抛出 CancellationException。Future.isCancelled() 可用于判断 Future 是否已被取消,这有助于避免 CancellationException。
此外,cancel() 调用也可能失败,此时将返回 false。需要注意的是,cancel() 接受一个布尔参数,用于控制是否中断正在执行任务的线程。
4. 使用线程池实现更多并发
我们当前的 ExecutorService 是单线程的,因为它通过 Executors.newSingleThreadExecutor() 创建。为了凸显这一点,让我们同时触发两个计算任务:
SquareCalculator squareCalculator = new SquareCalculator();
Future<Integer> future1 = squareCalculator.calculate(10);
Future<Integer> future2 = squareCalculator.calculate(100);
while (!(future1.isDone() && future2.isDone())) {
System.out.println(
String.format(
"future1 is %s and future2 is %s",
future1.isDone() ? "done" : "not done",
future2.isDone() ? "done" : "not done"
)
);
Thread.sleep(300);
}
Integer result1 = future1.get();
Integer result2 = future2.get();
System.out.println(result1 + " and " + result2);
squareCalculator.shutdown();
现在分析这段代码的输出:
calculating square for: 10
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
calculating square for: 100
future1 is done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
100 and 10000
显然,整个过程并非并行执行。可以看到第二个任务只有在第一个任务完成后才开始,导致整个过程耗时约 2 秒。
要真正实现多线程并发,我们应该使用不同类型的 ExecutorService。让我们看看如果改用 Executors.newFixedThreadPool() 提供的线程池,示例行为会发生什么变化:
public class SquareCalculator {
private ExecutorService executor = Executors.newFixedThreadPool(2);
//...
}
仅需对 SquareCalculator 类做这一处简单修改,我们就获得了一个能同时使用 2 个线程的执行器。
再次运行完全相同的客户端代码,将得到如下输出:
calculating square for: 10
calculating square for: 100
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
100 and 10000
现在效果好多了!可以看到两个任务同时开始并完成,整个过程仅耗时约 1 秒。
还有其他工厂方法可用于创建线程池,例如:
Executors.newCachedThreadPool():在可用时重用先前使用的线程Executors.newScheduledThreadPool():用于在指定延迟后调度命令执行
有关 ExecutorService 的更多信息,请参阅我们专门介绍该主题的文章。
5. ForkJoinTask 概述
ForkJoinTask 是一个抽象类,实现了 Future 接口,能够在少量实际线程(由 ForkJoinPool 管理)上运行大量任务。
在本节中,我们将快速介绍 ForkJoinPool 的主要特性。如需全面了解该主题,请参阅我们的《Java Fork/Join 框架指南》。
ForkJoinTask 的主要特点是:通常会在完成主任务所需的工作过程中生成新的子任务。它通过调用 fork() 生成新任务,并通过 join() 收集所有结果,因此得名。
有两个抽象类实现了 ForkJoinTask:
RecursiveTask:完成时返回值RecursiveAction:不返回任何值
顾名思义,这些类适用于递归任务,例如文件系统遍历或复杂数学计算。
让我们扩展之前的示例,创建一个类:给定一个整数,计算其所有阶乘元素的平方和。例如,若传入数字 4,应得到 4² + 3² + 2² + 1² = 30 的结果。
首先,我们需要创建 RecursiveTask 的具体实现,并实现其 compute() 方法。业务逻辑将写在此方法中:
public class FactorialSquareCalculator extends RecursiveTask<Integer> {
private Integer n;
public FactorialSquareCalculator(Integer n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
FactorialSquareCalculator calculator
= new FactorialSquareCalculator(n - 1);
calculator.fork();
return n * n + calculator.join();
}
}
注意,我们在 compute() 方法内部通过创建新的 FactorialSquareCalculator 实例来实现递归。通过调用非阻塞方法 fork(),我们请求 ForkJoinPool 启动该子任务的执行。
join() 方法将返回该子任务的计算结果,我们将其与当前数字的平方相加。
现在只需创建一个 ForkJoinPool 来处理执行和线程管理:
ForkJoinPool forkJoinPool = new ForkJoinPool();
FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);
forkJoinPool.execute(calculator);
6. 结论
在本文中,我们全面探讨了 Future 接口及其所有方法。我们还学习了如何利用线程池来触发多个并行操作。此外,还简要介绍了 ForkJoinTask 类的主要方法 fork() 和 join()。