之前写过一篇关于AQS的源码分析的文章,介绍了AQS是Java并发组件的基础,基础的介绍了,我们再看看它的应用,因此从这一篇开始我将依次介绍下AQS在Java各个并发组件如何应用的,先从CountDownLatch开始。

CountDownLatch,有时也叫闭锁,它的作用就像一个大门,初始是关着的,线程在“门外”等候通过,在某一时刻大门打开,线程一起通过,大门打开后一直会保持打开状态,不会再关闭。它用在什么场景呢?一个典型的场景是JVM的warmup,我们都知道JVM刚启动时,由于JIT还没介入,系统的响应是比较慢的,需要发送一些请求让JVM快速“热”起来,即所谓的warmup。这个过程需要确保warmup的请求处理完,才能去接线上的正常请求,这个场景就可以用CountDownLatch来实现。也就是CountDownLatch源码中抽象出的代码示例,

 class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();      // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
 }

 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
   }
   public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

可以看到上面的源码有两个闭锁,一个用来开始,一个用来结束,N个线程执行某种任务,比如发送warmup请求,startSignal可以确保这些请求一起发送,请求有的响应快,有的响应慢,doneSignal可以确保那些快的慢的一起结束。那么它是怎么做到的呢?我们一起看下源码。

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

CountDownLatch里有一个Sync类,看懂这个类,其实闭锁的原理就懂了。

还记得之前AQS那篇文章里说:

AQS其实就是一个由状态变量和CLH虚拟队列组成的一个基础并发组件,它维护了一套线程阻塞、排队、唤醒的机制。它可以工作在共享和非共享两种模式下,共享模式下允许多个线程一起运行,非共享模式只允许一个线程运行。

count就是这个状态变量,我们知道,AQS的tryAcquireShared如果返回正数表示当前线程可以执行,并且后续其他线程也可以继续执行;如果返回0,表示当前线程可以执行,但是后续线程就好好的待着吧;如果返回负数,连当前线程也不能继续执行了,会进入休眠状态等待唤醒,tryReleaseShared方法如果成功会唤醒休眠中的线程,看起来闭环了。

因为闭锁可能需要唤醒多个线程,因此AQS工作在共享模式下Sync重写了tryAcquireShared,使得count为0时,大门已开,畅通无阻,当前及后续线程继续执行,否则当前线程进入休眠,等待大门打开的那一刻,同时重写了tryReleaseShared使得下一个count为0时才能唤醒休眠的线程,也就是唤醒只发生在count从1到0发生变化的那一刻,在这之前和在这之后都不会唤醒线程,在这之前是count还没数到0,大门是关闭的,在这之后是count已经数到0了,大门已经开了,并且一直是打开的,不会关闭。

下面是CountDownLatch的两个核心方法await(一个不带超时,一个带超时)和countDown,前者用来拦住某些线程,后者倒计时控制闭锁打开,里面的核心就是上面介绍的两个方法,不再赘述。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
    sync.releaseShared(1);
}