java.util.concurrent 概述

更新于 2025-12-29

baeldung 2025-03-26

1. 概述

java.util.concurrent 包提供了用于创建并发应用程序的工具。

在本文中,我们将对该包进行整体概述。

2. 主要组件

java.util.concurrent 包含的功能太多,无法在单篇文章中全部讨论。本文将主要聚焦于该包中一些最实用的工具类,例如:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks(锁)
  • Phaser

你也可以在这里找到针对各个类的专门文章。

2.1 Executor

Executor 是一个接口,表示一个执行所提供任务的对象。

具体实现决定了任务是在新线程还是当前线程中运行。因此,使用此接口可以将任务执行流程与实际的任务执行机制解耦。

需要注意的是,Executor 并不要求任务必须异步执行。在最简单的情况下,执行器可以在调用线程中立即执行提交的任务。

我们需要创建一个调用器(invoker)来实例化 Executor

public class Invoker implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

现在,我们可以使用这个调用器来执行任务:

public void execute() {
    Executor executor = new Invoker();
    executor.execute(() -> {
        // 要执行的任务
    });
}

注意:如果执行器无法接受任务执行,它将抛出 RejectedExecutionException 异常。

2.2 ExecutorService

ExecutorService 是一个完整的异步处理解决方案。它管理一个内存队列,并根据线程可用性调度提交的任务。

要使用 ExecutorService,我们需要先创建一个 Runnable 类:

public class Task implements Runnable {
    @Override
    public void run() {
        // 任务细节
    }
}

然后我们可以创建 ExecutorService 实例并分配任务。创建时需要指定线程池大小:

ExecutorService executor = Executors.newFixedThreadPool(10);

如果我们想创建单线程的 ExecutorService 实例,可以使用 newSingleThreadExecutor(ThreadFactory threadFactory) 方法。

一旦执行器创建完成,就可以用来提交任务:

public void execute() { 
    executor.submit(new Task()); 
}

我们也可以在提交任务时直接创建 Runnable 实例:

executor.submit(() -> {
    new Task();
});

ExecutorService 还提供了两种现成的执行终止方法:

  • shutdown():等待所有已提交的任务执行完毕后再关闭。
  • shutdownNow():尝试终止所有正在执行的任务,并停止处理等待中的任务。

此外还有一个 awaitTermination(long timeout, TimeUnit unit) 方法,它会强制阻塞,直到以下任一情况发生:

  • 所有任务在触发关闭后完成执行;
  • 超时;
  • 当前线程被中断。
try {
    executor.awaitTermination(20L, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

2.3 ScheduledExecutorService

ScheduledExecutorServiceExecutorService 接口类似,但支持周期性地执行任务。

ExecutorExecutorService 的方法是立即调度执行的,不引入任何人为延迟。零或负值表示请求需立即执行。

我们可以使用 RunnableCallable 接口定义任务:

public void execute() {
    ScheduledExecutorService executorService
      = Executors.newSingleThreadScheduledExecutor();

    Future<String> future = executorService.schedule(() -> {
        // ...
        return "Hello world";
    }, 1, TimeUnit.SECONDS);

    ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
        // ...
    }, 1, TimeUnit.SECONDS);

    executorService.shutdown();
}

ScheduledExecutorService 还可以安排任务在固定延迟后重复执行:

executorService.scheduleAtFixedRate(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

其中:

  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 方法首先在初始延迟后执行任务,之后每隔指定周期重复执行,直到服务关闭。
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 方法首先在初始延迟后执行任务,之后每次在上一次任务完成后,再等待指定延迟时间才开始下一次执行。

2.4 Future

Future 用于表示异步操作的结果。它提供了一些方法用于检查异步操作是否完成、获取计算结果等。

此外,cancel(boolean mayInterruptIfRunning) 方法可用于取消操作并释放执行线程。若 mayInterruptIfRunningtrue,则正在执行任务的线程将被立即终止;否则,允许正在进行的任务完成。

我们可以使用以下代码片段创建 Future 实例:

public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Future<String> future = executorService.submit(() -> {
        // ...
        Thread.sleep(10000L);
        return "Hello world";
    });
}

我们可以使用以下代码检查 Future 结果是否就绪,并在计算完成后获取数据:

if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

我们还可以为操作指定超时时间。如果任务耗时超过该时间,将抛出 TimeoutException

