Java 中的 CyclicBarrier

更新于 2025-12-29

baeldung 2024-01-08

1. 简介

CyclicBarrier 是一种同步工具,自 Java 5 起作为 java.util.concurrent 包的一部分被引入。

在本文中,我们将在一个并发场景中探索该实现。

2. Java 并发 —— 同步器(Synchronizers)

java.util.concurrent 包包含多个类,用于管理一组相互协作的线程。其中一些包括:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Exchanger
  • Semaphore
  • SynchronousQueue

这些类为线程之间常见的交互模式提供了开箱即用的功能。

如果我们有一组线程彼此通信,并且符合某种常见模式,那么我们可以直接复用相应的库类(也称为“同步器”),而无需自己使用锁、条件对象和 synchronized 关键字来设计复杂的同步方案。

接下来,我们将重点介绍 CyclicBarrier

3. CyclicBarrier

CyclicBarrier 是一种同步器,它允许一组线程彼此等待,直到所有线程都到达某个公共执行点(也称为“屏障”)。

CyclicBarrier 常用于这样的程序:固定数量的线程必须彼此等待,直到全部到达某个公共点后才能继续执行。

之所以称为“循环”(cyclic),是因为一旦等待的线程被释放,该屏障可以被重复使用。

4. 使用方式

CyclicBarrier 的构造函数非常简单。它接受一个整数参数,表示需要调用 await() 方法的线程数量,以表明它们已到达公共执行点:

public CyclicBarrier(int parties)

这些需要同步执行的线程也被称为“参与方”(parties),而调用 await() 方法就是注册某个线程已到达屏障点的方式。

该调用是同步的,调用此方法的线程会挂起,直到指定数量的线程都调用了该方法。当所需数量的线程都调用了 await(),就称为“触发屏障”(tripping the barrier)。

此外,我们还可以向构造函数传递第二个参数,即一个 Runnable 实例。该逻辑将由最后一个触发屏障的线程执行:

public CyclicBarrier(int parties, Runnable barrierAction)

5. 实现示例

为了演示 CyclicBarrier 的实际应用,考虑以下场景:

有固定数量的线程执行某项操作,并将各自的结果存储在一个列表中。当所有线程完成操作后,其中一个线程(通常是最后一个触发屏障的线程)开始处理所有线程收集到的数据。

首先,我们实现主类,其中包含所有核心逻辑:

public class CyclicBarrierDemo {

    private CyclicBarrier cyclicBarrier;
    private List<List<Integer>> partialResults
     = Collections.synchronizedList(new ArrayList<>());
    private Random random = new Random();
    private int NUM_PARTIAL_RESULTS;
    private int NUM_WORKERS;

    // ...
}

这个类非常直观:

  • NUM_WORKERS 表示将要执行的线程数量;
  • NUM_PARTIAL_RESULTS 表示每个工作线程将生成的结果数量;
  • partialResults 是一个列表,用于存储每个工作线程的结果。

注意:该列表是一个 SynchronizedList,因为多个线程会同时写入它,而普通 ArrayListadd() 方法不是线程安全的。

接下来,我们实现每个工作线程的逻辑:

public class CyclicBarrierDemo {

    // ...

    class NumberCruncherThread implements Runnable {

        @Override
        public void run() {
            String thisThreadName = Thread.currentThread().getName();
            List<Integer> partialResult = new ArrayList<>();

            // 执行计算并存储部分结果
            for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {    
                Integer num = random.nextInt(10);
                System.out.println(thisThreadName
                  + ": Crunching some numbers! Final result - " + num);
                partialResult.add(num);
            }

            partialResults.add(partialResult);
            try {
                System.out.println(thisThreadName 
                  + " waiting for others to reach barrier.");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                // ...
            } catch (BrokenBarrierException e) {
                // ...
            }
        }
    }

}

现在,我们实现屏障被触发后执行的逻辑。

为简化起见,我们只需将所有部分结果中的数字相加:

public class CyclicBarrierDemo {

    // ...
    
    class AggregatorThread implements Runnable {

        @Override
        public void run() {

            String thisThreadName = Thread.currentThread().getName();

            System.out.println(
              thisThreadName + ": Computing sum of " + NUM_WORKERS 
              + " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
            int sum = 0;

            for (List<Integer> threadResult : partialResults) {
                System.out.print("Adding ");
                for (Integer partialResult : threadResult) {
                    System.out.print(partialResult+" ");
                    sum += partialResult;
                }
                System.out.println();
            }
            System.out.println(thisThreadName + ": Final result = " + sum);
        }
    }
}

最后一步是构造 CyclicBarrier 并通过 main() 方法启动整个流程:

public class CyclicBarrierDemo {

    // 之前的代码
 
    public void runSimulation(int numWorkers, int numberOfPartialResults) {
        NUM_PARTIAL_RESULTS = numberOfPartialResults;
        NUM_WORKERS = numWorkers;

        cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());

        System.out.println("Spawning " + NUM_WORKERS
          + " worker threads to compute "
          + NUM_PARTIAL_RESULTS + " partial results each");
 
        for (int i = 0; i < NUM_WORKERS; i++) {
            Thread worker = new Thread(new NumberCruncherThread());
            worker.setName("Thread " + i);
            worker.start();
        }
    }

    public static void main(String[] args) {
        CyclicBarrierDemo demo = new CyclicBarrierDemo();
        demo.runSimulation(5, 3);
    }
}

在上述代码中,我们初始化了一个 CyclicBarrier,要求 5 个线程参与,每个线程生成 3 个整数作为其计算结果,并将结果存入总列表中。

一旦屏障被触发,最后一个触发屏障的线程将执行 AggregatorThread 中定义的逻辑——即将所有线程生成的数字求和。

6. 运行结果

以下是上述程序一次执行的输出(每次运行结果可能不同,因为线程调度顺序不确定):

Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2 
Adding 2 0 5 
Adding 6 4 0 
Adding 1 1 0 
Adding 9 3 5 
Thread 4: Final result = 46

如上所示,Thread 4 是触发屏障的线程,同时也执行了最终的聚合逻辑。此外,线程的实际执行顺序不一定与其启动顺序一致,如本例所示。

7. 结论

在本文中,我们了解了 CyclicBarrier 是什么,以及它适用于哪些场景。

我们还实现了一个具体案例:需要固定数量的线程到达某个固定的执行点后,再继续执行后续的程序逻辑。