AQS
、AbstractQueuedSynchronizer,即队列同步器。他是构建其他同步组件的基础框架,如ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch等。
概述
AQS使用一个int类型的成员变量state
来表示同步状态,当state>0表示获得了锁,当state=0表示释放了锁。内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取
同步状态(锁)失败时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入到同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其
再次尝试获取同步状态。
AQS提供了如下一些主要方法:
1 | //返回同步状态 |
CLH同步队列
上面提到AQS内部维护了一个FIFO队列,该队列就是CLH同步队列。CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程
以及等待状态等信息构造成一个节点(Node)并将其加入到CLH队列,同时会阻塞当前线程,当同步状态释放时会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(pred)、后继节点(next),其定义如下:
1 | static final class Node { |
其中SHARE
和EXCLUSIVE
常量分别代表共享模式和独占模式
共享模式:允许多个获取同步状态,如Semaphore
独占模式:同一时间只能有一个线程获取同步状态,多余的线程则需要排队等待 如ReentrantLock
变量waitStatus
表示当前构造的Node节点的等待状态,共有4中:SIGNAL、CANCELLED、CONDITION、PROPAGATE:
- CANCELLED:值为1,在同步队列等待的线程等待超时或被中断,需要从同步队列中取消该Node的节点,其节点的waitStatus为CANCELLED,即结束状态,进入该状态后的节点将不会再变化
- SIGNAL:值为-1,被标识为该等待唤醒状态的后继节点,当其前继节点的线程释放了同步锁或被取消,将会通知该后继节点的线程执行。(即:处于唤醒状态,只要前继节点释放锁,就会通知标识为signal状态的后继节点线程执行)
- CONDITION:值为-2,与Condition相关,该标识的节点处于条件队列中,节点的线程等待在Condition上,其他线程调用了Condition的signal()方法后,CONDITION状态节点从条件队列转移到同步队列中,等待获取同步锁
- PROPAGATE:值为-3,与共享模式相关,在共享模式中,该标识节点的线程处于可运行状态。
- 0状态:值为0,代表初识化状态
入队
CLH入队操作即tail指向新节点、新节点的prev指向当前最后节点,当前最后节点next指向新节点
1 | private Node addWaiter(Node mode) { |
出队
CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点
(head执行该节点并断开原首节点的next和当前节点的prev即可,此过程不需要CAS来保证,因为只有一个线程能够成功获取到同步状态)
同步状态的获取与释放
AQS的设计模式采用了模板方法模式,子类通过继承并重写特定方法来管理同步状态。其具有两种模式:独占模式和共享模式
独占模式
独占模式同一时刻只有一个线程持有同步状态
1 | public final void acquire(int arg) { |
tryAcquire(arg)
尝试获取同步状态 获取成功返回true否则返回false 此方法由自定义同步组件实现,必须保证线程安全获取同步状态addWaiter(Node.EXCLUSIVE)
tryAcquire获取失败,则调用此方法将当前线程放入CLH同步队列尾部acquireQueued()
当前线程根据公平性原则来进行阻塞等待(自旋),直到获取锁为止 并且返回当前线程在等待过程中有没有中断过selfInterrupt
产生一个中断
即每个线程如果获取同步状态失败,加入到CLH同步队列后就会进入acquireQueued()
自旋,当条件满足获取同步状态成功从自旋中退出,否则一直执行下去。
1 | final boolean acquireQueued(final Node node, int arg) { |
独占模式获取响应中断
AQS提供了acquireInterruptibly(int arg)
方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException
1 | public final void acquireInterruptibly(int arg) |
与acquire(int arg)
方法区别:
- 方法声明抛出了InterruptedException异常
- 在中断方法处不再使用Interrupted标识,而是直接抛出InterruptedException异常
独占模式超时获取
AQS还提供了一个增强版本的方法tryAcquireNanos(int arg,long nanos)
。该方法是acquireInterruptibly
方法的进一步增强,它除了响应中断外,还有超时控制。
即如果线程没有在指定时间内获取同步状态,则会返回false 否则返回true
1 | public final boolean tryAcquireNanos(int arg, long nanosTimeout) |
独占模式同步状态释放
当线程后期同步状态后,执行完响应逻辑就需要释放同步状态。AQS提供了release(int arg)
方法释放同步状态1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
首先调用tryRelease(int arg)
方法,若调用成功则通过unparkSuccessor(Node)
唤醒后继节点
小结
独占模式下AQS维护这一个FIFO同步队列,线程获取同步状态失败时进入同步队列等待,同时进入自旋 不断检查前继节点是否是头结点,如果是,则尝试获取同步状态。
若获取成功则退出同步队列,当线程执行完逻辑后,释放会唤醒其后继节点。
共享模式
共享模式和独占模式最大的区别是:同一时刻独占模式只能有一个线程获取同步状态,而共享模式在同一时刻可以有多个线程获取同步状态。
共享模式同步状态获取
AQS提供了acquireShared(int arg)
方法共享获取同步状态:1
2
3
4public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
方法通过调用tryAcquireShared(int arg)尝试获取同步状态,若获取失败则调用doAcquireShared(int arg)自旋方式获取同步状态,共享模式获取同步状态的标识是
返回>=0 的值表示获取成功。
1 | private void doAcquireShared(int arg) { |
如果节点的前继节点是头结点则通过tryAcquireShared(int arg)
尝试获取同步状态,获取成功则退出自旋acquireShared(int arg)
不响应中断,与独占模式类似,也提供了响应中断、超时的方法:acquireSharedInterruptibly(int arg)
、tryAcquireSharedNanos(int arg,long nanos)
。
共享模式同步状态释放
获取同步状态后,共享模式通过调用releaseShared(int arg)
释放同步状态:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
因为共享模式可能会存在多个线程同时进行释放同步状态资源,需要确保同步状态安全地成功释放,故利用CAS和自旋来保证。