Preface
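The queued synchronizer AbstractQueuedSynchronizer (AQS below) is the basic framework for building locks and other synchronization components. It represents the synchronization state with a single int member variable and changes that state with CAS operations, so state changes are thread-safe. A built-in FIFO (First In, First Out) queue handles the queuing of threads waiting to acquire the resource. For more articles on Java multithreading, see here.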
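To make the division of labor concrete, here is a minimal sketch of an exclusive lock built on AQS. The class name `Mutex`, its `lock()`/`unlock()`/`isLocked()` methods and the `countWithLock` helper are assumptions for illustration, because the article's own `Mutex` class is not shown. Only `AbstractQueuedSynchronizer` and its `acquire`, `release`, `compareAndSetState`, `getState`, `setState` and `setExclusiveOwnerThread` methods are real JDK API. The subclass only decides when the state may change. Queuing and parking are left to AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant lock: state 0 = free, 1 = held.
class Mutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // Only one thread can win the CAS from 0 to 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS enqueues the caller in its FIFO queue and parks it
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true; // tells AQS to wake the next queued thread
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public boolean isLocked() {
        return sync.isLocked();
    }

    // Hypothetical helper: increments a plain int from several threads under the lock.
    static int countWithLock(int threads, int perThread) {
        Mutex mutex = new Mutex();
        int[] total = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        total[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return total[0];
    }
}
```

`Mutex.countWithLock(4, 10000)` should return 40000, because every increment happens while the lock is held.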
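```java
public static void withoutMutex() throws InterruptedException {
    System.out.println("Without mutex: ");
    int threadCount = 2;
    final Thread[] threads = new Thread[threadCount];
    for (int i = 0; i < threads.length; i++) {
        final int index = i;
        threads[i] = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 100000; j++) {
                    if (j % 20000 == 0) {
                        System.out.println("Thread-" + index + ": j =" + j);
                    }
                }
            }
        });
    }

    for (int i = 0; i < threads.length; i++) {
        threads[i].start();
    }
    for (int i = 0; i < threads.length; i++) {
        threads[i].join();
    }
}

public static void withMutex() throws InterruptedException {
    System.out.println("With mutex: ");
    final Mutex mutex = new Mutex();
    int threadCount = 2;
    final Thread[] threads = new Thread[threadCount];
    for (int i = 0; i < threads.length; i++) {
        final int index = i;
        threads[i] = new Thread(new Runnable() {
            @Override
            public void run() {
                // The source is truncated here; completed by assumption: the same loop,
                // guarded by Mutex's lock()/unlock(), so one thread finishes before the other prints.
                mutex.lock();
                try {
                    for (int j = 0; j < 100000; j++) {
                        if (j % 20000 == 0) {
                            System.out.println("Thread-" + index + ": j =" + j);
                        }
                    }
                } finally {
                    mutex.unlock();
                }
            }
        });
    }

    for (Thread t : threads) {
        t.start();
    }
    for (Thread t : threads) {
        t.join();
    }
}
```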
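Next, let's look at a shared-lock example. Here we define two shared resources, so at most two threads can hold the lock at the same time. The synchronization state `state` starts at 2. A thread that acquires the shared lock decrements it by 1, and a thread that releases the lock increments it by 1. The only valid values of the state are 0, 1 and 2. A value of 0 means the resources are used up, and any further thread that wants the shared lock has to wait in the synchronization queue. Here is the implementation: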
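```java
private static final class Sync extends AbstractQueuedSynchronizer {

    public Sync(int resourceCount) {
        if (resourceCount <= 0) {
            throw new IllegalArgumentException("resourceCount must be larger than zero.");
        }
        // Set the total number of shareable resources
        setState(resourceCount);
    }

    @Override
    protected int tryAcquireShared(int reduceCount) {
        // Try to take resources. Return once the state has been updated (resources acquired)
        // or the remaining count would drop below 0 (nothing left). A negative return value
        // tells AQS that the acquire failed and the thread must wait in the queue.
        for (; ; ) {
            int lastCount = getState();
            int newCount = lastCount - reduceCount;
            if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
                return newCount;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int returnCount) {
        // Return shared resources. Several threads may release at the same time,
        // so the total has to be updated with CAS.
        for (; ; ) {
            int lastCount = getState();
            int newCount = lastCount + returnCount;
            if (compareAndSetState(lastCount, newCount)) {
                return true;
            }
        }
    }
}

// Two shared resources: at most two threads can run at the same time
private final Sync sync = new Sync(2);
```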
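The snippet above shows only the synchronizer and the field that holds it. Below is a minimal sketch of the surrounding class. It assumes the outer class (called `SharedLock` here, a hypothetical name) exposes `lock()`/`unlock()` methods that delegate to AQS's real `acquireShared(1)`/`releaseShared(1)`. The `maxConcurrency` helper is also hypothetical. It records the largest number of threads ever inside the lock at once, and that number should never exceed 2.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical outer class; Sync repeats the shared synchronizer shown above.
class SharedLock {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int resourceCount) {
            if (resourceCount <= 0) {
                throw new IllegalArgumentException("resourceCount must be larger than zero.");
            }
            setState(resourceCount);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for (;;) {
                int lastCount = getState();
                int newCount = lastCount - reduceCount;
                // Negative: fail and queue; zero or positive: success.
                if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int lastCount = getState();
                if (compareAndSetState(lastCount, lastCount + returnCount)) {
                    return true;
                }
            }
        }
    }

    // Two shared resources: at most two threads inside at once.
    private final Sync sync = new Sync(2);

    public void lock() {
        sync.acquireShared(1);
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    // Runs `threads` workers and returns the largest number seen holding the lock at once.
    static int maxConcurrency(int threads) {
        SharedLock lock = new SharedLock();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                lock.lock();
                try {
                    max.accumulateAndGet(active.incrementAndGet(), Math::max);
                    Thread.sleep(20); // hold the lock long enough for others to pile up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    lock.unlock();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return max.get();
    }
}
```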
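The process above involves two operations: a compare and a swap. For the program to be correct, the pair must be atomic, meaning both steps happen as one with no other thread interfering in between. Modern CPUs provide a CAS instruction that guarantees the comparison and the exchange execute as a single atomic operation. Strictly speaking, some exclusive locking still happens at this level, but the exclusive window is far shorter than with `synchronized`. Java's CAS functions are implemented on top of this instruction. For more on how the CPU implements this, see this article. Here are the definitions of the CAS functions in Java: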
```java
/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapObject(Object o, long offset,
                                                 Object expected,
                                                 Object x);

/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapLong(Object o, long offset,
                                               long expected,
                                               long x);
```
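These three functions are defined in the `sun.misc.Unsafe` class, which exposes low-level operations such as direct access to native memory. For more on `Unsafe`, see this article. Taking `compareAndSwapInt` as an example, let's see how a CAS function is used: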
```java
/**
 * Report the location of a given static field, in conjunction with {@link
 * #staticFieldBase}.
 * Do not expect to perform any sort of arithmetic on this offset;
 * it is just a cookie which is passed to the unsafe heap memory accessors.
 *
 * Any given field will always have the same offset, and no two distinct
 * fields of the same class will ever have the same offset.
 *
 * As of 1.4.1, offsets for fields are represented as long values,
 * although the Sun JVM does not use the most significant 32 bits.
 * It is hard to imagine a JVM technology which needs more than
 * a few bits to encode an offset within a non-array object,
 * However, for consistency with other methods in this class,
 * this method reports its result as a long value.
 */
public native long objectFieldOffset(Field f);
```
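The following sketch combines `objectFieldOffset` with the `compareAndSwapInt` declared earlier. It assumes a JDK where `sun.misc.Unsafe` is still reachable. On JDK 9+ the class lives in the `jdk.unsupported` module, and its memory-access methods are slated for removal in recent releases. The sketch gets the instance by reading the private `theUnsafe` field through reflection, which is a common workaround and not a supported API. The class `UnsafeCasDemo` is hypothetical.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

// Hypothetical demo: CAS on an int field through Unsafe.
class UnsafeCasDemo {
    private volatile int value = 0;

    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static {
        try {
            // Workaround: read the private singleton instead of calling Unsafe.getUnsafe().
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe) f.get(null);
            // o (this) + offset together locate the `value` field in memory.
            VALUE_OFFSET = UNSAFE.objectFieldOffset(UnsafeCasDemo.class.getDeclaredField("value"));
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Writes x only if `value` still equals expected; returns whether it did.
    boolean cas(int expected, int x) {
        return UNSAFE.compareAndSwapInt(this, VALUE_OFFSET, expected, x);
    }

    int get() {
        return value;
    }
}
```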
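Now look at the prototype of `compareAndSwapInt`. A CAS operation needs three pieces of information: the value in memory, the expected old value, and the new value to write. As shown above, `o` and `offset` together locate the field in memory, which gives us the field's current value. `expected` is the expected old value, and `x` is the new value to write.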
```java
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
```
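The contract of this prototype can be written out as a plain-Java reference model. In the sketch below (the class `CasCell` is hypothetical), `synchronized` is used only to make the compare step and the swap step indivisible for illustration. A real CAS is a single CPU instruction, not a monitor. `java.util.concurrent.atomic.AtomicInteger.compareAndSet` follows the same contract.

```java
// Hypothetical reference model of CAS semantics for a single int.
// synchronized only makes compare + swap indivisible here; real CAS is one CPU instruction.
class CasCell {
    private int value;

    CasCell(int initial) {
        value = initial;
    }

    // Mirrors compareAndSwapInt(o, offset, expected, x): write x only if value == expected.
    synchronized boolean compareAndSwap(int expected, int x) {
        if (value != expected) {
            return false; // another thread changed the value first: no write
        }
        value = x;
        return true;
    }

    synchronized int get() {
        return value;
    }
}
```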
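`compareAndSwapInt` first checks whether `expected` equals the value in memory. If the two differ, another thread has modified the field, and no update is performed. Simply returning at that point is usually not what we want. We want the update to happen, and if another thread got there first, we want to apply our change on top of its result. So CAS is normally used in a loop that returns only after the update succeeds: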
```java
long before = counter;
while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
    before = counter;
}
```
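```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class CASCounter {
    // Reconstructed (not shown in the source): the counter field, the Unsafe
    // instance obtained reflectively, and the memory offset of `counter`.
    private volatile long counter = 0;
    private final Unsafe unsafe;
    private final long offset;

    public CASCounter() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            unsafe = (Unsafe) f.get(null);
            offset = unsafe.objectFieldOffset(CASCounter.class.getDeclaredField("counter"));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public void increment() {
        long before = counter;
        // Retry until no other thread changed counter between the read and the CAS
        while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
            before = counter;
        }
    }

    public long getCounter() {
        return counter;
    }

    private static long intCounter = 0;

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 10;
        Thread[] threads = new Thread[threadCount];
        final CASCounter casCounter = new CASCounter();

        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 10000; j++) {
                        casCounter.increment();
                        intCounter++; // plain read-modify-write: updates can be lost
                    }
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < threadCount; i++) {
            threads[i].join();
        }
        System.out.printf("CASCounter is %d \nintCounter is %d\n", casCounter.getCounter(), intCounter);
    }
}
```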
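On JDK 9 and later, the same retry loop can be written without `Unsafe` by using `java.lang.invoke.VarHandle`, which is the supported replacement for field-level CAS. This sketch is an alternative to the approach above, not the article's method. The class `VarHandleCounter` and its `runConcurrently` helper are hypothetical.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical counterpart of CASCounter using VarHandle instead of Unsafe (JDK 9+).
class VarHandleCounter {
    private volatile long counter = 0;

    private static final VarHandle COUNTER;

    static {
        try {
            COUNTER = MethodHandles.lookup()
                    .findVarHandle(VarHandleCounter.class, "counter", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    void increment() {
        long before = counter;
        // Same retry loop: the CAS fails if another thread changed counter after our read.
        while (!COUNTER.compareAndSet(this, before, before + 1)) {
            before = counter;
        }
    }

    long get() {
        return counter;
    }

    // Increments from several threads and returns the final value.
    static long runConcurrently(int threads, int perThread) {
        VarHandleCounter c = new VarHandleCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    c.increment();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return c.get();
    }
}
```

Unlike the unsynchronized `intCounter`, no increment is lost, so `runConcurrently(4, 10000)` returns 40000.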
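PROPAGATE: -3. In shared mode there can be several units of the resource, so after the current thread wakes up there may still be resources left over to wake other threads. This status indicates that the wake-up should propagate to subsequent nodes. Note that only the head node can be set to this status ("This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.").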
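For reference, the `waitStatus` values used by `AbstractQueuedSynchronizer.Node` are CANCELLED = 1, SIGNAL = -1, CONDITION = -2 and PROPAGATE = -3, and 0 means none of these. The enum below is hypothetical: the JDK uses plain `int` constants, and `NONE` is a name invented here for 0. It shows why the code can test `ws > 0` for cancellation and `ws < 0` for a signal-related state.

```java
// Hypothetical enum mirroring the int constants of AbstractQueuedSynchronizer.Node.
enum WaitStatus {
    CANCELLED(1),  // the thread gave up (timeout or interrupt)
    NONE(0),       // name invented here; the JDK simply uses 0
    SIGNAL(-1),    // the successor's thread needs unparking
    CONDITION(-2), // the thread is waiting on a condition queue
    PROPAGATE(-3); // the next shared acquire should propagate unconditionally

    final int value;

    WaitStatus(int value) {
        this.value = value;
    }

    static WaitStatus of(int ws) {
        for (WaitStatus s : values()) {
            if (s.value == ws) {
                return s;
            }
        }
        throw new IllegalArgumentException("unknown waitStatus: " + ws);
    }

    // CANCELLED is the only positive value, hence the `ws > 0` checks.
    boolean isCancelled() {
        return value > 0;
    }

    // SIGNAL, CONDITION and PROPAGATE are all negative, hence the `ws < 0` checks.
    boolean isSignalRelated() {
        return value < 0;
    }
}
```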
```java
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Wait status of the current node's predecessor
    int ws = pred.waitStatus;
    // If the predecessor's status is SIGNAL, the current node's thread can safely be parked
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // ws > 0 means the predecessor is CANCELLED, so walk backwards from the
        // current node until we find the first node that is not CANCELLED
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
```
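To see the three branches in action, here is a single-threaded toy model. `ToyNode` and `ParkCheck` are hypothetical classes, and plain field writes stand in for CAS because only one thread touches the list. The first call unlinks a run of cancelled predecessors. The second call marks the live predecessor SIGNAL. Only the third call returns true.

```java
// Hypothetical single-threaded stand-in for AQS.Node.
class ToyNode {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    int waitStatus;
    ToyNode prev;
    ToyNode next;

    ToyNode(int waitStatus) {
        this.waitStatus = waitStatus;
    }

    // Links the nodes into a doubly linked queue and returns the last one.
    static ToyNode link(ToyNode... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i].prev = nodes[i - 1];
            nodes[i - 1].next = nodes[i];
        }
        return nodes[nodes.length - 1];
    }
}

class ParkCheck {
    // Same branches as shouldParkAfterFailedAcquire, with plain writes instead of CAS.
    static boolean shouldPark(ToyNode pred, ToyNode node) {
        int ws = pred.waitStatus;
        if (ws == ToyNode.SIGNAL) {
            return true; // the predecessor will wake us, so it is safe to park
        }
        if (ws > 0) {
            // Skip the whole run of cancelled predecessors.
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = ToyNode.SIGNAL; // request a signal, then retry before parking
        }
        return false;
    }
}
```

For the queue `head <- a <- b(CANCELLED) <- c(CANCELLED) <- me`, repeated calls to `shouldPark(me.prev, me)` return false, false, then true. After the first call, `me.prev` is `a`.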
```java
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // Reset the head node's status to 0 to show that its successor has already been
    // woken and need not be woken again; this change mainly affects the check in release()
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
```
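The backward walk from `tail` is needed because enqueuing sets a new node's `prev` link before the node becomes the tail, while its predecessor's `next` link is set only afterwards. As a result, `next` can be temporarily null even though the node is already queued. The single-threaded toy model below (the `QNode` class and `findSuccessor` method are hypothetical) reproduces the search. It returns the non-cancelled node closest to `node`.

```java
// Hypothetical single-threaded stand-in for AQS.Node.
class QNode {
    static final int CANCELLED = 1;

    int waitStatus;
    QNode prev;
    QNode next;

    QNode(int waitStatus) {
        this.waitStatus = waitStatus;
    }

    // Same search as unparkSuccessor: try node.next first; if it is missing or
    // cancelled, walk back from tail and keep the last non-cancelled node seen,
    // which is the one closest to `node`.
    static QNode findSuccessor(QNode node, QNode tail) {
        QNode s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (QNode t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        return s;
    }
}
```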
```java
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        // head == null means the queue is not initialized; head == tail means no node is waiting
        if (h != null && h != tail) {
            // Wait status of the head node
            int ws = h.waitStatus;
            // As described earlier, SIGNAL means a successor needs to be woken
            if (ws == Node.SIGNAL) {
                /*
                 * Reset the head's status to 0 to record that its successor has been woken.
                 * Several threads may reach this point at the same time, so CAS ensures that
                 * only one of them succeeds and calls unparkSuccessor; the others continue.
                 */
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                /*
                 * ws == 0 means no successor needs to be woken (the successor has already
                 * been woken, or the node has no blocked successor), so this call did not
                 * actually wake anyone. It is like needing only one coupon when two friends
                 * each give me one: I have one left over, so I pass (propagate) it on to
                 * someone else. That is why the node is set to PROPAGATE here.
                 */
                continue;                // loop on failed CAS
            }
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
```
```java
private void setHeadAndPropagate(Node node, int propagate) {
    // Save the old head
    Node h = head; // Record old head for check below
    /*
     * Remove the old head and make the current node the head.
     * Once this step completes, the queue head has changed, so other woken
     * threads may acquire the lock and run this method concurrently.
     * That is why the old head is saved above: the checks below stay correct.
     */
    setHead(node);

    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */

    /*
     * Decide whether to wake the successor: propagate > 0 means shared resources remain;
     * h.waitStatus < 0 means the node's status may be SIGNAL, CONDITION or PROPAGATE
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // Wake the successor unless it is known to be in exclusive mode
        // (s == null means the successor is not visible yet, so wake anyway)
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
```
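At this point the overall flow is clear. When a node acquires the shared lock, it makes itself the head. It then checks whether the conditions for waking its successor hold, and if they do, it wakes the successor. The successor repeats the same process after acquiring the lock, and this continues until the condition no longer holds. It is like passing exam papers from the first row to the last: the first row keeps one copy and passes the rest back, and each row repeats this. If the papers run out at some row, the students in that row wait until the teacher hands them new papers.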
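`CountDownLatch` is built on AQS shared mode, so it shows this propagation directly. A single `countDown()` performs one shared release, and each queued waiter that wakes up passes the wake-up on to the next, like the exam papers. The sketch below counts how many waiters get through. The class and method names are hypothetical; `CountDownLatch` is the real JDK class. Threads that reach `await()` after the latch is already open simply pass without queuing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchPropagationDemo {
    // Starts `waiters` threads blocked on one latch, opens it with a single
    // countDown(), and returns how many of them got through.
    static int passThrough(int waiters) {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch passed = new CountDownLatch(waiters);
        for (int i = 0; i < waiters; i++) {
            new Thread(() -> {
                try {
                    gate.await();       // shared acquire: queues in AQS until the count is 0
                    passed.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        gate.countDown();               // one shared release for all waiters
        try {
            passed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waiters - (int) passed.getCount();
    }
}
```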
```java
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip predecessors that are already cancelled
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // Save pred's successor for the CAS operations below.
    // Because cancelled nodes may exist, pred.next is not necessarily node
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // Set the node's status to CANCELLED
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
```
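Cancellation is easy to trigger from the public API. A timed `tryLock` that expires while the thread is queued gives up its node. In JDK 8 this path ends in `cancelAcquire`. Newer JDKs reorganized this code, but the queued node is still cancelled. The sketch below (the class and method names are hypothetical) checks two things: the timed-out waiter did not get the lock, and the lock remains usable afterwards.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class CancelDemo {
    // Returns true if a waiter that timed out while queued did not get the lock
    // and the lock can still be acquired afterwards.
    static boolean lockUsableAfterTimeout() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the calling thread holds the lock, so the waiter must queue
        boolean[] waiterGotLock = {true};
        Thread waiter = new Thread(() -> {
            try {
                // Enqueues, parks, and gives up after 50 ms; AQS then cancels its node.
                waiterGotLock[0] = lock.tryLock(50, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            waiter.join(); // join also makes waiterGotLock[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        boolean reacquired = lock.tryLock();
        if (reacquired) {
            lock.unlock();
        }
        return !waiterGotLock[0] && reacquired;
    }
}
```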