Jakob Jenkov 2014-08-08
信号量(Semaphore)是一种线程同步机制,可用于在线程之间发送信号以避免 信号丢失(missed signals),或者像使用 锁(lock) 那样保护 临界区(critical section)。
Java 5 在 java.util.concurrent 包中提供了信号量的实现,因此你无需自己实现。不过,了解其背后的原理和使用方式仍然是有益的。
Java 5 提供了内置的 Semaphore 类,你不需要自己实现。
简单信号量(Simple Semaphore)
以下是一个简单的 Semaphore 实现:
public class Semaphore {
private boolean signal = false;
public synchronized void take() {
this.signal = true;
this.notify();
}
public synchronized void release() throws InterruptedException {
while (!this.signal)
wait();
this.signal = false;
}
}
take()方法发送一个信号,并将其内部存储在Semaphore中。release()方法等待一个信号。一旦接收到信号,就清除标志位并退出release()方法。
通过这种方式使用信号量,可以避免信号丢失的问题。你会用 take() 替代 notify(),用 release() 替代 wait()。即使 take() 调用发生在 release() 之前,调用 release() 的线程仍能知道 take() 已被调用,因为信号被保存在内部变量 signal 中 —— 这与直接使用 wait() 和 notify() 不同。
注意:当你将信号量用于信号传递时,
take()和release()的命名可能显得有些奇怪。这些名称源自信号量作为锁的用途(见下文),在这种场景下命名更合理。
使用信号量进行线程间通信(Using Semaphores for Signaling)
以下是两个线程通过 Semaphore 相互通信的简化示例:
Semaphore semaphore = new Semaphore();
SendingThread sender = new SendingThread(semaphore);
ReceivingThread receiver = new ReceivingThread(semaphore);
receiver.start();
sender.start();
public class SendingThread {
Semaphore semaphore = null;
public SendingThread(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void run() {
while (true) {
// 执行某些操作,然后发送信号
this.semaphore.take();
}
}
}
public class ReceivingThread {
Semaphore semaphore = null;
public ReceivingThread(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void run() {
while (true) {
this.semaphore.release();
// 接收到信号,执行某些操作...
}
}
}
计数信号量(Counting Semaphore)
前面的 Semaphore 实现不会记录通过 take() 方法发送的信号数量。我们可以修改它以支持计数,这就是所谓的计数信号量(Counting Semaphore)。以下是简单实现:
public class CountingSemaphore {
private int signals = 0;
public synchronized void take() {
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException {
while (this.signals == 0)
wait();
this.signals--;
}
}
有界信号量(Bounded Semaphore)
上述 CountingSemaphore 对可存储的信号数量没有上限。我们可以为其添加一个上限,如下所示:
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound) {
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException {
while (this.signals == bound)
wait();
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException {
while (this.signals == 0)
wait();
this.signals--;
this.notify();
}
}
注意:当信号数量达到上限时,take() 方法会阻塞,直到某个线程调用 release() 释放一个信号,才允许继续发送新信号。
将信号量用作锁(Using Semaphores as Locks)
你可以将有界信号量用作锁。方法是将上限设为 1,并用 take() 和 release() 包围临界区。例如:
BoundedSemaphore semaphore = new BoundedSemaphore(1);
...
semaphore.take();
try {
// 临界区
} finally {
semaphore.release();
}
与信号传递场景不同,在此用法中,take() 和 release() 是由同一个线程调用的。由于只允许一个线程获取信号量,其他调用 take() 的线程将被阻塞,直到该线程调用 release()。而 release() 永远不会阻塞,因为总是在 take() 之后调用。
提示:你也可以使用有界信号量来限制同时进入某段代码的线程数量。例如,如果将
BoundedSemaphore的上限设为 5,则最多允许 5 个线程同时进入临界区。但需确保这 5 个线程的操作不会相互冲突,否则程序可能出错。
release() 方法放在 finally 块中,是为了确保即使临界区抛出异常,也能正确释放信号量。