Anshul Bansal 2020-01-11
1. 概述
随着对编写非阻塞代码需求的不断增长,我们需要一些方法来实现代码的异步执行。
在本教程中,我们将探讨几种在 Java 中实现异步编程的方式。同时也会介绍一些提供开箱即用解决方案的 Java 库。
2. Java 中的异步编程
2.1. Thread(线程)
我们可以创建一个新的线程来异步执行任意操作。自从 Java 8 引入 Lambda 表达式后,这种方式变得更加简洁、可读性更强。
下面创建一个新线程,用于计算并打印某个数字的阶乘:
int number = 20;
Thread newThread = new Thread(() -> {
System.out.println("Factorial of " + number + " is: " + factorial(number));
});
newThread.start();
2.2. FutureTask
从 Java 5 开始,Future 接口提供了通过 FutureTask 执行异步操作的能力。
我们可以使用 ExecutorService 的 submit 方法异步执行任务,并返回一个 FutureTask 实例。
例如,计算一个数字的阶乘:
ExecutorService threadpool = Executors.newCachedThreadPool();
Future<Long> futureTask = threadpool.submit(() -> factorial(number));
while (!futureTask.isDone()) {
System.out.println("FutureTask is not finished yet...");
}
long result = futureTask.get();
threadpool.shutdown();
这里我们使用了 Future 接口提供的 isDone() 方法来检查任务是否完成。一旦完成,就可以通过 get() 方法获取结果。
2.3. CompletableFuture
Java 8 引入了 CompletableFuture,它结合了 Future 和 CompletionStage 的功能,提供了诸如 supplyAsync、runAsync 和 thenApplyAsync 等方法,用于异步编程。
现在,我们使用 CompletableFuture 替代 FutureTask 来计算阶乘:
CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(() -> factorial(number));
while (!completableFuture.isDone()) {
System.out.println("CompletableFuture is not finished yet...");
}
long result = completableFuture.get();
这里我们无需显式使用 ExecutorService。CompletableFuture 内部默认使用 ForkJoinPool 来异步处理任务,因此代码更加简洁。
3. Guava
Guava 提供了 ListenableFuture 类用于执行异步操作。
首先,添加最新版的 Guava Maven 依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
然后使用 ListenableFuture 计算阶乘:
ExecutorService threadpool = Executors.newCachedThreadPool();
ListeningExecutorService service = MoreExecutors.listeningDecorator(threadpool);
ListenableFuture<Long> guavaFuture = (ListenableFuture<Long>) service.submit(() -> factorial(number));
long result = guavaFuture.get();
这里 MoreExecutors 类提供了 ListeningExecutorService 的实例。ListeningExecutorService.submit() 方法会异步执行任务,并返回 ListenableFuture 实例。
Guava 的 Futures 类还提供了诸如 submitAsync、scheduleAsync 和 transformAsync 等方法,用于链式组合 ListenableFuture,类似于 CompletableFuture。
例如,使用 Futures.submitAsync 替代 ListeningExecutorService.submit:
ListeningExecutorService service = MoreExecutors.listeningDecorator(threadpool);
AsyncCallable<Long> asyncCallable = Callables.asAsyncCallable(new Callable<Long>() {
public Long call() {
return factorial(number);
}
}, service);
ListenableFuture<Long> guavaFuture = Futures.submitAsync(asyncCallable, service);
这里 submitAsync 方法需要一个 AsyncCallable 参数,可通过 Callables 类创建。
此外,Futures 类还提供了 addCallback 方法,用于注册成功或失败的回调:
Futures.addCallback(
factorialFuture,
new FutureCallback<Long>() {
public void onSuccess(Long factorial) {
System.out.println(factorial);
}
public void onFailure(Throwable thrown) {
thrown.getCause();
}
},
service);
4. EA Async
Electronic Arts 将 .NET 中的 async-await 特性引入 Java 生态系统,推出了 ea-async 库。
该库允许以顺序方式编写异步(非阻塞)代码,使异步编程更简单且自然可扩展。
首先,在 pom.xml 中添加最新版的 ea-async Maven 依赖:
<dependency>
<groupId>com.ea.async</groupId>
<artifactId>ea-async</artifactId>
<version>1.2.3</version>
</dependency>
然后,使用 EA Async 提供的 await 方法改造前面的 CompletableFuture 示例:
static {
Async.init();
}
public long factorialUsingEAAsync(int number) {
CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(() -> factorial(number));
long result = Async.await(completableFuture);
}
我们在静态代码块中调用 Async.init() 方法,以初始化 Async 运行时字节码增强(instrumentation)。
Async 的字节码增强会在运行时重写 await 调用,使其行为类似于 Future.join()。
也可以使用 -javaagent JVM 参数进行编译期增强,作为 Async.init() 的替代方案:
java -javaagent:ea-async-1.2.3.jar -cp <classpath> <MainClass>
再看一个例子:使用 CompletableFuture 的链式方法异步执行多个操作:
CompletableFuture<Void> completableFuture = hello()
.thenComposeAsync(hello -> mergeWorld(hello))
.thenAcceptAsync(helloWorld -> print(helloWorld))
.exceptionally(throwable -> {
System.out.println(throwable.getCause());
return null;
});
completableFuture.get();
使用 EA Async 的 await 可将其改写为顺序风格:
try {
String hello = await(hello());
String helloWorld = await(mergeWorld(hello));
await(CompletableFuture.runAsync(() -> print(helloWorld)));
} catch (Exception e) {
e.printStackTrace();
}
虽然代码看起来像同步阻塞风格,但 await 方法实际上不会阻塞线程。
如前所述,所有对 await 的调用都会被 Async 字节码增强重写为类似 Future.join() 的行为。
因此,当 hello() 异步执行完成后,其 Future 结果会传递给 mergeWorld() 方法,最终结果再传递给 CompletableFuture.runAsync() 执行。
5. Cactoos
Cactoos 是一个基于面向对象原则的 Java 库,是 Google Guava 和 Apache Commons 的替代方案,提供了用于各种操作的通用对象。
首先,添加最新版的 Cactoos Maven 依赖:
<dependency>
<groupId>org.cactoos</groupId>
<artifactId>cactoos</artifactId>
<version>0.55.0</version>
</dependency>
该库提供了 Async 类用于异步操作。
例如,使用 Cactoos 的 Async 类计算阶乘:
Async<Integer, Long> asyncFunction = new Async<Integer, Long>(input -> factorial(input));
Future<Long> asyncFuture = asyncFunction.apply(number);
long result = asyncFuture.get();
这里的 apply 方法内部使用 ExecutorService.submit() 执行操作,并返回 Future 接口的实例。
同样,Async 类还提供了 exec 方法,用于无返回值的异步操作。
注意:Cactoos 库仍处于早期开发阶段,可能尚不适合用于生产环境。
6. Jcabi-Aspects
Jcabi-Aspects 通过 AspectJ AOP 切面提供了 @Async 注解,用于异步编程。
首先,添加最新版的 jcabi-aspects Maven 依赖:
<dependency>
<groupId>com.jcabi</groupId>
<artifactId>jcabi-aspects</artifactId>
<version>0.26.0</version>
</dependency>
该库需要 AspectJ 运行时支持,因此还需添加 aspectjrt 依赖:
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>1.9.20.1</version>
</dependency>
接下来,添加 jcabi-maven-plugin 插件,该插件通过 AspectJ 对字节码进行织入(weaving),其中 ajc 目标负责全部工作:
<plugin>
<groupId>com.jcabi</groupId>
<artifactId>jcabi-maven-plugin</artifactId>
<version>0.14.1</version>
<executions>
<execution>
<goals>
<goal>ajc</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjtools</artifactId>
<version>1.9.20.1</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.20.1</version>
</dependency>
</dependencies>
</plugin>
现在就可以使用 AOP 切面进行异步编程了:
@Async
@Loggable
public Future<Long> factorialUsingJcabiAspect(int number) {
Future<Long> factorialFuture = CompletableFuture.supplyAsync(() -> factorial(number));
return factorialFuture;
}
编译代码时,库会通过 AspectJ 织入,在 @Async 注解处注入 AOP 通知,从而实现 factorialUsingJcabiAspect 方法的异步执行。
使用 Maven 命令编译类:
mvn install
jcabi-maven-plugin 的输出可能如下所示:
--- jcabi-maven-plugin:0.14.1:ajc (default) @ java-async ---
[INFO] jcabi-aspects 0.18/55a5c13 started new daemon thread jcabi-loggable for watching of @Loggable annotated methods
[INFO] Unwoven classes will be copied to /tutorials/java-async/target/unwoven
[INFO] jcabi-aspects 0.18/55a5c13 started new daemon thread jcabi-cacheable for automated cleaning of expired @Cacheable values
[INFO] ajc result: 10 file(s) processed, 0 pointcut(s) woven, 0 error(s), 0 warning(s)
我们可以通过插件生成的 jcabi-ajc.log 日志文件验证类是否正确织入:
Join point 'method-execution(java.util.concurrent.Future
com.baeldung.async.JavaAsync.factorialUsingJcabiAspect(int))'
in Type 'com.baeldung.async.JavaAsync' (JavaAsync.java:158)
advised by around advice from 'com.jcabi.aspects.aj.MethodAsyncRunner'
(jcabi-aspects-0.22.6.jar!MethodAsyncRunner.class(from MethodAsyncRunner.java))
运行该类作为普通 Java 应用程序,输出可能如下:
17:46:58.245 [main] INFO com.jcabi.aspects.aj.NamedThreads -
jcabi-aspects 0.22.6/3f0a1f7 started new daemon thread jcabi-loggable for watching of @Loggable annotated methods
17:46:58.355 [main] INFO com.jcabi.aspects.aj.NamedThreads -
jcabi-aspects 0.22.6/3f0a1f7 started new daemon thread jcabi-async for Asynchronous method execution
17:46:58.358 [jcabi-async] INFO com.baeldung.async.JavaAsync -
#factorialUsingJcabiAspect(20): 'java.util.concurrent.CompletableFuture@14e2d7c1[Completed normally]' in 44.64µs
可以看到,库创建了一个名为 jcabi-async 的守护线程来异步执行任务。
同时,@Loggable 注解也启用了日志记录功能。
7. 结论
在本文中,我们学习了多种 Java 异步编程的方法。
首先,我们探索了 Java 内置的 FutureTask 和 CompletableFuture;接着介绍了 EA Async 和 Cactoos 等提供开箱即用解决方案的第三方库。
我们还讨论了 Guava 的 ListenableFuture 和 Futures 类在异步任务处理中的应用;最后,介绍了 jcabi-Aspects 库如何通过其 @Async 注解结合 AOP 切面实现异步方法调用。