Jakob Jenkov 2014-06-23
什么是滑脱条件?
“滑脱条件”(Slipped Conditions)指的是这样一种情况:一个线程在检查某个条件之后、但在根据该条件采取行动之前,该条件已经被另一个线程修改了,导致第一个线程基于过期的条件执行操作,从而引发错误。
下面是一个简单的例子:
public class Lock {
private boolean isLocked = true;
public void lock(){
synchronized(this){
while(isLocked){
try{
this.wait();
} catch(InterruptedException e){
//do nothing, keep waiting
}
}
}
synchronized(this){
isLocked = true;
}
}
public synchronized void unlock(){
isLocked = false;
this.notify();
}
}
注意 lock() 方法中包含两个 synchronized 块。第一个块等待 isLocked 变为 false,第二个块将 isLocked 设置为 true,以锁定该 Lock 实例,防止其他线程进入。
假设当前 isLocked 为 false,此时有两个线程同时调用 lock()。如果第一个线程在进入第一个 synchronized 块并检查完 isLocked 为 false 后,被操作系统调度器中断(preempted),那么它会暂时挂起。此时第二个线程获得执行机会,并也进入第一个 synchronized 块,同样看到 isLocked 为 false。
于是,两个线程都读取到条件为 false,随后都会进入第二个 synchronized 块,将 isLocked 设为 true 并继续执行。
这就是“滑脱条件”的一个典型例子:两个线程都检查了条件,然后退出了同步块,这使得其他线程有机会在它们真正修改条件之前再次检查该条件。换句话说,从线程检查条件到实际修改条件之间,这个条件“滑脱”了。
为了避免滑脱条件,检查条件和设置条件的操作必须由同一个线程原子地完成,即在其他线程无法介入的时间窗口内完成。
上述示例的解决方案很简单:只需将 isLocked = true; 这一行移到第一个 synchronized 块中,紧接在 while 循环之后。修改后的代码如下:
public class Lock {
private boolean isLocked = true;
public void lock(){
synchronized(this){
while(isLocked){
try{
this.wait();
} catch(InterruptedException e){
//do nothing, keep waiting
}
}
isLocked = true;
}
}
public synchronized void unlock(){
isLocked = false;
this.notify();
}
}
现在,对 isLocked 条件的检查和设置都在同一个 synchronized 块中完成,保证了原子性。
一个更现实的例子
你可能会合理地指出:没有人会像上面那样实现一个锁,因此滑脱条件似乎只是一个理论问题。确实,第一个例子是为了清晰说明概念而简化设计的。
一个更现实的场景出现在实现公平锁(Fair Lock)时(参见 饥饿与公平性 一文)。如果我们参考 嵌套监视器锁死 一文中提到的“朴素实现”,并试图解决其中的嵌套监视器锁死问题,就很容易引入滑脱条件。
首先回顾一下嵌套监视器锁死问题中的公平锁实现:
// 存在嵌套监视器锁死问题的公平锁实现
public class FairLock {
private boolean isLocked = false;
private Thread lockingThread = null;
private List<QueueObject> waitingThreads = new ArrayList<QueueObject>();
public void lock() throws InterruptedException{
QueueObject queueObject = new QueueObject();
synchronized(this){
waitingThreads.add(queueObject);
while(isLocked || waitingThreads.get(0) != queueObject){
synchronized(queueObject){
try{
queueObject.wait();
} catch(InterruptedException e){
waitingThreads.remove(queueObject);
throw e;
}
}
}
waitingThreads.remove(queueObject);
isLocked = true;
lockingThread = Thread.currentThread();
}
}
public synchronized void unlock(){
if(this.lockingThread != Thread.currentThread()){
throw new IllegalMonitorStateException(
"Calling thread has not locked this lock");
}
isLocked = false;
lockingThread = null;
if(waitingThreads.size() > 0){
QueueObject queueObject = waitingThreads.get(0);
synchronized(queueObject){
queueObject.notify();
}
}
}
}
public class QueueObject {}
注意 synchronized(queueObject) 块及其内部的 queueObject.wait() 调用是嵌套在 synchronized(this) 块内部的,这就导致了嵌套监视器锁死问题。
为了解决这个问题,我们需要把 synchronized(queueObject) 块移到 synchronized(this) 块外面。修改后的代码可能如下所示:
// 存在滑脱条件问题的公平锁实现
public class FairLock {
private boolean isLocked = false;
private Thread lockingThread = null;
private List<QueueObject> waitingThreads = new ArrayList<QueueObject>();
public void lock() throws InterruptedException{
QueueObject queueObject = new QueueObject();
synchronized(this){
waitingThreads.add(queueObject);
}
boolean mustWait = true;
while(mustWait){
synchronized(this){
mustWait = isLocked || waitingThreads.get(0) != queueObject;
}
synchronized(queueObject){
if(mustWait){
try{
queueObject.wait();
} catch(InterruptedException e){
waitingThreads.remove(queueObject);
throw e;
}
}
}
}
synchronized(this){
waitingThreads.remove(queueObject);
isLocked = true;
lockingThread = Thread.currentThread();
}
}
}
注:这里只展示了
lock()方法,因为只有它被修改了。
现在 lock() 方法包含三个 synchronized 块:
- 第一个
synchronized(this)块将当前线程加入等待队列。 - 第二个
synchronized(this)块检查是否需要等待(mustWait = isLocked || ...)。 - 第三个
synchronized(queueObject)块根据mustWait决定是否调用wait()。 - 如果不需要等待,第四个
synchronized(this)块(在while循环之后)会获取锁。
问题来了:假设线程 A 和 B 同时调用 lock()。
- 线程 A 执行到第二个
synchronized(this)块,发现mustWait = true,然后进入synchronized(queueObject)块并准备wait()。 - 此时线程 A 被中断。
- 线程 B 执行同样的流程,也发现
mustWait = true,进入synchronized(queueObject)块。 - 接着,线程 A 被唤醒(比如因为其他线程释放了锁),成功获取锁,并很快又调用了
unlock()。 - 此时锁实际上是未锁定的。
- 但线程 B 的
mustWait仍然是true(因为它在锁被释放前就已经计算好了),于是它调用queueObject.wait()进入等待。 - 由于没有其他线程再调用
notify(),线程 B 将永久等待。
这就是滑脱条件:线程 B 在检查条件时,条件是“需要等待”;但当它真正执行等待操作时,条件已经改变(锁已释放),但它不知道,于是错误地进入了等待状态。
如何消除滑脱条件问题?
要解决上述问题,必须确保检查条件和修改条件的操作在同一个原子操作中完成。具体做法是:将最后一个 synchronized(this) 块中的逻辑(即获取锁的逻辑)上移到第一个检查条件的 synchronized(this) 块中。
修改后的代码如下:
// 无嵌套监视器锁死,但存在“信号丢失”问题的公平锁实现
public class FairLock {
private boolean isLocked = false;
private Thread lockingThread = null;
private List<QueueObject> waitingThreads = new ArrayList<QueueObject>();
public void lock() throws InterruptedException{
QueueObject queueObject = new QueueObject();
synchronized(this){
waitingThreads.add(queueObject);
}
boolean mustWait = true;
while(mustWait){
synchronized(this){
mustWait = isLocked || waitingThreads.get(0) != queueObject;
if(!mustWait){
waitingThreads.remove(queueObject);
isLocked = true;
lockingThread = Thread.currentThread();
return; // 优化:无需再进入 wait
}
}
synchronized(queueObject){
if(mustWait){
try{
queueObject.wait();
} catch(InterruptedException e){
waitingThreads.remove(queueObject);
throw e;
}
}
}
}
}
}
现在,mustWait 的判断和锁的获取(设置 isLocked = true 等)都在同一个 synchronized(this) 块中完成,保证了原子性。即使 while(mustWait) 循环在块外判断,但 mustWait 的值只在同步块内被修改,因此不会出现滑脱。
注意:
return语句不是必需的,只是一个优化——如果不需要等待,就没必要进入synchronized(queueObject)块。
但还有“信号丢失”问题
细心的读者会发现,上述实现仍然存在 “信号丢失”(missed signal)问题。
设想以下场景:
- 锁当前已被某个线程持有。
- 新线程调用
lock(),在第一个synchronized(this)块中加入队列。 - 然后它进入
while循环,mustWait被设为true。 - 此时该线程被中断。
- 持有锁的线程调用
unlock(),执行queueObject.notify()。 - 但此时等待线程尚未调用
queueObject.wait(),因此notify()调用“丢失”了。 - 随后,等待线程继续执行,调用
wait(),但由于没有后续的notify(),它将永远阻塞。
这就是“信号丢失”问题。
正因如此,在 饥饿与公平性 一文中展示的
FairLock实现,将QueueObject类改造成了一个信号量(semaphore),提供了doWait()和doNotify()方法。这两个方法内部会记录信号状态,即使doNotify()在doWait()之前调用,信号也不会丢失。