SynchronousQueue
实现了BlockingQueue
接口的阻塞队列,比较常见的使用场景就是Executors.newCachedThreadPool()
,下面有个简单的例子来看看其具体使用吧:
举个栗子
1 | SynchronousQueue<Integer> queue = new SynchronousQueue<>(); |
输出结果:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18thread1 put number :1
thread2 take number:1
thread1 put number :2
thread2 take number:2
thread1 put number :3
thread2 take number:3
thread1 put number :4
thread2 take number:4
thread1 put number :5
thread2 take number:5
thread1 put number :6
thread2 take number:6
thread1 put number :7
thread1 put number :8
thread2 take number:7
thread2 take number:8
thread1 put number :9
thread2 take number:9
可以看到put
和take
操作是成对出现,实际SynchronousQueue
队列中的put&take/offer&poll操作都是互斥等待关系,相当于生产者和消费者,生产者生产出数据无消费者消费则阻塞等待 反之亦然。
通过阅读其源码可以看出,SynchronousQueue
实际是通过CAS
来完成无锁的并发生产、消费。
1 | public SynchronousQueue(boolean fair) { |
双重数据结构
这里有一个新的概念,dual queue
和dual stack
称之为dual data structure
译为双重数据结构。
什么是双重数据结构?
放取数据使用同一个数据结构,其中节点具有两种模式:
- 数据节点
- 非数据节点
以双重队列为例:
放/取元素时先跟队列尾部节点对比,若尾结点跟当前节点是同一模式 则将节点放入尾部入队 如果是互补模式 则将元素匹配并出队列
Transferer
TransferQueue
和TransferStack
继承了Transferer
,Transferer
是SynchronousQueue
的内部类,它提供了一个transfer()
方法,该方法定义了转移数据的规范:1
2
3
4
5
6
7
8
9
10
11
12
13abstract static class Transferer<E> {
/**
* 执行put或take操作
*
* @param e 不为空时表示交给消费者消费, the item to be handed to a consumer;
* 为空时表示请求返回一个生产者提供的数据
* @param timed 是否超时
* @param nanos 超时纳秒数
* @return 如果非空表示数据被提供或收到数据
* 如果未空 可能由于超时或线程中断导致操作失败 可以通过Thread.interrupted判断具体是哪个原因
*/
abstract E transfer(E e, boolean timed, long nanos);
}
公平模式
存储数据结构
公平模式是使用双端队列来实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106/** Dual Queue */
static final class TransferQueue<E> extends Transferer<E> {
/** Node class for TransferQueue. */
static final class QNode {
volatile QNode next; // 队列中下一个节点
volatile Object item; // 节点中保存的数据
volatile Thread waiter; // 等待在该节点上的线程
final boolean isData;//判断当前节点是生产者产生还是消费者产生
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
/**
* 节点被取消执行tryCancel操作 会将节点的item值指向本身
*/
boolean isCancelled() {
return item == this;
}
/**
* 判断当前节点是否被丢弃 advanceHead操作会将next指向自身
*/
boolean isOffList() {
return next == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
transient volatile QNode cleanMe;
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
/**
* Tries to cas nt as new tail.
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
/**
* Tries to CAS cleanMe slot.
*/
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
可以看出TransferQueue
是通过QNode
封装线程作为节点的普通的双端队列,QNode
的next指向队列中的下一个节点,TransferQueue
分别有一个指向队列头部(head)和尾部(tail)的指针。TransferQueue
有三个重要的方法,transfer
、awaitFulfill
和clean
,其中transfer
是put/take/offer/poll方法的实际实现。
transfer
1 | public void put(E e) throws InterruptedException { |
transfer
方法主要完成两个工作:
- 当队列为空或者加入节点和尾结点具有相同模式,则将节点追加到尾部 等待匹配成功返回值或者被取消返回null
- 当前节点与头节点是互补模式,则通过CAS尝试完成匹配 唤醒节点并移除队列 返回匹配的值
awaitFulfill
1 | /** |
clean
1 | /** |
clean
方法的思路分两种情况:
- 若被取消的节点不是队列尾部节点 则直接通过修改其前驱节点的next指针指向其后继节点,将被取消的节点删除
- 若被取消的节点是队列的尾部节点 则用cleanMe指针指向其前驱节点,等待以后再删除(只要为了防止我们在删除尾部节点的同时有其他节点被追加到尾部节点)
当cleanMe==null时将前继节点pred设置为cleanMe为下次删除做准备
当cleanMe!=null时先删除上次需要删除的cleanMe.next然后将cleanMe置为null 然后再讲pred赋值给cleanMe
clean
方法源码因为并发的情况比较多 理解起来有些难度 可以参考戳我~
非公平模式
存储数据结构
1 | /** Dual stack */ |
TransferStack
是一个栈结构,使用SNode
来封装栈节点,包含一个用于指向栈顶的指针head,其内部与TransferQueue
基本相同,不同在于:TransferStack
在匹配到节点时不是将节点立即出栈而是将匹配节点入栈 然后同时
将匹配上的两个节点一起出栈
tranfer
tranfer
的基本算法为:
- 若栈为空或head节点与当前节点模式相同(均为REQUEST或DATA)则尝试将节点加入栈并等待匹配,匹配成功返回相应的值 若被取消则返回null
- 若栈中包含互补模式的节点,则尝试入栈一个包含FULFILLING的节点 并且匹配相应的处于等待中的栈中节点,匹配成功将成功匹配的两个节点都出栈并返回相应的值
- 若栈顶为一个模式包含FULFILLING的节点,则帮助其执行匹配和出栈操作,然后再循环执行自己的匹配操作。帮助其他线程操作和自身执行匹配操作代码基本一致,除了不返回匹配的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
//如果栈为空或者当前节点模式和head结点模式相同 将节点压入栈内 等待匹配
if (h == null || h.mode == mode) { // empty or same-mode
//超时
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())//节点被取消
casHead(h, h.next); // 清除被取消的节点
else
return null;
//非超时 实例化一个SNode节点 并通过CAS尝试替换头结点head(head->s->h)
} else if (casHead(h, s = snode(s, e, h, mode))) {
//自旋等待线程匹配
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
/**
* 运行到这里说明得到匹配被唤醒 从栈顶将匹配的两个节点一起出栈
*/
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill 模式不同 尝试匹配
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
//将节点入栈,且模式标记为正在匹配中
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone 说明栈中s之后无元素了 重新进入最外层循环
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
/**
* 将s设置为m的匹配节点 并更新栈顶为m.next 即将s和m同时出栈
*/
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
/**
* 如果其他线程正在匹配 则帮助其匹配
*/
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
/**
* Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters.
* Waiters block until they have been matched.
*
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
clean
当节点被取消则将节点从栈中移除1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36/**
* Unlinks s from the stack.
*/
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
/*
* At worst we may need to traverse entire stack to unlink
* s. If there are multiple concurrent calls to clean, we
* might not see s if another thread has already removed
* it. But we can stop when we see any node known to
* follow s. We use s.next unless it too is cancelled, in
* which case we try the node one past. We don't check any
* further because we don't want to doubly traverse just to
* find sentinel.
*/
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue
SynchronousQueue 源码分析 (基于Java 8)
[java1.8源码笔记]SynchronousQueue详解