Jakob Jenkov 2021-03-11
读写锁(Read/Write Lock)比我们在《Java 中的锁(Locks in Java)》一文中介绍的 Lock 实现更为复杂。假设你有一个应用程序,它会读取和写入某个资源,但写操作远不如读操作频繁。多个线程同时读取同一资源不会相互干扰,因此允许多个希望读取该资源的线程同时获得访问权限。然而,如果有一个线程想要写入该资源,则此时不能有任何其他读或写操作正在进行。
为了解决“允许多个读者、仅允许一个写者”的问题,我们需要使用读写锁。
Java 5 在 java.util.concurrent 包中提供了读写锁的实现。即便如此,了解其背后的实现原理仍然很有价值。
Java 中读写锁的实现
首先,我们总结一下获取读/写资源访问权限的条件:
| 读访问 | 如果当前没有线程正在写,并且也没有线程请求写访问。 |
|---|---|
| 写访问 | 如果当前没有线程在读,也没有线程在写。 |
如果一个线程想要读取资源,只要没有线程正在写入,并且也没有线程已请求写访问,那么它是可以被允许的。我们优先处理写访问请求,是因为我们认为写请求比读请求更重要。此外,如果读操作最为频繁,而我们不优先处理写请求,就可能导致饥饿(starvation):写请求线程会被阻塞,直到所有读线程释放读写锁。如果不断有新线程获得读访问权限,那么等待写访问的线程将永远无法获得执行机会,从而陷入饥饿状态。因此,只有当没有线程持有写锁,也没有线程请求写锁时,才允许读访问。
对于写访问,只要没有线程正在读或写,就可以授予写权限。有多少线程在排队请求写访问并不重要(除非你想保证写请求之间的公平性)。
基于上述简单规则,我们可以实现如下 ReadWriteLock:
public class ReadWriteLock {
private int readers = 0;
private int writers = 0;
private int writeRequests = 0;
public synchronized void lockRead() throws InterruptedException {
while (writers > 0 || writeRequests > 0) {
wait();
}
readers++;
}
public synchronized void unlockRead() {
readers--;
notifyAll();
}
public synchronized void lockWrite() throws InterruptedException {
writeRequests++;
while (readers > 0 || writers > 0) {
wait();
}
writeRequests--;
writers++;
}
public synchronized void unlockWrite() throws InterruptedException {
writers--;
notifyAll();
}
}
这个 ReadWriteLock 提供了两个加锁方法和两个解锁方法:一个用于读访问,一个用于写访问。
lockRead()方法实现了读访问规则:只要没有写者或写请求,就允许读。lockWrite()方法实现了写访问规则:线程先增加写请求计数,然后检查是否可以真正获得写权限(即没有读者和写者)。写请求的数量不影响判断。
值得注意的是,unlockRead() 和 unlockWrite() 都调用了 notifyAll() 而非 notify()。原因如下:
假设有多个线程在等待读访问,也有线程在等待写访问。如果只调用 notify(),可能唤醒的是一个读线程;但由于存在写请求,该读线程仍需继续等待。而写线程未被唤醒,系统就会陷入僵局——没有任何线程能获得访问权限。使用 notifyAll() 可以唤醒所有等待线程,让它们重新检查自己是否满足访问条件。
此外,notifyAll() 还有一个优势:当多个读线程在等待,且没有写请求时,一旦写锁被释放(unlockWrite()),所有等待的读线程可以同时获得读访问权限,而不是逐个唤醒。
读写锁的可重入性(Reentrance)
上面实现的 ReadWriteLock 不是可重入的。例如:
- 如果一个已经持有写锁的线程再次请求写锁,它会被阻塞,因为
writers > 0。 - 更严重的情况:
- 线程1获得读锁;
- 线程2请求写锁(被阻塞,因为有读者);
- 线程1再次请求读锁(重入),但也会被阻塞,因为存在写请求。
这会导致死锁式僵局:所有后续读/写请求都无法进展。
为了解决这个问题,我们需要分别实现读可重入和写可重入。
读可重入(Read Reentrance)
规则:如果一个线程已经拥有读访问权限,即使有写请求,也应允许它再次获取读锁。
为此,我们使用一个 Map<Thread, Integer> 来记录每个已获得读锁的线程及其重入次数。
修改后的代码:
public class ReadWriteLock {
private Map<Thread, Integer> readingThreads = new HashMap<>();
private int writers = 0;
private int writeRequests = 0;
public synchronized void lockRead() throws InterruptedException {
Thread callingThread = Thread.currentThread();
while (!canGrantReadAccess(callingThread)) {
wait();
}
readingThreads.put(callingThread, getAccessCount(callingThread) + 1);
}
public synchronized void unlockRead() {
Thread callingThread = Thread.currentThread();
int accessCount = getAccessCount(callingThread);
if (accessCount == 1) {
readingThreads.remove(callingThread);
} else {
readingThreads.put(callingThread, accessCount - 1);
}
notifyAll();
}
private boolean canGrantReadAccess(Thread callingThread) {
if (writers > 0) return false;
if (isReader(callingThread)) return true; // 已是读者,允许重入
if (writeRequests > 0) return false;
return true;
}
private int getAccessCount(Thread callingThread) {
Integer accessCount = readingThreads.get(callingThread);
return accessCount == null ? 0 : accessCount.intValue();
}
private boolean isReader(Thread callingThread) {
return readingThreads.get(callingThread) != null;
}
}
写可重入(Write Reentrance)
规则:只有已经持有写锁的线程才能再次获取写锁。
实现方式:记录当前持有写锁的线程(writingThread)和写锁重入次数(writeAccesses)。
public class ReadWriteLock {
private Map<Thread, Integer> readingThreads = new HashMap<>();
private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;
public synchronized void lockWrite() throws InterruptedException {
writeRequests++;
Thread callingThread = Thread.currentThread();
while (!canGrantWriteAccess(callingThread)) {
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}
public synchronized void unlockWrite() throws InterruptedException {
writeAccesses--;
if (writeAccesses == 0) {
writingThread = null;
}
notifyAll();
}
private boolean canGrantWriteAccess(Thread callingThread) {
if (hasReaders()) return false;
if (writingThread == null) return true;
if (!isWriter(callingThread)) return false; // 不是当前写者,拒绝
return true; // 是当前写者,允许重入
}
private boolean hasReaders() {
return !readingThreads.isEmpty();
}
private boolean isWriter(Thread callingThread) {
return writingThread == callingThread;
}
}
从读升级到写(Read to Write Reentrance)
有时,一个已持有读锁的线程需要升级为写锁。这只有在它是唯一的读者时才安全。
修改 canGrantWriteAccess():
private boolean canGrantWriteAccess(Thread callingThread) {
if (isOnlyReader(callingThread)) return true; // 唯一读者,允许升级
if (hasReaders()) return false;
if (writingThread == null) return true;
if (!isWriter(callingThread)) return false;
return true;
}
private boolean isOnlyReader(Thread thread) {
return readingThreads.size() == 1 && readingThreads.containsKey(thread);
}
从写降级到读(Write to Read Reentrance)
持有写锁的线程总是可以安全地获取读锁(因为此时没有其他线程能访问资源)。
修改 canGrantReadAccess():
private boolean canGrantReadAccess(Thread callingThread) {
if (isWriter(callingThread)) return true; // 写者可直接读
if (writingThread != null) return false;
if (isReader(callingThread)) return true;
if (writeRequests > 0) return false;
return true;
}
完全可重入的 ReadWriteLock 实现
以下是整合了所有可重入特性的完整实现(并做了代码结构优化以提高可读性):
public class ReadWriteLock {
private Map<Thread, Integer> readingThreads = new HashMap<>();
private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;
public synchronized void lockRead() throws InterruptedException {
Thread callingThread = Thread.currentThread();
while (!canGrantReadAccess(callingThread)) {
wait();
}
readingThreads.put(callingThread, getReadAccessCount(callingThread) + 1);
}
private boolean canGrantReadAccess(Thread callingThread) {
if (isWriter(callingThread)) return true;
if (hasWriter()) return false;
if (isReader(callingThread)) return true;
if (hasWriteRequests()) return false;
return true;
}
public synchronized void unlockRead() {
Thread callingThread = Thread.currentThread();
if (!isReader(callingThread)) {
throw new IllegalMonitorStateException(
"Calling Thread does not hold a read lock on this ReadWriteLock");
}
int accessCount = getReadAccessCount(callingThread);
if (accessCount == 1) {
readingThreads.remove(callingThread);
} else {
readingThreads.put(callingThread, accessCount - 1);
}
notifyAll();
}
public synchronized void lockWrite() throws InterruptedException {
writeRequests++;
Thread callingThread = Thread.currentThread();
while (!canGrantWriteAccess(callingThread)) {
wait();
}
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}
public synchronized void unlockWrite() throws InterruptedException {
if (!isWriter(Thread.currentThread())) {
throw new IllegalMonitorStateException(
"Calling Thread does not hold the write lock on this ReadWriteLock");
}
writeAccesses--;
if (writeAccesses == 0) {
writingThread = null;
}
notifyAll();
}
private boolean canGrantWriteAccess(Thread callingThread) {
if (isOnlyReader(callingThread)) return true;
if (hasReaders()) return false;
if (writingThread == null) return true;
if (!isWriter(callingThread)) return false;
return true;
}
// 辅助方法
private int getReadAccessCount(Thread callingThread) {
Integer accessCount = readingThreads.get(callingThread);
return accessCount == null ? 0 : accessCount.intValue();
}
private boolean hasReaders() { return !readingThreads.isEmpty(); }
private boolean isReader(Thread callingThread) { return readingThreads.containsKey(callingThread); }
private boolean isOnlyReader(Thread callingThread) {
return readingThreads.size() == 1 && isReader(callingThread);
}
private boolean hasWriter() { return writingThread != null; }
private boolean isWriter(Thread callingThread) { return writingThread == callingThread; }
private boolean hasWriteRequests() { return writeRequests > 0; }
}
在 finally 子句中调用 unlock()
当使用 ReadWriteLock 保护可能抛出异常的临界区时,必须在 finally 块中调用 unlockRead() 或 unlockWrite(),以确保锁一定会被释放:
lock.lockWrite();
try {
// 临界区代码,可能抛出异常
} finally {
lock.unlockWrite();
}
如果不这样做,一旦临界区抛出异常,锁将永远无法释放,导致所有后续尝试获取锁的线程永久阻塞。即使锁是可重入的,也依赖于最初持有锁的线程最终成功释放它——但这不应作为可靠保障。使用 finally 块是更健壮的做法。