SynchronousQueue

SynchronousQueue实现了BlockingQueue接口的阻塞队列,比较常见的使用场景就是Executors.newCachedThreadPool(),下面有个简单的例子来看看其具体使用吧:

举个栗子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
new Thread(() -> {
IntStream.range(1,10).forEach(val->{
try {
System.out.println("thread1 put number :" + val);
queue.put(val);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

}).start();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

new Thread(() -> {
try {
while (true)
System.out.println("thread2 take number:" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
thread1 put number :1
thread2 take number:1
thread1 put number :2
thread2 take number:2
thread1 put number :3
thread2 take number:3
thread1 put number :4
thread2 take number:4
thread1 put number :5
thread2 take number:5
thread1 put number :6
thread2 take number:6
thread1 put number :7
thread1 put number :8
thread2 take number:7
thread2 take number:8
thread1 put number :9
thread2 take number:9

可以看到puttake操作是成对出现,实际SynchronousQueue队列中的put&take/offer&poll操作都是互斥等待关系,相当于生产者和消费者,生产者生产出数据无消费者消费则阻塞等待 反之亦然。
通过阅读其源码可以看出,SynchronousQueue实际是通过CAS来完成无锁的并发生产、消费。

1
2
3
4
public SynchronousQueue(boolean fair) {
//fair为true表示公平模式 通过Dual Queue来实现 false表示非公平模式 通过Dual Stack来实现
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

双重数据结构

这里有一个新的概念,dual queuedual stack称之为dual data structure 译为双重数据结构。
什么是双重数据结构?
放取数据使用同一个数据结构,其中节点具有两种模式:

  1. 数据节点
  2. 非数据节点
    以双重队列为例:
    放/取元素时先跟队列尾部节点对比,若尾结点跟当前节点是同一模式 则将节点放入尾部入队 如果是互补模式 则将元素匹配并出队列

Transferer

TransferQueueTransferStack继承了TransfererTransfererSynchronousQueue的内部类,它提供了一个transfer()方法,该方法定义了转移数据的规范:

1
2
3
4
5
6
7
8
9
10
11
12
13
abstract static class Transferer<E> {
/**
* 执行put或take操作
*
* @param e 不为空时表示交给消费者消费, the item to be handed to a consumer;
* 为空时表示请求返回一个生产者提供的数据
* @param timed 是否超时
* @param nanos 超时纳秒数
* @return 如果非空表示数据被提供或收到数据
* 如果未空 可能由于超时或线程中断导致操作失败 可以通过Thread.interrupted判断具体是哪个原因
*/
abstract E transfer(E e, boolean timed, long nanos);
}

公平模式

存储数据结构

公平模式是使用双端队列来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/** Dual Queue */
static final class TransferQueue<E> extends Transferer<E> {

/** Node class for TransferQueue. */
static final class QNode {
volatile QNode next; // 队列中下一个节点
volatile Object item; // 节点中保存的数据
volatile Thread waiter; // 等待在该节点上的线程
final boolean isData;//判断当前节点是生产者产生还是消费者产生

QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}

boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

/**
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
/**
* 节点被取消执行tryCancel操作 会将节点的item值指向本身
*/
boolean isCancelled() {
return item == this;
}

/**
* 判断当前节点是否被丢弃 advanceHead操作会将next指向自身
*/
boolean isOffList() {
return next == this;
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
transient volatile QNode cleanMe;

TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}

/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}

/**
* Tries to cas nt as new tail.
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}

/**
* Tries to CAS cleanMe slot.
*/
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}

可以看出TransferQueue是通过QNode封装线程作为节点的普通的双端队列,QNode的next指向队列中的下一个节点,TransferQueue分别有一个指向队列头部(head)和尾部(tail)的指针。
TransferQueue有三个重要的方法,transferawaitFulfillclean,其中transfer是put/take/offer/poll方法的实际实现。

transfer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}

E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);

for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 并发操作 未完成初始化 则自旋
continue; // spin

if (h == t || t.isData == isData) { // 队列为空 或 E和队尾元素具有相同模式
QNode tn = t.next;
if (t != tail) // inconsistent read 非一致读 状态不一致表示其他线程修改了tail 自旋
continue;
if (tn != null) { // lagging tail 如果队尾后扔挂着元素表明其他线程添加了节点 将tail进行cas操作更新 尝试将tn设置为tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 调用超时 直接返回null
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // put的元素先加在tail的next指针上 失败则自旋
continue;

advanceTail(t, s); // swing tail and wait 通过CAS更新tail
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // x等于s 表明节点中断或超时 否则s==null或是匹配的节点
clean(t, s);//清理节点s
return null;
}

if (!s.isOffList()) { // 判断节点是否已经从队列中离开
advanceHead(t, s); // 尝试将s节点设置为head 移出t
if (x != null) // and forget fields
s.item = s;
s.waiter = null;//释放线程ref
}
return (x != null) ? (E)x : e;

} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)//其他线程修改了结构 不一致读 自旋重新开始
continue; // inconsistent read

/**
* 生产者和消费者匹配操作
*/
Object x = m.item;
if (isData == (x != null) || // 判断isData与x的模式是否相同 相同表示匹配了
x == m || // m节点被取消了
!m.casItem(x, e)) { // 尝试将数据e设置到m上失败
advanceHead(h, m); // 将m设置为头节点 h出队列 然后重试
continue;
}

advanceHead(h, m); // 成功匹配 m设置为头节点 h出队列
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}

transfer方法主要完成两个工作:

  1. 当队列为空或者加入节点和尾结点具有相同模式,则将节点追加到尾部 等待匹配成功返回值或者被取消返回null
  2. 当前节点与头节点是互补模式,则通过CAS尝试完成匹配 唤醒节点并移除队列 返回匹配的值
awaitFulfill
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 自旋或阻塞直到节点s被填充
* 如果s为head.next节点,则进行自旋
* 否则直接block 直到有其他线程与之匹配或其自己进行线程中断
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//计算自旋次数 前提s位于头结点的下一个节点 如果设置了等待时间 自旋次数为32 如果未设置时间 自旋次数为512
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())//线程被中断 则取消节点:将item字段指向自己
s.tryCancel(e);
Object x = s.item;
if (x != e)//跳出自旋 1、线程被中断 2、等待时间到了被取消 3、线程拿走数据并修改了item的值
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {//超过等待时间 取消节点(通过将自身item指向自己 if(x!=e)肯定成立)
s.tryCancel(e);
continue;
}
}
if (spins > 0)//自旋次数>0 自减一
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)//没有超时时间的park
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)//自旋次数耗尽 超时时间的park
LockSupport.parkNanos(this, nanos);
}
}
clean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 清理掉被取消的节点
*/
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
* first. At least one of node s or the node previously
* saved can always be deleted, so this always terminates.
*/
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
//如果hn不会空并且被取消 向前推进
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
//队列为空
if (t == h)
return;
QNode tn = t.next;
//读不一致 重新开始
if (t != tail)
continue;
//tn不为空表示其他线程新增了节点 cas更新tail 重新开始
if (tn != null) {
advanceTail(t, tn);
continue;
}
//s不是尾节点 移出队列
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
//如果s已经移出队列退出循环 否则尝试断开
if (sn == s || pred.casNext(s, sn))
return;
}
/**
* s为尾节点 则有可能其他线程在添加新节点 则cleanMe登场
**/
QNode dp = cleanMe;
//如果dp不为null 说明是前一个被取消的节点 将其移除
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
if (d == null || // 节点d已经删除
d == dp || // 原来的节点cleanMe已经通过advanceHead进行删除
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
//清除cleanMe节点 如果dp == pred成立,则说明节点s清除成功直接return 否则要再次循环
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))//先将要删除的尾部节点标记为cleanMe 延迟等待下次删除
return; // Postpone cleaning s
}
}

clean方法的思路分两种情况:

  1. 若被取消的节点不是队列尾部节点 则直接通过修改其前驱节点的next指针指向其后继节点,将被取消的节点删除
  2. 若被取消的节点是队列的尾部节点 则用cleanMe指针指向其前驱节点,等待以后再删除(只要为了防止我们在删除尾部节点的同时有其他节点被追加到尾部节点)

    当cleanMe==null时将前继节点pred设置为cleanMe为下次删除做准备
    当cleanMe!=null时先删除上次需要删除的cleanMe.next然后将cleanMe置为null 然后再讲pred赋值给cleanMe

clean方法源码因为并发的情况比较多 理解起来有些难度 可以参考戳我~

非公平模式

存储数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/** Dual stack */
static final class TransferStack<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual stack algorithm, differing,
* among other ways, by using "covering" nodes rather than
* bit-marked pointers: Fulfilling operations push on marker
* nodes (with FULFILLING bit set in mode) to reserve a spot
* to match a waiting node.
*/

/* Modes for SNodes, ORed together in node fields */
/** Node represents an unfulfilled consumer */
static final int REQUEST = 0;//消费者请求数据
/** Node represents an unfulfilled producer */
static final int DATA = 1;//生产者生产数据
/** Node is fulfilling another unfulfilled DATA or REQUEST */
static final int FULFILLING = 2;//正在匹配中

/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

/** Node class for TransferStacks. */
static final class SNode {
volatile SNode next; // 指向栈中下一个节点
volatile SNode match; // the node matched to this
volatile Thread waiter; // 等待在节点上的线程
Object item; // 节点上存储的数据 data; or null for REQUESTs
int mode;
// Note: item and mode fields don't need to be volatile
// since they are always written before, and read after,
// other volatile/atomic operations.

SNode(Object item) {
this.item = item;
}

boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

/**
* Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters.
* Waiters block until they have been matched.
*
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}

/**
* Tries to cancel a wait by matching node to itself.
*/
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}

boolean isCancelled() {
return match == this;
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

/** The head (top) of the stack */
volatile SNode head;//指向栈顶 并且没有初始化为dummy节点

boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}

