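The queue synchronizer AbstractQueuedSynchronizer (AQS for short) is the basic framework for building locks and other synchronization components. It uses a single int member variable to represent the synchronization state and modifies that state with CAS operations so that state changes are thread-safe. Threads competing for the resource are queued in a built-in FIFO (First In First Out) queue.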
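To make the description above concrete, here is a minimal sketch of an exclusive lock built on AQS. The class name SketchMutex and the demo method are assumptions made for illustration only. State 0 is taken to mean unlocked and 1 locked, and the queuing of losing threads is left entirely to AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class; not taken from the article.
public class SketchMutex {
    // State 0 = unlocked, 1 = locked (an assumption of this sketch).
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // A single CAS decides which thread wins the lock.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS enqueues and parks the caller
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // only the owner gets here, so a plain write suffices
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }

    public void unlock() { sync.release(1); }

    // Four threads increment a plain int under the lock; returns the final value.
    public static int demo() {
        final SketchMutex mutex = new SketchMutex();
        final int[] shared = {0};
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    mutex.lock();
                    try {
                        shared[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return shared[0];
    }
}
```

Because every increment happens while the lock is held, no update should be lost.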
    public static void withoutMutex() throws InterruptedException {
        System.out.println("Without mutex: ");
        int threadCount = 2;
        final Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threads.length; i++) {
            final int index = i;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100000; j++) {
                        if (j % 20000 == 0) {
                            System.out.println("Thread-" + index + ": j =" + j);
                        }
                    }
                }
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].join();
        }
    }

    public static void withMutex() {
        System.out.println("With mutex: ");
        final Mutex mutex = new Mutex();
        int threadCount = 2;
        final Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threads.length; i++) {
            final int index = i;
            threads[i] = new Thread(new Runnable() {
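Next, let's look at a shared lock example. Here we define two shared resources, which means two threads may run at the same time. The synchronization state variable state starts at 2; when a thread acquires the shared lock, state is decremented by 1, and when it releases the lock, state is incremented by 1. The valid values are 0, 1 and 2, where 0 means the resources are exhausted, and any thread that then tries to acquire the shared lock has to enter the sync queue and wait. The implementation is as follows: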
        public Sync(int resourceCount) {
            if (resourceCount <= 0) {
                throw new IllegalArgumentException("resourceCount must be larger than zero.");
            }
            // Set the total number of shareable resources
            setState(resourceCount);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            // Keep trying to acquire the resource. Return once the state has been
            // updated successfully (resource acquired) or the remaining count would
            // drop below 0 (no resource left).
            for (; ; ) {
                int lastCount = getState();
                int newCount = lastCount - reduceCount;
                if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            // Release the shared resource. Several threads may run this at the same
            // time, so the resource count must be updated with CAS.
            for (; ; ) {
                int lastCount = getState();
                int newCount = lastCount + returnCount;
                if (compareAndSetState(lastCount, newCount)) {
                    return true;
                }
            }
        }
    }

    // Define two shared resources, so two threads may run at the same time
    private final Sync sync = new Sync(2);
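For readers who want to run the shared lock end to end, the following is a self-contained sketch that wraps the same Sync logic in a lock class and measures how many threads hold it at once. The names SharedLockSketch, lock, unlock and maxConcurrentHolders are assumptions introduced for this illustration, as are the six worker threads and the 20 ms sleep.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical wrapper around the Sync logic shown above.
public class SharedLockSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int resourceCount) {
            if (resourceCount <= 0) {
                throw new IllegalArgumentException("resourceCount must be larger than zero.");
            }
            setState(resourceCount);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for (;;) {
                int lastCount = getState();
                int newCount = lastCount - reduceCount;
                // A negative result tells AQS to queue the caller.
                if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int lastCount = getState();
                if (compareAndSetState(lastCount, lastCount + returnCount)) {
                    return true;
                }
            }
        }
    }

    private final Sync sync = new Sync(2);

    public void lock() { sync.acquireShared(1); }

    public void unlock() { sync.releaseShared(1); }

    // Runs six workers and returns the largest number seen holding the lock at once.
    public static int maxConcurrentHolders() {
        final SharedLockSketch lock = new SharedLockSketch();
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger maxSeen = new AtomicInteger();
        Thread[] threads = new Thread[6];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    int now = running.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // hold the lock briefly so holders overlap
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    lock.unlock();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return maxSeen.get();
    }
}
```

The returned value should never exceed 2, since the state starts at 2 and every holder subtracts 1 before entering.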
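The flow above actually involves two operations: compare and replace. For the program to be correct, these two operations must be atomic, that is, they must execute as one indivisible step with no other thread interfering in between. Modern CPUs provide a low-level CAS instruction that guarantees the compare and the swap are performed as a single atomic operation. (There is still exclusive locking at this level; it is just held for a much shorter time than with synchronized.) The CAS functions in Java are implemented on top of this instruction. Here is how they are defined in Java: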
    /**
     * Atomically update Java variable to x if it is currently
     * holding expected.
     * @return true if successful
     */
    public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);

    /**
     * Atomically update Java variable to x if it is currently
     * holding expected.
     * @return true if successful
     */
    public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

    /**
     * Atomically update Java variable to x if it is currently
     * holding expected.
     * @return true if successful
     */
    public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);
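The three functions above are defined in the sun.misc.Unsafe class, which allows some low-level operations such as direct access to native memory. Taking compareAndSwapInt as an example, let's see how the CAS functions are used: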
    /**
     * Report the location of a given static field, in conjunction with {@link
     * #staticFieldBase}.
     * Do not expect to perform any sort of arithmetic on this offset;
     * it is just a cookie which is passed to the unsafe heap memory accessors.
     *
     * Any given field will always have the same offset, and no two distinct
     * fields of the same class will ever have the same offset.
     *
     * As of 1.4.1, offsets for fields are represented as long values,
     * although the Sun JVM does not use the most significant 32 bits.
     * It is hard to imagine a JVM technology which needs more than
     * a few bits to encode an offset within a non-array object,
     * However, for consistency with other methods in this class,
     * this method reports its result as a long value.
     */
    public native long objectFieldOffset(Field f);
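Now let's look at the prototype of compareAndSwapInt again. A CAS operation needs three pieces of information: the value in memory, the expected old value, and the new value to write. From the analysis above, o and offset together determine the field's address in memory, and therefore the value currently stored there. expected is the expected old value, and x is the new value to write.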
    public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
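The compare-then-swap contract can be observed without touching Unsafe. The sketch below swaps in java.util.concurrent.atomic.AtomicInteger, whose compareAndSet(expected, x) has the same success and failure semantics, with the object and offset hidden inside the class. The class name CasSemanticsSketch and the values 5, 6 and 7 are arbitrary choices for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; AtomicInteger stands in for Unsafe.compareAndSwapInt.
public class CasSemanticsSketch {
    // Returns {first CAS result (1 or 0), second CAS result (1 or 0), final value}.
    public static int[] demo() {
        AtomicInteger value = new AtomicInteger(5);

        // Memory holds 5 and we expect 5, so the swap to 6 happens.
        boolean first = value.compareAndSet(5, 6);

        // Memory now holds 6 but we still expect 5, so nothing is written.
        boolean second = value.compareAndSet(5, 7);

        return new int[] { first ? 1 : 0, second ? 1 : 0, value.get() };
    }
}
```

The second call fails precisely because its expected value is stale, which is the situation a concurrent writer creates.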
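compareAndSwapInt first checks whether expected equals the value in memory. If it does not, another thread has modified the field, and the update is not performed. Simply returning at that point is usually not what we want, though: we want the update to happen, and if another thread got there first, we want to apply our change on top of its updated value. So CAS is typically used inside a loop that returns only after the update has succeeded, as shown below: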
    long before = counter;
    while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
        before = counter;
    }
        public void increment() {
            long before = counter;
            while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
                before = counter;
            }
        }

        public long getCounter() {
            return counter;
        }

        private static long intCounter = 0;

        public static void main(String[] args) throws InterruptedException {
            int threadCount = 10;
            Thread[] threads = new Thread[threadCount];
            final CASCounter casCounter = new CASCounter();

            for (int i = 0; i < threadCount; i++) {
                threads[i] = new Thread(new Runnable() {
                    @Override
                    public void run() {

                        for (int i = 0; i < 10000; i++) {
                            casCounter.increment();
                            intCounter++;
                        }
                    }
                });
                threads[i].start();
            }

            for (int i = 0; i < threadCount; i++) {
                threads[i].join();
            }
            System.out.printf("CASCounter is %d \nintCounter is %d\n", casCounter.getCounter(), intCounter);
        }
    }
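Since sun.misc.Unsafe is not meant for application code, the same retry loop can be tried out with AtomicLong.compareAndSet. This is a sketch of the looping pattern only, not the article's CASCounter: the class name CasLoopCounterSketch and the demo method are assumptions, and AtomicLong is deliberately swapped in for the Unsafe call.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counter; AtomicLong replaces the Unsafe-based field access.
public class CasLoopCounterSketch {
    private final AtomicLong counter = new AtomicLong();

    public void increment() {
        long before = counter.get();
        // Retry until no other thread changed the value between read and CAS.
        while (!counter.compareAndSet(before, before + 1)) {
            before = counter.get();
        }
    }

    public long getCounter() {
        return counter.get();
    }

    // Ten threads each increment 10000 times; returns the final count.
    public static long demo() {
        final CasLoopCounterSketch casCounter = new CasLoopCounterSketch();
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    casCounter.increment();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return casCounter.getCounter();
    }
}
```

Every failed CAS is retried against the fresh value, so the loop should not drop increments, whereas an unsynchronized long++ under the same load typically does.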
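PROPAGATE: -3. In shared mode there can be multiple resources, so after the current thread is woken up there may still be resources left that can wake other threads. This status indicates that the wake-up should be propagated to subsequent nodes. Note that only the head node can be set to this status (This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.).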
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Wait status of the current node's predecessor
        int ws = pred.waitStatus;
        // If the predecessor's status is SIGNAL, the current node's thread can be parked
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            // ws > 0 means the predecessor is CANCELLED, so walk backwards
            // from the current node until the first node that is not
            // CANCELLED is found
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
        int ws = node.waitStatus;
        // Set the head node's status to 0 to record that its successor has been
        // woken and does not need to be woken again. Resetting ws mainly affects
        // the check performed in release.
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node. But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
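The wake-up in the code above ultimately relies on LockSupport.unpark, and the blocking side on LockSupport.park. The sketch below shows that pairing in isolation. The class name ParkUnparkSketch, the ready flag and the two-second join timeout are assumptions for this example; the flag is rechecked in a loop because park is allowed to return spuriously.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of the park/unpark handshake; not part of AQS itself.
public class ParkUnparkSketch {
    // Returns true if the waiting thread was released by unpark.
    public static boolean demo() {
        final AtomicBoolean ready = new AtomicBoolean(false);
        final AtomicBoolean woke = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park may return spuriously, so always recheck the condition.
            while (!ready.get()) {
                LockSupport.park();
            }
            woke.set(true);
        });
        waiter.start();

        // Publish the condition first, then hand the waiter a permit.
        // If unpark runs before park, the permit makes park return at once.
        ready.set(true);
        LockSupport.unpark(waiter);

        try {
            waiter.join(2000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return woke.get();
    }
}
```

Setting the condition before calling unpark mirrors the order used in AQS, where the state is released before the successor is unparked.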
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            // head == null means the queue is not initialized;
            // head == tail means no node is waiting in the sync queue
            if (h != null && h != tail) {
                // Check the node's wait status
                int ws = h.waitStatus;
                // As noted earlier, SIGNAL means a successor needs to be woken
                if (ws == Node.SIGNAL) {

                    /*
                     * Set the node's status to 0 to record that its successor has
                     * been woken. Several threads may reach this step at once, so
                     * CAS ensures that only one succeeds and goes on to call
                     * unparkSuccessor; the others take the continue branch.
                     */
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    /*
                     * ws == 0 means no successor needs waking (it has already been
                     * woken, or the node has no blocked successor), so this call
                     * did not actually wake anyone. It is like needing only one
                     * coupon while two friends each give me one: I have one left
                     * over and pass (propagate) it on to someone else. Hence the
                     * node is set to the PROPAGATE status.
                     */
                    continue;                // loop on failed CAS
                }
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    private void setHeadAndPropagate(Node node, int propagate) {
        // Save the old head
        Node h = head; // Record old head for check below
        /*
         * Replace the head with the current node.
         * Once this step completes, the queue's head has changed, so other
         * woken threads get a chance to acquire the lock and may run this
         * method concurrently. That is why the old head is saved above,
         * so that the checks below work correctly.
         */
        setHead(node);

        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */

        /*
         * Decide whether the successor needs to be woken. propagate > 0 means
         * shared resources remain; h.waitStatus < 0 means the node's status
         * may be SIGNAL, CONDITION or PROPAGATE.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // Wake the successor only when s is null or waiting in shared mode
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
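At this point the picture is fairly clear. When a node acquires the shared lock, besides making itself the head node it also checks whether the conditions for waking its successor are met. If so, it wakes the successor; once the successor acquires the lock it repeats the process, until the condition no longer holds. It is like passing exam papers from the first row to the last: the first row keeps one copy and passes the rest back, and the next row does the same. If the papers run out at some row, the people in that row have to wait until the teacher hands them new papers.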
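One way to watch this chain of wake-ups from the outside is java.util.concurrent.CountDownLatch, which is built on AQS in shared mode. In the sketch below a single countDown() call performs one shared release, yet every waiting thread gets through, because each woken waiter passes the wake-up to the node behind it. The class name PropagationSketch, the five waiters and the 50 ms pause are assumptions for this illustration; the pause only makes it likely that the waiters are already queued and is not needed for correctness.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: one shared release lets all queued waiters proceed.
public class PropagationSketch {
    // Returns how many waiters passed the gate after a single countDown().
    public static int demo() {
        final int waiters = 5;
        final CountDownLatch gate = new CountDownLatch(1);
        final AtomicInteger passed = new AtomicInteger();

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await(); // shared-mode acquire; blocks while the count is 1
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        try {
            Thread.sleep(50);  // give the waiters time to enqueue (best effort)
            gate.countDown();  // a single shared release
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return passed.get();
    }
}
```

If wake-ups were not propagated, only the first queued waiter would be released by that single call.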
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // Save pred's successor for the CAS below.
        // Cancelled nodes may sit in between, so pred.next is not necessarily node.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        // Mark the node as CANCELLED
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }