并发笔记之AQS原理分析

AQS、AbstractQueuedSynchronizer,即队列同步器。他是构建其他同步组件的基础框架,如ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch等。

概述

AQS使用一个int类型的成员变量state来表示同步状态,当state>0表示获得了锁,当state=0表示释放了锁。内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取
同步状态(锁)失败时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入到同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其
再次尝试获取同步状态。
AQS提供了如下一些主要方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//返回同步状态
protected final int getState()
//设置当前同步状态
protected final void setState(int newState)
//使用CAS设置当前状态 该方法能够保证状态设置的原子性
protected final boolean compareAndSetState(int expect, int update)
//独占式获取同步状态 获取同步状态成功后 其他线程需要等待改线程释放同步状态才能获取同步状态
protected boolean tryAcquire(int arg)
//独占式释放同步状态
protected boolean tryRelease(int arg)
//共享式获取同步状态 返回值大于等于0表示获取成功 否则获取失败
protected int tryAcquireShared(int arg)
//共享式释放同步状态
protected boolean tryReleaseShared(int arg)
//当前同步器是否在独占式模式下被线程占用 一般该方法表示是否被当前线程所独占
protected boolean isHeldExclusively()
//独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法
public final void acquire(int arg)
//与acquire相同,但该方法响应中断,当前线程为获取到同步状态而进入到同步队列,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回
public final void acquireInterruptibly(int arg) throws InterruptedException
//超时获取同步状态,如果当前线程再nanos时间内没有获取到同步状态,那么返回false 已经获取则返回true
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
//共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态
public final void acquireShared(int arg)
//共享式获取同步状态,响应中断
public final void acquireSharedInterruptibly(int arg) throws InterruptedException
//共享式获取同步状态,增加超时限制
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException
//共享式释放同步状态
public final boolean releaseShared(int arg)
//独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
public final boolean release(int arg)

CLH同步队列