try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

2.5 CountDownLatch

CountDownLatch(JDK 5 引入)是一个工具类,用于阻塞一组线程,直到某个操作完成。

CountDownLatch 初始化时带有一个计数器(整型);每当依赖线程完成执行,计数器递减。一旦计数器归零,其他线程就会被释放。

2.6 CyclicBarrier

CyclicBarrier 的工作方式与 CountDownLatch 几乎相同,不同之处在于它可以被重用。与 CountDownLatch 不同,它允许多个线程通过 await() 方法(称为屏障条件)相互等待,然后再执行最终任务。

我们需要创建一个 Runnable 任务实例来初始化屏障条件:

public class Task implements Runnable {

    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            LOG.info(Thread.currentThread().getName() + 
              " is waiting");
            barrier.await();
            LOG.info(Thread.currentThread().getName() + 
              " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

现在我们可以启动多个线程竞争屏障条件:

public void start() {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        // ...
        LOG.info("All previous tasks are completed");
    });

    Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 
    Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 
    Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 

    if (!cyclicBarrier.isBroken()) { 
        t1.start(); 
        t2.start(); 
        t3.start(); 
    }
}

这里,isBroken() 方法用于检查是否有线程在执行期间被中断。在执行实际处理前,应始终进行此检查。

2.7 Semaphore

Semaphore(信号量)用于限制线程对某部分物理或逻辑资源的访问。信号量包含一组许可(permits);每当线程试图进入临界区时,都需要检查信号量是否有可用许可。

如果没有可用许可(通过 tryAcquire()),线程将无法进入临界区;如果有,则授予访问权限,并减少许可计数。

当执行线程退出临界区时,通过 release() 方法增加许可计数。

我们可以通过 tryAcquire(long timeout, TimeUnit unit) 方法指定获取许可的超时时间。

还可以检查当前可用的许可数量或等待获取信号量的线程数量。

以下代码片段展示了如何实现一个信号量:

static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {
    LOG.info("Available permit : " + semaphore.availablePermits());
    LOG.info("Number of threads waiting to acquire: " + 
      semaphore.getQueueLength());

    if (semaphore.tryAcquire()) {
        try {
            // ...
        } finally {
            semaphore.release();
        }
    }
}

我们可以使用 Semaphore 实现类似互斥锁(Mutex)的数据结构。

2.8 ThreadFactory

顾名思义,ThreadFactory 充当一个(虚拟的)线程池,在需要时创建新线程。它消除了大量样板代码,简化了高效线程创建机制的实现。

我们可以定义一个 ThreadFactory

public class BaeldungThreadFactory implements ThreadFactory {
    private int threadId;
    private String name;

    public BaeldungThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread_" + threadId);
        LOG.info("created new thread with id : " + threadId +
            " and name : " + t.getName());
        threadId++;
        return t;
    }
}

我们可以使用 newThread(Runnable r) 方法在运行时创建新线程:

BaeldungThreadFactory factory = new BaeldungThreadFactory("BaeldungThreadFactory");
for (int i = 0; i < 10; i++) { 
    Thread t = factory.newThread(new Task());
    t.start(); 
}

2.9 BlockingQueue

在异步编程中,最常见的集成模式之一是生产者-消费者模式。java.util.concurrent 包提供了一种名为 BlockingQueue 的数据结构,在这些异步场景中非常有用。

2.10 DelayQueue

DelayQueue 是一个无界阻塞队列,其中的元素只有在其过期时间(即用户定义的延迟)完成后才能被取出。因此,队列头部(topmost element)具有最长的延迟,将最后被取出。

2.11 Locks(锁)

不出所料,Lock 是一种工具,用于阻止其他线程访问某段代码,只允许当前正在执行的线程访问。

synchronized 块的主要区别在于:synchronized 块必须完全包含在一个方法内,而 Lock API 的 lock()unlock() 操作可以分别位于不同的方法中。

2.12 Phaser

Phaser 是比 CyclicBarrierCountDownLatch 更灵活的解决方案,用作可重用的屏障,动态数量的线程可在继续执行前在此等待。我们可以协调多个执行阶段,并在每个程序阶段重用同一个 Phaser 实例。

3. 结论

在这篇高层次的概述文章中,我们重点介绍了 java.util.concurrent 包中提供的各种实用工具。