baeldung 2025-03-26
1. 概述
java.util.concurrent 包提供了用于创建并发应用程序的工具。
在本文中,我们将对该包进行整体概述。
2. 主要组件
java.util.concurrent 包含的功能太多,无法在单篇文章中全部讨论。本文将主要聚焦于该包中一些最实用的工具类,例如:
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- ThreadFactory
- BlockingQueue
- DelayQueue
- Locks(锁)
- Phaser
你也可以在这里找到针对各个类的专门文章。
2.1 Executor
Executor 是一个接口,表示一个执行所提供任务的对象。
具体实现决定了任务是在新线程还是当前线程中运行。因此,使用此接口可以将任务执行流程与实际的任务执行机制解耦。
需要注意的是,Executor 并不要求任务必须异步执行。在最简单的情况下,执行器可以在调用线程中立即执行提交的任务。
我们需要创建一个调用器(invoker)来实例化 Executor:
public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}
现在,我们可以使用这个调用器来执行任务:
public void execute() {
Executor executor = new Invoker();
executor.execute(() -> {
// 要执行的任务
});
}
注意:如果执行器无法接受任务执行,它将抛出 RejectedExecutionException 异常。
2.2 ExecutorService
ExecutorService 是一个完整的异步处理解决方案。它管理一个内存队列,并根据线程可用性调度提交的任务。
要使用 ExecutorService,我们需要先创建一个 Runnable 类:
public class Task implements Runnable {
@Override
public void run() {
// 任务细节
}
}
然后我们可以创建 ExecutorService 实例并分配任务。创建时需要指定线程池大小:
ExecutorService executor = Executors.newFixedThreadPool(10);
如果我们想创建单线程的 ExecutorService 实例,可以使用 newSingleThreadExecutor(ThreadFactory threadFactory) 方法。
一旦执行器创建完成,就可以用来提交任务:
public void execute() {
executor.submit(new Task());
}
我们也可以在提交任务时直接创建 Runnable 实例:
executor.submit(() -> {
new Task();
});
ExecutorService 还提供了两种现成的执行终止方法:
shutdown():等待所有已提交的任务执行完毕后再关闭。shutdownNow():尝试终止所有正在执行的任务,并停止处理等待中的任务。
此外还有一个 awaitTermination(long timeout, TimeUnit unit) 方法,它会强制阻塞,直到以下任一情况发生:
- 所有任务在触发关闭后完成执行;
- 超时;
- 当前线程被中断。
try {
executor.awaitTermination(20L, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
2.3 ScheduledExecutorService
ScheduledExecutorService 与 ExecutorService 接口类似,但支持周期性地执行任务。
Executor 和 ExecutorService 的方法是立即调度执行的,不引入任何人为延迟。零或负值表示请求需立即执行。
我们可以使用 Runnable 或 Callable 接口定义任务:
public void execute() {
ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();
Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);
executorService.shutdown();
}
ScheduledExecutorService 还可以安排任务在固定延迟后重复执行:
executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
其中:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)方法首先在初始延迟后执行任务,之后每隔指定周期重复执行,直到服务关闭。scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)方法首先在初始延迟后执行任务,之后每次在上一次任务完成后,再等待指定延迟时间才开始下一次执行。
2.4 Future
Future 用于表示异步操作的结果。它提供了一些方法用于检查异步操作是否完成、获取计算结果等。
此外,cancel(boolean mayInterruptIfRunning) 方法可用于取消操作并释放执行线程。若 mayInterruptIfRunning 为 true,则正在执行任务的线程将被立即终止;否则,允许正在进行的任务完成。
我们可以使用以下代码片段创建 Future 实例:
public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000L);
return "Hello world";
});
}
我们可以使用以下代码检查 Future 结果是否就绪,并在计算完成后获取数据:
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
我们还可以为操作指定超时时间。如果任务耗时超过该时间,将抛出 TimeoutException:
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
2.5 CountDownLatch
CountDownLatch(JDK 5 引入)是一个工具类,用于阻塞一组线程,直到某个操作完成。
CountDownLatch 初始化时带有一个计数器(整型);每当依赖线程完成执行,计数器递减。一旦计数器归零,其他线程就会被释放。
2.6 CyclicBarrier
CyclicBarrier 的工作方式与 CountDownLatch 几乎相同,不同之处在于它可以被重用。与 CountDownLatch 不同,它允许多个线程通过 await() 方法(称为屏障条件)相互等待,然后再执行最终任务。
我们需要创建一个 Runnable 任务实例来初始化屏障条件:
public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
现在我们可以启动多个线程竞争屏障条件:
public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}
这里,isBroken() 方法用于检查是否有线程在执行期间被中断。在执行实际处理前,应始终进行此检查。
2.7 Semaphore
Semaphore(信号量)用于限制线程对某部分物理或逻辑资源的访问。信号量包含一组许可(permits);每当线程试图进入临界区时,都需要检查信号量是否有可用许可。
如果没有可用许可(通过 tryAcquire()),线程将无法进入临界区;如果有,则授予访问权限,并减少许可计数。
当执行线程退出临界区时,通过 release() 方法增加许可计数。
我们可以通过 tryAcquire(long timeout, TimeUnit unit) 方法指定获取许可的超时时间。
还可以检查当前可用的许可数量或等待获取信号量的线程数量。
以下代码片段展示了如何实现一个信号量:
static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());
if (semaphore.tryAcquire()) {
try {
// ...
} finally {
semaphore.release();
}
}
}
我们可以使用 Semaphore 实现类似互斥锁(Mutex)的数据结构。
2.8 ThreadFactory
顾名思义,ThreadFactory 充当一个(虚拟的)线程池,在需要时创建新线程。它消除了大量样板代码,简化了高效线程创建机制的实现。
我们可以定义一个 ThreadFactory:
public class BaeldungThreadFactory implements ThreadFactory {
private int threadId;
private String name;
public BaeldungThreadFactory(String name) {
threadId = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
LOG.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}
我们可以使用 newThread(Runnable r) 方法在运行时创建新线程:
BaeldungThreadFactory factory = new BaeldungThreadFactory("BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}
2.9 BlockingQueue
在异步编程中,最常见的集成模式之一是生产者-消费者模式。java.util.concurrent 包提供了一种名为 BlockingQueue 的数据结构,在这些异步场景中非常有用。
2.10 DelayQueue
DelayQueue 是一个无界阻塞队列,其中的元素只有在其过期时间(即用户定义的延迟)完成后才能被取出。因此,队列头部(topmost element)具有最长的延迟,将最后被取出。
2.11 Locks(锁)
不出所料,Lock 是一种工具,用于阻止其他线程访问某段代码,只允许当前正在执行的线程访问。
与 synchronized 块的主要区别在于:synchronized 块必须完全包含在一个方法内,而 Lock API 的 lock() 和 unlock() 操作可以分别位于不同的方法中。
2.12 Phaser
Phaser 是比 CyclicBarrier 和 CountDownLatch 更灵活的解决方案,用作可重用的屏障,动态数量的线程可在继续执行前在此等待。我们可以协调多个执行阶段,并在每个程序阶段重用同一个 Phaser 实例。
3. 结论
在这篇高层次的概述文章中,我们重点介绍了 java.util.concurrent 包中提供的各种实用工具。