CountDownLatch源码解读
之前写过一篇关于AQS的源码分析的文章,介绍了AQS是Java并发组件的基础,基础的介绍了,我们再看看它的应用,因此从这一篇开始我将依次介绍下AQS在Java各个并发组件如何应用的,先从CountDownLatch
开始。
CountDownLatch
,有时也叫闭锁,它的作用就像一个大门,初始是关着的,线程在“门外”等候通过,在某一时刻大门打开,线程一起通过,大门打开后一直会保持打开状态,不会再关闭。它用在什么场景呢?一个典型的场景是JVM的warmup,我们都知道JVM刚启动时,由于JIT还没介入,系统的响应是比较慢的,需要发送一些请求让JVM快速“热”起来,即所谓的warmup。这个过程需要确保warmup的请求处理完,才能去接线上的正常请求,这个场景就可以用CountDownLatch
来实现。也就是CountDownLatch
源码中抽象出的代码示例,
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
可以看到上面的源码有两个闭锁,一个用来开始,一个用来结束,N个线程执行某种任务,比如发送warmup请求,startSignal
可以确保这些请求一起发送,请求有的响应快,有的响应慢,doneSignal
可以确保那些快的慢的一起结束。那么它是怎么做到的呢?我们一起看下源码。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch
里有一个Sync
类,看懂这个类,其实闭锁的原理就懂了。
还记得之前AQS那篇文章里说:
AQS其实就是一个由状态变量和CLH虚拟队列组成的一个基础并发组件,它维护了一套线程阻塞、排队、唤醒的机制。它可以工作在共享和非共享两种模式下,共享模式下允许多个线程一起运行,非共享模式只允许一个线程运行。
count
就是这个状态变量,我们知道,AQS的tryAcquireShared
如果返回正数表示当前线程可以执行,并且后续其他线程也可以继续执行;如果返回0,表示当前线程可以执行,但是后续线程就好好的待着吧;如果返回负数,连当前线程也不能继续执行了,会进入休眠状态等待唤醒,tryReleaseShared
方法如果成功会唤醒休眠中的线程,看起来闭环了。
因为闭锁可能需要唤醒多个线程,因此AQS工作在共享模式下。Sync
重写了tryAcquireShared
,使得count
为0时,大门已开,畅通无阻,当前及后续线程继续执行,否则当前线程进入休眠,等待大门打开的那一刻,同时重写了tryReleaseShared
使得下一个count
为0时才能唤醒休眠的线程,也就是唤醒只发生在count
从1到0发生变化的那一刻,在这之前和在这之后都不会唤醒线程,在这之前是count
还没数到0,大门是关闭的,在这之后是count
已经数到0了,大门已经开了,并且一直是打开的,不会关闭。
下面是CountDownLatch
的两个核心方法await
(一个不带超时,一个带超时)和countDown
,前者用来拦住某些线程,后者倒计时控制闭锁打开,里面的核心就是上面介绍的两个方法,不再赘述。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}