baeldung 2024-01-08
1. 概述
在本教程中,我们将探讨 Java 中最基础的机制之一 —— 线程同步。
首先,我们会讨论一些与并发相关的重要术语和方法。
随后,我们将开发一个简单的应用程序,在其中处理并发问题,以更好地理解 wait() 和 notify() 的工作原理。
2. Java 中的线程同步
在多线程环境中,多个线程可能会尝试修改同一资源。如果线程管理不当,就会导致数据一致性问题。
2.1 Java 中的守卫块(Guarded Blocks)
我们可以使用 守卫块(guarded blocks) 来协调多个线程的操作。这类代码块会在继续执行前检查某个特定条件是否满足。
为此,我们将使用以下方法:
Object.wait():挂起当前线程Object.notify():唤醒某个等待中的线程
我们可以通过下图更直观地理解线程的生命周期:
Java - Wait and Notify
注意:控制线程生命周期的方式有很多,但在本文中,我们仅聚焦于wait()和notify()。
3. wait() 方法
简而言之,调用 wait() 会强制当前线程等待,直到其他线程在同一个对象上调用 notify() 或 notifyAll()。
为此,当前线程必须拥有该对象的 监视器锁(monitor)。根据 Java 官方文档,这可以通过以下方式实现:
- 执行了该对象的 synchronized 实例方法
- 在该对象上执行了 synchronized 代码块
- 执行了该对象所属类的 synchronized 静态方法
注意:同一时间只有一个活跃线程可以拥有某个对象的监视器锁。
wait() 方法有三种重载形式,下面我们逐一介绍。
3.1 wait()
该方法使当前线程无限期等待,直到另一个线程调用此对象的 notify() 或 notifyAll()。
3.2 wait(long timeout)
通过此方法,我们可以指定一个超时时间(毫秒),超时后线程将自动被唤醒。当然,也可以在超时前通过 notify() 或 notifyAll() 提前唤醒线程。
注意:调用
wait(0)等价于调用wait()。
3.3 wait(long timeout, int nanos)
这是另一种提供相同功能的重载形式,不同之处在于它支持更高的时间精度。
总超时时间(纳秒)计算公式为:1_000_000 * timeout + nanos
4. notify() 与 notifyAll()
我们使用 notify() 方法来唤醒正在等待该对象监视器的线程。
有两种通知等待线程的方式:
4.1 notify()
对于所有正在等待该对象监视器的线程(通过任意一种 wait() 方法进入等待状态),notify() 会 随机唤醒其中一个线程。具体唤醒哪个线程是不确定的,取决于 JVM 的具体实现。
由于 notify() 只唤醒一个随机线程,因此适用于实现互斥锁(mutual exclusion lock)场景,即多个线程执行相似任务。但在大多数情况下,使用 notifyAll() 更为稳妥。
4.2 notifyAll()
该方法会唤醒 所有 正在等待该对象监视器的线程。
被唤醒的线程将像其他试图同步该对象的线程一样,正常参与锁的竞争。
不过,在允许线程继续执行之前,始终应快速检查继续执行所需的条件是否满足。这是因为某些情况下,线程可能在未收到通知的情况下被唤醒(这种情形将在后面的示例中讨论)。
5. 发送者-接收者同步问题
现在我们已经掌握了基础知识,接下来通过一个简单的 发送者–接收者(Sender–Receiver) 应用程序,使用 wait() 和 notifyAll() 实现两者之间的同步:
- 发送者(Sender) 需要向 接收者(Receiver) 发送一个数据包。
- 接收者 在发送者完成发送前不能处理该数据包。
- 同样,发送者 在接收者处理完上一个数据包前,不应尝试发送下一个数据包。
首先,我们创建一个 Data 类,用于封装要在发送者和接收者之间传输的数据包,并使用 wait() 和 notifyAll() 实现同步:
public class Data {
private String packet;
// true 表示接收者应等待
// false 表示发送者应等待
private boolean transfer = true;
public synchronized String receive() {
while (transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("线程被中断");
}
}
transfer = true;
String returnPacket = packet;
notifyAll();
return returnPacket;
}
public synchronized void send(String packet) {
while (!transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("线程被中断");
}
}
transfer = false;
this.packet = packet;
notifyAll();
}
}
代码解析:
packet变量表示通过网络传输的数据。- 布尔变量
transfer用于发送者和接收者之间的同步:- 若为
true,接收者应等待发送者发送消息。 - 若为
false,发送者应等待接收者接收消息。
- 若为
- 发送者 调用
send()方法:- 如果
transfer为false,则调用wait()等待。 - 当条件满足(
transfer == true)时,切换状态、设置消息,并调用notifyAll()唤醒其他线程,表明重要事件已发生,其他线程可检查是否能继续执行。
- 如果
- 接收者 调用
receive()方法:- 只有当
transfer被发送者设为false时才继续执行,否则调用wait()。 - 满足条件后,切换状态、唤醒所有等待线程,并返回接收到的数据包。
- 只有当
5.1 为什么要把 wait() 放在 while 循环中?
因为 notify() 和 notifyAll() 是随机唤醒等待线程的,被唤醒的线程不一定满足继续执行的条件。此外,还可能存在 “虚假唤醒”(spurious wakeup) —— 即线程在未收到任何通知的情况下自行醒来。
使用 while 循环可以确保:只有在条件真正满足时,线程才会继续执行。
5.2 为什么 send() 和 receive() 方法需要同步?
我们将这两个方法声明为 synchronized,是为了获取对象的内置锁(intrinsic lock)。如果调用 wait() 的线程没有持有该对象的锁,将会抛出 IllegalMonitorStateException 异常。
接下来,我们分别实现 Sender 和 Receiver,并让它们实现 Runnable 接口,以便能被线程执行。
发送者(Sender)实现:
public class Sender implements Runnable {
private Data data;
// 标准构造函数
public void run() {
String packets[] = {
"第一个数据包",
"第二个数据包",
"第三个数据包",
"第四个数据包",
"End"
};
for (String packet : packets) {
data.send(packet);
// 使用 Thread.sleep() 模拟服务器端的耗时处理
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("线程被中断");
}
}
}
}
说明:
- 创建一个字符串数组
packets,模拟要发送的数据。 - 对每个数据包调用
send()。 - 使用
Thread.sleep()随机休眠,模拟服务器处理延迟。
接收者(Receiver)实现:
public class Receiver implements Runnable {
private Data load;
// 标准构造函数
public void run() {
for(String receivedMessage = load.receive();
!"End".equals(receivedMessage);
receivedMessage = load.receive()) {
System.out.println(receivedMessage);
// 使用 Thread.sleep() 模拟服务器端的耗时处理
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("线程被中断");
}
}
}
}
说明:
- 在循环中不断调用
load.receive(),直到收到"End"数据包为止。 - 每次接收到消息后打印,并模拟处理延迟。
主程序运行:
public static void main(String[] args) {
Data data = new Data();
Thread sender = new Thread(new Sender(data));
Thread receiver = new Thread(new Receiver(data));
sender.start();
receiver.start();
}
输出结果:
第一个数据包
第二个数据包
第三个数据包
第四个数据包
我们成功按顺序接收了所有数据包,并在发送者与接收者之间建立了正确的通信同步机制。
6. 结论
在本文中,我们讨论了 Java 中的一些核心同步概念,重点介绍了如何使用 wait() 和 notify() 解决典型的线程同步问题。最后,我们通过一个完整的代码示例,实践了这些概念。
在结束之前,值得指出的是:wait()、notify() 和 notifyAll() 这些底层 API 虽然有效,但现代 Java 编程中通常推荐使用更高级、更简洁的并发工具,例如 java.util.concurrent.locks 包中的 Lock 和 Condition 接口。