baeldung 2024-01-08
1. 简介
CyclicBarrier 是一种同步工具,自 Java 5 起作为 java.util.concurrent 包的一部分被引入。
在本文中,我们将在一个并发场景中探索该实现。
2. Java 并发 —— 同步器(Synchronizers)
java.util.concurrent 包包含多个类,用于管理一组相互协作的线程。其中一些包括:
CyclicBarrierPhaserCountDownLatchExchangerSemaphoreSynchronousQueue
这些类为线程之间常见的交互模式提供了开箱即用的功能。
如果我们有一组线程彼此通信,并且符合某种常见模式,那么我们可以直接复用相应的库类(也称为“同步器”),而无需自己使用锁、条件对象和 synchronized 关键字来设计复杂的同步方案。
接下来,我们将重点介绍 CyclicBarrier。
3. CyclicBarrier
CyclicBarrier 是一种同步器,它允许一组线程彼此等待,直到所有线程都到达某个公共执行点(也称为“屏障”)。
CyclicBarrier 常用于这样的程序:固定数量的线程必须彼此等待,直到全部到达某个公共点后才能继续执行。
之所以称为“循环”(cyclic),是因为一旦等待的线程被释放,该屏障可以被重复使用。
4. 使用方式
CyclicBarrier 的构造函数非常简单。它接受一个整数参数,表示需要调用 await() 方法的线程数量,以表明它们已到达公共执行点:
public CyclicBarrier(int parties)
这些需要同步执行的线程也被称为“参与方”(parties),而调用 await() 方法就是注册某个线程已到达屏障点的方式。
该调用是同步的,调用此方法的线程会挂起,直到指定数量的线程都调用了该方法。当所需数量的线程都调用了 await(),就称为“触发屏障”(tripping the barrier)。
此外,我们还可以向构造函数传递第二个参数,即一个 Runnable 实例。该逻辑将由最后一个触发屏障的线程执行:
public CyclicBarrier(int parties, Runnable barrierAction)
5. 实现示例
为了演示 CyclicBarrier 的实际应用,考虑以下场景:
有固定数量的线程执行某项操作,并将各自的结果存储在一个列表中。当所有线程完成操作后,其中一个线程(通常是最后一个触发屏障的线程)开始处理所有线程收集到的数据。
首先,我们实现主类,其中包含所有核心逻辑:
public class CyclicBarrierDemo {
private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults
= Collections.synchronizedList(new ArrayList<>());
private Random random = new Random();
private int NUM_PARTIAL_RESULTS;
private int NUM_WORKERS;
// ...
}
这个类非常直观:
NUM_WORKERS表示将要执行的线程数量;NUM_PARTIAL_RESULTS表示每个工作线程将生成的结果数量;partialResults是一个列表,用于存储每个工作线程的结果。
注意:该列表是一个 SynchronizedList,因为多个线程会同时写入它,而普通 ArrayList 的 add() 方法不是线程安全的。
接下来,我们实现每个工作线程的逻辑:
public class CyclicBarrierDemo {
// ...
class NumberCruncherThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();
// 执行计算并存储部分结果
for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}
partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// ...
} catch (BrokenBarrierException e) {
// ...
}
}
}
}
现在,我们实现屏障被触发后执行的逻辑。
为简化起见,我们只需将所有部分结果中的数字相加:
public class CyclicBarrierDemo {
// ...
class AggregatorThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
System.out.println(
thisThreadName + ": Computing sum of " + NUM_WORKERS
+ " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
int sum = 0;
for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
}
}
}
最后一步是构造 CyclicBarrier 并通过 main() 方法启动整个流程:
public class CyclicBarrierDemo {
// 之前的代码
public void runSimulation(int numWorkers, int numberOfPartialResults) {
NUM_PARTIAL_RESULTS = numberOfPartialResults;
NUM_WORKERS = numWorkers;
cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());
System.out.println("Spawning " + NUM_WORKERS
+ " worker threads to compute "
+ NUM_PARTIAL_RESULTS + " partial results each");
for (int i = 0; i < NUM_WORKERS; i++) {
Thread worker = new Thread(new NumberCruncherThread());
worker.setName("Thread " + i);
worker.start();
}
}
public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.runSimulation(5, 3);
}
}
在上述代码中,我们初始化了一个 CyclicBarrier,要求 5 个线程参与,每个线程生成 3 个整数作为其计算结果,并将结果存入总列表中。
一旦屏障被触发,最后一个触发屏障的线程将执行 AggregatorThread 中定义的逻辑——即将所有线程生成的数字求和。
6. 运行结果
以下是上述程序一次执行的输出(每次运行结果可能不同,因为线程调度顺序不确定):
Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2
Adding 2 0 5
Adding 6 4 0
Adding 1 1 0
Adding 9 3 5
Thread 4: Final result = 46
如上所示,Thread 4 是触发屏障的线程,同时也执行了最终的聚合逻辑。此外,线程的实际执行顺序不一定与其启动顺序一致,如本例所示。
7. 结论
在本文中,我们了解了 CyclicBarrier 是什么,以及它适用于哪些场景。
我们还实现了一个具体案例:需要固定数量的线程到达某个固定的执行点后,再继续执行后续的程序逻辑。