队列同步器AbstractQueuedSynchronizer是用来构建锁和其他同步工具的关键,我们熟知的ReentrantLock(有公平和非公平 两种实现),内部就是通过继承AQS来实现的,CountdownLatch也是通过继承AQS,来实现的,这个数据结构很重要,这篇文章主要就是 分析AQS相关的知识的。

  AQS类本身代码是比较多的,不算Doug Lea的注释,一共还有1200+行,但捋清楚结构后,也不算太复杂。整体框架,可以从下图中看出来,它维护一个int型的成员变量表示同步状态(代表共享资源),使用一个 FIFO队列来存放获取共享资源的线程(一个线程等待队列,多线程争用资源被阻塞时会进入该队列),本质上是一个双向链表。AQS中的 head节点指向同步队列的头节点,是获取同步状态成功的节点;tail节点指向同步队列的尾节点。head节点在释放同步队列时,会唤醒 后继节点,而后继几点如果成功获取到了同步状态后,也会将自己设置为新的head节点。 AttrH.png

  AQS提供了三个方法来访问同步状态:

  • 1、getState():获取当前同步状态
  • 2、setState(int newState):设置当前同步状态
  • 3、compareAndSetState(int expect, int update):使用CAS设置当前状态,可以保证状态设置的原子性

  AQS可以支持独占式的获取同步状态(只有一个线程能执行,ReentrantLock、ReentrantReadWriteLock中的WriteLock),也可以共享式的获取同步状态(可以多个线程同时执行,ReentrantLock中 的ReadLock、CountdownLatch以及Semaphore)。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护 (如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • 1、protected boolean tryAcquire(int arg) 独占式的获取同步状态,实现该方法需要先查询当前状态是否符合预期,再使用CAS的方式设置同步状态。成功则返回true,否则返回false
  • 2、protected boolean tryRelease(int arg) 独占式的释放同步状态,阻塞在队列中的线程将有机会获取同步状态。成功返回true,失败返回false
  • 3、protected int tryAcquireShared(int arg) 共享式的获取同步状态,返回负数表示失败,0表示获取成功,但没有剩余资源,大于0的数表示成功且还有剩余资源。
  • 4、protected boolean tryReleaseShared(int arg) 共享式的释放同步状态,如果释放后允许唤醒后续等待结点返回true,否则返回false。
  • 5、protected boolean isHeldExclusively() 表示当前线程是否正在独占资源。只有用到Condition时才需要实现它。

  以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state是能回到零态的。

  再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所 有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

  一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现 独占和共享两种方式,如ReentrantReadWriteLock。

  在研究内部的方法之前,我们先看一下AQS的核心同步队列的相关实现。从上面的图里可以看出来,同步队列是由一个个节点Node构成的双向链表,Node是对每一个等待资源的线程的封装,其中包含了线程以及 线程的等待状态(如是否被阻塞,是否等待唤醒,是否已经被取消等),这个状态使用waitStatus表示,Node类的主要field如下:

  • 1、int waitStatus,等待状态,主要包含以下五种:
    • CANCELLED,值为1,表示在同步队列中的线程等待超时了,或者被中断了(能够响应中断的前提下—>acquireInterruptibly),就将当前节点的状态变更为取消状态,节点进入该状态后就不会再变化了;
    • SIGNAL,值为-1,表示它的next节点在等待当前节点唤醒,next节点入队时,会将它的前一个节点状态置为SIGNAL;当前节点如果释放了同步状态或者取消,将会通知next节点,让后面节点可以运行;
    • CONDITION,值为-2,节点在**Condition等待队列(不是同步队列)中,节点线程等待在Condition上,当其他线程对Condition调用了Signal()方法后,该节点会从等待队列中移入同步队列,加入 到对同步状态的获取中;
    • PROPAGATE,值为-3,表示共享模式下,下一次获取共享式同步状态将会无条件传播下去
    • INITIAL,值为0,新节点入队时的默认状态。 负数表示节点处于有效等待状态,正数表示节点已经被取消了。因此源码中都用>0 、<0来判断节点的状态是否正常。
  • 2、Node pre,前驱节点
  • 3、Node next,后继节点
  • 4、Node nextWaiter,Condition等待队列中的后继节点,不是同步队列。如果当前节点是共享的,那么这个字段是一个SHARED常 量,也就是节点类型和条件等待队列中的后继节点公用一个字段。
  • 5、Thread thread,获取同步状态的线程
  • 6、Node SHARED,标记该节点类型为共享
  • 7、Node EXCLUSIVE,标记该节点类型为独占

  这里注意到了,已经出现了两个不同的队列:同步队列和Condition等待队列,它们的实现基础都是Node节点,但是同步队列是双向的 ,而Condition等待队列是单向的,具体可以看ConditionObject这个内部类,它只有一个firstWaiter和一个lastWaiter,然后 通过nextWaiter来链接到一起。

  下面我们开始阅读源码部分。

  1、 acquire(int arg)

  这个方法是独占模式下获取同步状态的入口,需要注意的是该方法不会响应中断,也就是说如果线程由于获取同步状态失败而进入同步 队列中,另一个线程将该线程中断,这个线程并不会从队列中移出,就不会将自身节点的waitStatus状态修改为CANCELLED。如果获取 到资源,则线程直接返回,否则会进入同步队列,直到获取到了资源为止。源码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这段代码主要的执行流程如下:

    1. 调用自己实现的tryAcquire(int arg)方法,尝试获取同步状态(注意公平锁和非公平锁的实现方式不太一样,可以对比代码 就能发现,非公平锁直接先CAS设置一次同步状态,即抢占;而公平锁表示如果队列中有等待的,就不会先做这次CAS操作),如果成功 就直接返回。
    1. 上一步调用失败了,调用addWaiter(Node.EXCLUSIVE)将该线程(构造为一个独占节点)加入同步队列的尾部
    1. acquireQueued(Node node, int arg)方法,使该节点以"死循环"的方式获取同步状态,获取不到则阻塞在同步队列中。
    1. 前面提到了这个方法并不响应中断,所以只有在获取资源以后,selfInterrupt()自我中断一次。

1.1 tryAcquire(int arg)

  这个方法就是各种同步器自己实现的了,自行决定能否重入,是否公平等,可以思考一下为什么这个方法没有声明为abstract。

1.2 addWaiter(Node node)

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

  注意我们调用的时候传的node是一个Node.EXCLUSIVE,这表明当前新构建出来的节点是独占模式,isShared()返回false。 首先判断如果tail不为空,则代表队列也非空,则先尝试快速添加的方式,一次CAS操作将node添加到队尾,这里你可以思考一下,为 什么node.prev = pred; 这一步是放在CAS之前做,而不是放在CAS成功后再做?我猜是尽量if语句里面代码的执行时间,避免另一 个线程的CAS一直失败,此处存疑;如果tail为空(表示队列为空),或者上一步CAS失败了,在进入enq(Node node)方法,直接自旋 式的添加,直到节点添加成功到队尾,线程才返回。这里值得注意的是,如果队列为空的做法,是强制生成了一个标识节点,头尾都指向 这个节点,我猜是为了利用此方法compareAndSetTail,这个方法要求传递的expect的tail不为空;作为对比,你可以看 compareAndSetHead()方法传递的expect的head是一直为空的,此处好好思考一下为什么?因为compareAndSetHead()方法只在enq这里调用过一次,目的是在队列为空时构造出一个标识节点作为头节点, 其他时候设置头节点只会由获取同步状态成功的线程执行,由于只有一个线程能够获取到同步状态,因此设置头节点的方法是不需要CAS操作的。

1.3 final boolean acquireQueued(final Node node, int arg)

  通过上面的步骤后,线程获取同步状态失败了,也将自己添加到同步队列的队尾了。下面又开始了一个"自旋"的过程,每个节点(线程)都开始观察,等待头节点释放同步状态,如果自己获取到了同步状态,就结束 这个自旋的过程,否则一直这个节点的线程会一直阻塞。注意到现在为止,一个线程执行acquire(int arg)方法还一直阻塞着,代码如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;                 //标记是否成功拿到资源
    try {
        boolean interrupted = false;       //标记在等待的过程中是否被中断过
        for (;;) {
            final Node p = node.predecessor();      //拿前驱节点
            if (p == head && tryAcquire(arg)) {     //如果前驱是head才能尝试去获取同步状态
                setHead(node);                      //这里简单将自己设置为头节点
                p.next = null; // help GC           //将原来头节点的next置为null,原有的头节点已经没用了(已经出队了),可以被GC回收掉
                failed = false;                     //成功获取资源
                return interrupted;                 //返回在等待资源的过程中是否被中断
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;           //在等待过程中是否被中断过
        }
    } finally {
        if (failed)                   //如果没有获取到同步状态(可能是超时or被中断),取消节点在队列中的等待。
            cancelAcquire(node);
    }
}

  setHead()方法只会在下面这种情况下调用:当前节点的前驱是头节点,也就是,只有这种节点才能尝试获取同步状态,原因如下:

    1. 头节点是成功获取到同步状态的节点,当它释放同步状态后,会唤醒后继节点,后继节点被唤醒后还需要**检查自己的前驱节点是否是头节点;
    1. 维护队列FIFO的原则,

  我们再来看如果前驱节点不是头节点的情况,此时不能去尝试获取同步状态。先来看shouldParkAfterFailedAcquire(Node pred, Node node)方法,这个方法主要用来检查节点(准确来说是线程) 是否可以休息。整个流程中,如果前驱节点的状态不是SIGNAL,则会返回false,表示并不能休息。

1.3.1 boolean shouldParkAfterFailedAcquire(Node pred, Node node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release to signal it, so it can safely park.
         * 如果前驱节点的状态已经是SIGNAL了,代表前驱节点释放同步状态后会通知我自己,那我自己就可以休息了
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and indicate retry.
         * 如果前驱节点已经放弃了,那么就放弃前驱节点了,继续前移,直到pred节点状态为非CANCELLED,排在它的后面
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we need a signal, but don't park yet.  Caller will need to retry to 
         * make sure it cannot acquire before parking.
         * 如果前驱节点的状态是正常的(处于剩下其他状态),尝试以CAS的方式将前驱节点的状态设置成SIGNAL,让前驱节点可以通知自己
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

1.3.2 final boolean parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);                 //调用park()方法,使得线程进入waiting状态(到这里线程才真正waiting了,),这里可以用一篇文章梳理一下LockSupport类的相关知识
    return Thread.interrupted();            //执行到这里说明线程又开始run了,查看一下自己是否是被中断的       
}

  park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

  1.3 再次总结一下acquireQueued的流程

  首先,节点进入队尾,检测前驱节点是否是head,是就尝试获取同步状态;否则就尝试将前驱节点的状态修改为SIGNAL,设置成功后,线程便调用park进入waiting状态,等待unpark()或者interrupt() 唤醒自己;被唤醒后,如果能够拿到同步状态,则将头节点设置为自己,并返回从入队列到获得同步状态整个过程中是否被中断过,如果没有拿到则继续继续检查前驱节点状态,一直循环下去。

  再次回到acquire(int arg)的代码,总结一下它的调用流程(可以参考《Java 并发编程的艺术》书中的流程图):

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
    1. 调用自定义同步器的tryAcquire(int arg)方法,尝试获取同步状态,成功则直接返回;
    1. 如果没成功,则调用addWaiter()方法将当前线程包装为Node,加入同步队列的尾部,并标记为独占模式;
    1. 节点进入队尾后,acquire()方法让线程在队列中休息,线程进入waiting状态,被unpark的时候会被唤醒,当其前驱节点是头节点时才能尝试获取同步状态,获取到同步状态后线程返回,如果在整个 等待过程中被中断过,则返回true,否则返回false;
    1. 如果在整个等待过程中线程被中断过,这个方法是忽略中断的,被中断的线程在队列中也不会修改其waitStatus状态,最后在线程被unpark的时候,我们是调用的Thread.interrupted()方法来返回 线程是否被中断过,注意这个方法会清除中断状态。所以如果是true的话,最后调用了一次selfInterrupt()方法来将自己的中断状态补上,又置为true。

  ReentrantLock.lock()的流程就是acquire(1),内部还提供了公平和非公平的两种实现,可以参考。

  2、 release(int arg)

  前面将了acquire的流程,现在看一下它的反向操作,释放一个独占式的同步状态,如果释放成功,则会唤醒等待队列中的下一个节点,让其尝试获取同步状态。整个流程并不复杂,大体上是根据各自同步器 实现的tryRelease()方法的返回值来判断同步状态是否得到释放了,代码如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