上面提到AQS内部维护了一个FIFO队列,该队列就是CLH同步队列。CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程
以及等待状态等信息构造成一个节点(Node)并将其加入到CLH队列,同时会阻塞当前线程,当同步状态释放时会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(pred)、后继节点(next),其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
static final class Node {
//表示节点正在以共享模式等待
static final Node SHARED = new Node();
//表示节点正在以独占模式等待
static final Node EXCLUSIVE = null;
//表示节点被取消(超时或中断节点会被设置为取消状态,被取消的节点不会参与到竞争中,会一直保持取消状态不会转变为其他状态)
static final int CANCELLED = 1;
//表示后继节点的线程需要被唤醒(后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行)
static final int SIGNAL = -1;
//表示当前节点在条件队列中,节点线程等待在Condition上(当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入同步状态的获取中)
static final int CONDITION = -2;
/**
* 表示下一次共享模式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;

/**
* 对于普通同步节点waitStatus默认为0,条件等待节点为CONDITION,waitStatus通过CAS进行更新
*/
volatile int waitStatus;

/**
* 前驱节点
*/
volatile Node prev;

/**
* 后继节点
*/
volatile Node next;

/**
* 获取同步状态的线程 通过构造器初始化 使用完毕
*/
volatile Thread thread;

/**
* 指向条件队里的下一个节点
*/
Node nextWaiter;

/**
*判断是否为共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* 获取前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

其中SHAREEXCLUSIVE常量分别代表共享模式和独占模式
共享模式:允许多个获取同步状态,如Semaphore
独占模式:同一时间只能有一个线程获取同步状态,多余的线程则需要排队等待 如ReentrantLock
变量waitStatus表示当前构造的Node节点的等待状态,共有4中:SIGNAL、CANCELLED、CONDITION、PROPAGATE:

  • CANCELLED:值为1,在同步队列等待的线程等待超时或被中断,需要从同步队列中取消该Node的节点,其节点的waitStatus为CANCELLED,即结束状态,进入该状态后的节点将不会再变化
  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继节点,当其前继节点的线程释放了同步锁或被取消,将会通知该后继节点的线程执行。(即:处于唤醒状态,只要前继节点释放锁,就会通知标识为signal状态的后继节点线程执行)
  • CONDITION:值为-2,与Condition相关,该标识的节点处于条件队列中,节点的线程等待在Condition上,其他线程调用了Condition的signal()方法后,CONDITION状态节点从条件队列转移到同步队列中,等待获取同步锁
  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该标识节点的线程处于可运行状态。
  • 0状态:值为0,代表初识化状态

入队

CLH入队操作即tail指向新节点、新节点的prev指向当前最后节点,当前最后节点next指向新节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private Node addWaiter(Node mode) {
//构造节点
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试添加尾结点 入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//若快速设置尾结点失败 则执行enq操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

出队

CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点
(head执行该节点并断开原首节点的next和当前节点的prev即可,此过程不需要CAS来保证,因为只有一个线程能够成功获取到同步状态)

同步状态的获取与释放

AQS的设计模式采用了模板方法模式,子类通过继承并重写特定方法来管理同步状态。其具有两种模式:独占模式和共享模式

独占模式

独占模式同一时刻只有一个线程持有同步状态

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  • tryAcquire(arg)尝试获取同步状态 获取成功返回true否则返回false 此方法由自定义同步组件实现,必须保证线程安全获取同步状态
  • addWaiter(Node.EXCLUSIVE)tryAcquire获取失败,则调用此方法将当前线程放入CLH同步队列尾部
  • acquireQueued() 当前线程根据公平性原则来进行阻塞等待(自旋),直到获取锁为止 并且返回当前线程在等待过程中有没有中断过
  • selfInterrupt 产生一个中断

即每个线程如果获取同步状态失败,加入到CLH同步队列后就会进入acquireQueued()自旋,当条件满足获取同步状态成功从自旋中退出,否则一直执行下去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中断标志
boolean interrupted = false;
for (;;) {//自旋过程
final Node p = node.predecessor();
//如果当前节点的前继节点是头结点并且获取同步状态成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//前驱节点状态为CANCELLED被中断或被取消,需要从同步队列中移除
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {//前驱节点状态为CONDITION、PROPAGATE
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

独占模式获取响应中断

AQS提供了acquireInterruptibly(int arg)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

acquire(int arg)方法区别:

  1. 方法声明抛出了InterruptedException异常
  2. 在中断方法处不再使用Interrupted标识,而是直接抛出InterruptedException异常

独占模式超时获取

AQS还提供了一个增强版本的方法tryAcquireNanos(int arg,long nanos)。该方法是acquireInterruptibly方法的进一步增强,它除了响应中断外,还有超时控制。
即如果线程没有在指定时间内获取同步状态,则会返回false 否则返回true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)//已经超时返回false
return false;
//若未超时则等待nanosTimeout纳秒
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())//线程是否已经中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

独占模式同步状态释放

当线程后期同步状态后,执行完响应逻辑就需要释放同步状态。AQS提供了release(int arg)方法释放同步状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

首先调用tryRelease(int arg)方法,若调用成功则通过unparkSuccessor(Node)唤醒后继节点

小结

独占模式下AQS维护这一个FIFO同步队列,线程获取同步状态失败时进入同步队列等待,同时进入自旋 不断检查前继节点是否是头结点,如果是,则尝试获取同步状态。
若获取成功则退出同步队列,当线程执行完逻辑后,释放会唤醒其后继节点。

共享模式

共享模式和独占模式最大的区别是:同一时刻独占模式只能有一个线程获取同步状态,而共享模式在同一时刻可以有多个线程获取同步状态。

共享模式同步状态获取

AQS提供了acquireShared(int arg)方法共享获取同步状态:

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

方法通过调用tryAcquireShared(int arg)尝试获取同步状态,若获取失败则调用doAcquireShared(int arg)自旋方式获取同步状态,共享模式获取同步状态的标识是
返回>=0 的值表示获取成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doAcquireShared(int arg) {
//构建共享模式的节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

如果节点的前继节点是头结点则通过tryAcquireShared(int arg)尝试获取同步状态,获取成功则退出自旋
acquireShared(int arg)不响应中断,与独占模式类似,也提供了响应中断、超时的方法:acquireSharedInterruptibly(int arg)tryAcquireSharedNanos(int arg,long nanos)

共享模式同步状态释放

获取同步状态后,共享模式通过调用releaseShared(int arg)释放同步状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

因为共享模式可能会存在多个线程同时进行释放同步状态资源,需要确保同步状态安全地成功释放,故利用CAS和自旋来保证。

坚持原创技术分享,您的支持将鼓励我继续创作!