/**
* Creates or resets fields of a node. Called only from transfer
* where the node to push on stack is lazily created and
* reused when possible to help reduce intervals between reads
* and CASes of head and to avoid surges of garbage when CASes
* to push nodes fail due to contention.
*/
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}

TransferStack是一个栈结构,使用SNode来封装栈节点,包含一个用于指向栈顶的指针head,其内部与TransferQueue基本相同,不同在于:TransferStack在匹配到节点时不是将节点立即出栈而是将匹配节点入栈 然后同时
将匹配上的两个节点一起出栈

tranfer

tranfer的基本算法为:

  1. 若栈为空或head节点与当前节点模式相同(均为REQUEST或DATA)则尝试将节点加入栈并等待匹配,匹配成功返回相应的值 若被取消则返回null
  2. 若栈中包含互补模式的节点,则尝试入栈一个包含FULFILLING的节点 并且匹配相应的处于等待中的栈中节点,匹配成功将成功匹配的两个节点都出栈并返回相应的值
  3. 若栈顶为一个模式包含FULFILLING的节点,则帮助其执行匹配和出栈操作,然后再循环执行自己的匹配操作。帮助其他线程操作和自身执行匹配操作代码基本一致,除了不返回匹配的值。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    E transfer(E e, boolean timed, long nanos) {

    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
    SNode h = head;
    //如果栈为空或者当前节点模式和head结点模式相同 将节点压入栈内 等待匹配
    if (h == null || h.mode == mode) { // empty or same-mode
    //超时
    if (timed && nanos <= 0) { // can't wait
    if (h != null && h.isCancelled())//节点被取消
    casHead(h, h.next); // 清除被取消的节点
    else
    return null;
    //非超时 实例化一个SNode节点 并通过CAS尝试替换头结点head(head->s->h)
    } else if (casHead(h, s = snode(s, e, h, mode))) {
    //自旋等待线程匹配
    SNode m = awaitFulfill(s, timed, nanos);
    if (m == s) { // wait was cancelled
    clean(s);
    return null;
    }
    /**
    * 运行到这里说明得到匹配被唤醒 从栈顶将匹配的两个节点一起出栈
    */
    if ((h = head) != null && h.next == s)
    casHead(h, s.next); // help s's fulfiller
    return (E) ((mode == REQUEST) ? m.item : s.item);
    }
    } else if (!isFulfilling(h.mode)) { // try to fulfill 模式不同 尝试匹配
    if (h.isCancelled()) // already cancelled
    casHead(h, h.next); // pop and retry
    //将节点入栈,且模式标记为正在匹配中
    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
    for (;;) { // loop until matched or waiters disappear
    SNode m = s.next; // m is s's match
    if (m == null) { // all waiters are gone 说明栈中s之后无元素了 重新进入最外层循环
    casHead(s, null); // pop fulfill node
    s = null; // use new node next time
    break; // restart main loop
    }
    /**
    * 将s设置为m的匹配节点 并更新栈顶为m.next 即将s和m同时出栈
    */
    SNode mn = m.next;
    if (m.tryMatch(s)) {
    casHead(s, mn); // pop both s and m
    return (E) ((mode == REQUEST) ? m.item : s.item);
    } else // lost match
    s.casNext(m, mn); // help unlink
    }
    }
    /**
    * 如果其他线程正在匹配 则帮助其匹配
    */
    } else { // help a fulfiller
    SNode m = h.next; // m is h's match
    if (m == null) // waiter is gone
    casHead(h, null); // pop fulfilling node
    else {
    SNode mn = m.next;
    if (m.tryMatch(h)) // help match
    casHead(h, mn); // pop both h and m
    else // lost match
    h.casNext(m, mn); // help unlink
    }
    }
    }
    }

    /**
    * Tries to match node s to this node, if so, waking up thread.
    * Fulfillers call tryMatch to identify their waiters.
    * Waiters block until they have been matched.
    *
    * @param s the node to match
    * @return true if successfully matched to s
    */
    boolean tryMatch(SNode s) {
    if (match == null &&
    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
    Thread w = waiter;
    if (w != null) { // waiters need at most one unpark
    waiter = null;
    LockSupport.unpark(w);
    }
    return true;
    }
    return match == s;
    }
clean

当节点被取消则将节点从栈中移除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Unlinks s from the stack.
*/
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread

/*
* At worst we may need to traverse entire stack to unlink
* s. If there are multiple concurrent calls to clean, we
* might not see s if another thread has already removed
* it. But we can stop when we see any node known to
* follow s. We use s.next unless it too is cancelled, in
* which case we try the node one past. We don't check any
* further because we don't want to doubly traverse just to
* find sentinel.
*/

SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;

// Absorb cancelled nodes at head
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);

// Unsplice embedded nodes
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue
SynchronousQueue 源码分析 (基于Java 8)
[java1.8源码笔记]SynchronousQueue详解

坚持原创技术分享,您的支持将鼓励我继续创作!