2.1 tryRelease(int arg)

  这个方法和tryAcquire(int arg)一样,也是需要同步器自己实现的。一般来说,tryRelease都会成功,因为在独占模式下,来释放资源的线程一定之前拿到了独占资源,直接减掉释放的资源量即可 (state -= arg),注意返回值即可。

2.2 unparkSuccessor(Node node)

  这个方法用来唤醒等待队列中的下一个节点(线程),代码如下:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling.  It is OK if this fails 
     * or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);         //将当前节点(也就是头节点)的waitStatus置为0,允许失败?那不做这一步可以吗?

    /*
     * Thread to unpark is held in successor, which is normally just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

  有了前面的基础知识,我们发现这个函数一点也不复杂,归纳为一句话就是:找到head节点后面第一个满足条件的(未被取消的)节点线程,唤醒它。在acquire中因为park阻塞住的线程就可以继续运行了。 这里其实可以思考这样一个问题,如果你的代码在release()的时候抛异常了,那么永远也不会unpark,之前进入waiting状态的线程将永远停止在那儿。

  3、 acquireShared(int arg)

  看完独占式的获取和释放同步资源以后,我们再来研究一下共享式的获取同步资源,也就是同步状态在同一时刻允许被多个线程同时获取。它会获取指定量的资源,获取成功则直接返回,获取失败 则进入等待队列,直到获取到资源为止,整个过程一样忽略中断,代码如下:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

  这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以 去获取。所以这里acquireShared()的流程就是:

    1. tryAcquireShared()尝试获取资源,成功则直接返回;
    1. 失败则通过doAcquireShared()进入同步队列,直到获取到资源为止才返回。

    private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

  可以看出来,共享式的放入同步队列和独占式的acquireQueued方法代码结构都很相似,都是自旋式的等待头节点释放同步状态。不同之处在于尝试获取同步状态时,共享式的返回的是数字(代表可用的资源数) ,并且将selfInterrupt放在了这个方法里面,实际效果和之前独占式的没有什么区别。

  如果当前节点的前驱节点是头节点时,尝试去获取同步状态。如果获取的同步状态个数(可用资源数目)不小于0的话,还会唤醒之后的 节点,也就是方法setHeadAndPropagate,下面是这个方法的源码。这儿需要注意的是,如果head节点释放的资源数目为5个,而它的 后继节点需要的同步资源为6个,后继的后继需要的资源为2个,那么后继的后继是不会被唤醒的,仍然会先满足上一个后继节点的需求, 从而两个后继节点都会等待。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

  获取到了同步状态后,将头节点设置为自己,这些还是一样的流程,只有这个propagate的地方不一样,即如果还有可用的资源,会去 唤醒后续的共享节点,调用doReleaseShared()方法,如下:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

  4、 releaseShared(int arg)

  共享式的获取同步资源以后,现在进入共享式的释放同步资源。如果释放成功,本质上就是调用一个上面提到过的方法doReleaseShared(),就会唤醒队列里的后续等待节点。和独占式的也很相似,只不过独 占式的是唤醒head的后继节点,这个共享式的唤醒可能唤醒一堆…

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {     //尝试释放资源
        doReleaseShared();           //唤醒后续节点
        return true;
    }
    return false;
}