并发笔记之重入锁ReentrantLock分析

ReentrantLock可重入锁,是一种递归无阻塞的同步机制,可等同于synchronized但比synchronized更加强大、灵活。

ReentrantLock提供了公平锁和非公平锁,构造方法接受一个可选参数(默认为非公平锁),非公平锁往往会比公平锁效率更高

  • 公平锁:加锁前会检查同步队列中是否有等待线程,优先排队等待的线程,先来先执行
  • 非公平锁:加锁时不考虑排队问题,直接尝试获取锁,获取不到就自动追加到同步队列尾部

非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}


final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

公平锁模式在获取同步状态前先通过调用hasQueuedPredecessors()方法判断同步队列中是否存在等待执行的线程,如果有则优先执行

1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

Condition

Lock提供了条件Condition,对线程的等待、唤醒操作更加灵活、详细。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public interface Condition {

/**
* 造成当前线程在接到信号或中断前一直处于等待状态
*/
void await() throws InterruptedException;

/**
* 造成当前线程在接到信号前一直处于等待状态
*/
void awaitUninterruptibly();

/**
* 造成当前线程等待直到收到信号、被中断或到达指定等待时间
* 返回值表示剩余时间,若nanosTimeout前唤醒则返回值=nanosTimeout-消耗时间
* 若返回值<=0则可以认定它已经超时
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;

/**
* 造成当前线程等待直到收到信号、被中断或到达指定等待时间
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;

/**
* 造成当前线程等待直到它收到信号、被中断或等待超时
* 若未到指定时间返回true 否则返回false
*/
boolean awaitUntil(Date deadline) throws InterruptedException;

/**
* 唤醒一个等待线程
*/
void signal();

/**
* 唤醒所有等待线程
*/
void signalAll();
}

Condition的实现

获取一个Condition必须要通过Lock的newCondition()方法。该方法定义在接口Lock中,返回值是绑定此Lock实例的新的Condition实例。Condition接口只有一个实现类
ConditionObject。

1
2
3
4
5
6
7
8
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件队列第一个节点 */
private transient Node firstWaiter;
/** 条件队列最后一个节点*/
private transient Node lastWaiter;
... ...
}

await等待

当调用await()方法,当前线程会进入等待状态,并将把当前线程构造成一个节点(Node)并加入到条件队列的尾部,同时释放锁 如下图:

Condition的条件队列相对CLH同步队列较简单,只需要将原尾节点的nextWaiter指向新节点 然后更新lastWaiter即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public final void await() throws InterruptedException {
if (Thread.interrupted())//当前线程被中断
throw new InterruptedException();
Node node = addConditionWaiter();//将当前线程加入到Condition等待队列
int savedState = fullyRelease(node);//释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { //如果节点不在同步队列里 则挂起当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//如果当前线程被中断 则退出
break;
}
//尝试获得锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 清理取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//将当前线程构造成Node 加入到Condition的等待队列中
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//清理非CONDITION状态的节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
//释放当前锁 如果失败则将当前节点状态更新为取消CANCELLED
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
//检查节点是否在同步队列上
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}

signal通知

调用Condition的signal()方法,将唤醒在Condition等待队列等待最长时间的节点(即条件队列的首节点),在唤醒节点前会将节点移到CLH同步队列中。

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively())//当前节点是否获得了锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

signal()首先判断当前线程是否获得了锁,然后唤醒条件队列的首节点
1
2
3
4
5
6
7
8
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

doSignal()主要做了两件事:

  1. 修改条件队列的首节点
  2. 调用transferForSignal()将首节点移到CLH同步队列
1
2
3
4
5
6
7
8
9
10
11
12
13
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);//将当前节点加入到CLH同步队列 成功则返回当前节点的前继节点
int ws = p.waitStatus;
//如果当前节点的前继节点取消 或更新waitStatus失败则直接唤醒当前线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

总结

一个线程获取到锁之后,通过调用Condition的await()方法,会将当前线程加入到条件队列,然后释放锁 之后通过isOnSyncQueue(Node node)方法不断自查节点是否已经在CLH同步队列中。如果是则尝试获取锁 否则一直挂起。
当线程调用signal()方法,程序首先检查当前线程是否获得了锁,然后通过doSignal()方法唤醒CLH同步队列的首节点。被唤醒的线程,将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

坚持原创技术分享,您的支持将鼓励我继续创作!