LinkedTransferQueue

LinkedTransferQueue是一个基于链表的无界阻塞队列,可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)和ConcurrentLinkedQueue三者的结合体:它综合了三者的特性,并提供了更加高效的实现方式。
其继承结构为:

TransferQueue

LinkedTransferQueue实现了TransferQueue接口,而TransferQueue接口继承自BlockingQueue,故LinkedTransferQueue也是一个阻塞队列。
TransferQueue接口在BlockingQueue的基础上定义了如下方法(LinkedTransferQueue对它们做了实现):

1
2
3
4
5
6
7
8
9
// Tries to hand the element over to a consumer without waiting; the boolean result
// reports whether the hand-over happened.
boolean tryTransfer(E e);
// Hands the element over to a consumer; declared to throw InterruptedException.
void transfer(E e) throws InterruptedException;
// Timed variant of tryTransfer: takes a timeout and its unit, and is declared to throw
// InterruptedException.
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// Reports whether there is a waiting consumer.
boolean hasWaitingConsumer();
// Returns the number of waiting consumers.
int getWaitingConsumerCount();

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Excerpt of the LinkedTransferQueue class showing its main fields: tuning constants,
 * the Node type, the head/tail references and the four mode constants passed to xfer.
 * The Node body and the remainder of the class are elided in this excerpt.
 */
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -3223113410248163686L;
/** True if on multiprocessor */
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
/** 1 << 7 == 128. NOTE(review): presumably the spin budget for a waiter at the front of the queue; its consumer is not shown here. */
private static final int FRONT_SPINS = 1 << 7;
/** Half of FRONT_SPINS (64); awaitMatch also uses it as the bound of its random yield check. */
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/** NOTE(review): presumably the sweepVotes limit that triggers a cleanup sweep; its use is not shown here. */
static final int SWEEP_THRESHOLD = 32;
/** Queue node: carries either a data item or a request, plus the thread waiting on it. */
static final class Node {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter; // null until waiting
... ...
}
/** head of the queue; null until first enqueue */
transient volatile Node head;

/** tail of the queue; null until first append */
private transient volatile Node tail;

/** The number of apparent failures to unsplice removed nodes */
private transient volatile int sweepVotes;

/*
* The modes in which elements can be put or taken (the "how" argument of xfer).
*/
// Return immediately; used by the untimed poll() and tryTransfer() methods.
private static final int NOW = 0; // for untimed poll, tryTransfer
// Asynchronous, never blocks; used when inserting elements. The data lives in an
// unbounded singly linked list, so inserting never has to wait.
private static final int ASYNC = 1; // for offer, put, add
// Synchronous; if there is no match at call time, blocks until a match occurs.
private static final int SYNC = 2; // for transfer, take
// Timed; used by the poll() and tryTransfer() variants that take a timeout.
private static final int TIMED = 3; // for timed poll, tryTransfer

入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Inserts the element in ASYNC mode: delegates to xfer with haveData == true and
 * returns without waiting for a consumer. The value returned by xfer is ignored.
 *
 * @param e the element to insert
 * @throws NullPointerException if e is null (checked inside xfer)
 */
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
/**
 * Inserts the element in ASYNC mode via xfer; the insertion does not wait for a consumer.
 *
 * @param e the element to insert
 * @return always true
 * @throws NullPointerException if e is null (checked inside xfer)
 */
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
/**
 * Timed-offer signature that behaves exactly like the untimed one: the element is
 * inserted in ASYNC mode, and neither timeout nor unit is read.
 *
 * @param e the element to insert
 * @param timeout unused
 * @param unit unused
 * @return always true
 * @throws NullPointerException if e is null (checked inside xfer)
 */
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
/**
 * Inserts the element in ASYNC mode via xfer; the insertion does not wait for a consumer.
 *
 * @param e the element to insert
 * @return always true
 * @throws NullPointerException if e is null (checked inside xfer)
 */
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}

出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Retrieves and removes an element through poll(), turning an empty result into an
 * exception instead of a null return.
 *
 * @return the element obtained from poll()
 * @throws NoSuchElementException if poll() returned null
 */
public E remove() {
    final E first = poll();
    if (first == null) {
        throw new NoSuchElementException();
    }
    return first;
}
/**
 * Takes an element in SYNC mode, waiting inside xfer until one is matched.
 *
 * @return the matched element
 * @throws InterruptedException if xfer came back with null, which in SYNC mode means the
 *         wait was cancelled by an interrupt; the interrupt flag is cleared before throwing
 */
public E take() throws InterruptedException {
    final E matched = xfer(null, false, SYNC, 0);
    if (matched == null) {
        // Clear the pending interrupt status, then report it as an exception.
        Thread.interrupted();
        throw new InterruptedException();
    }
    return matched;
}

/**
 * Takes an element in TIMED mode, waiting at most the given time.
 *
 * @param timeout how long to wait, expressed in {@code unit}
 * @param unit the unit of {@code timeout}
 * @return the matched element, or null if none was matched and the thread was not interrupted
 * @throws InterruptedException if nothing was matched and the thread's interrupt flag was
 *         set (the flag is cleared by the check)
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final long waitNanos = unit.toNanos(timeout);
    final E matched = xfer(null, false, TIMED, waitNanos);
    // The interrupt flag is consulted (and cleared) only when nothing was matched.
    if (matched == null && Thread.interrupted()) {
        throw new InterruptedException();
    }
    return matched;
}

/**
 * Takes an element in NOW mode: xfer matches an already-queued node if it can and
 * otherwise returns straight away without appending or waiting.
 *
 * @return the matched element, or null when xfer found nothing to match
 */
public E poll() {
return xfer(null, false, NOW, 0);
}

移交元素的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Tries to hand the element to a waiting consumer in NOW mode, without enqueuing it
 * and without waiting.
 *
 * @param e the element to hand over
 * @return true if xfer returned null (the element was matched); false if xfer handed
 *         the element back unmatched
 * @throws NullPointerException if e is null (checked inside xfer)
 */
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
/**
 * Hands the element to a consumer in SYNC mode, waiting inside xfer until it is matched.
 *
 * @param e the element to hand over
 * @throws InterruptedException if xfer handed the element back, which in SYNC mode can
 *         only happen because of an interrupt; the interrupt flag is cleared before throwing
 * @throws NullPointerException if e is null (checked inside xfer)
 */
public void transfer(E e) throws InterruptedException {
    final E unmatched = xfer(e, true, SYNC, 0);
    if (unmatched == null) {
        return; // handed over successfully
    }
    // A non-null result means the wait was cancelled; clear the flag and report it.
    Thread.interrupted();
    throw new InterruptedException();
}
/**
 * Tries to hand the element to a consumer in TIMED mode, waiting at most the given time.
 *
 * @param e the element to hand over
 * @param timeout how long to wait, expressed in {@code unit}
 * @param unit the unit of {@code timeout}
 * @return true if the element was matched; false if it was not and the thread was not
 *         interrupted
 * @throws InterruptedException if the element was not matched and the thread's interrupt
 *         flag was set (the flag is cleared by the check)
 * @throws NullPointerException if e is null (checked inside xfer)
 */
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    final long waitNanos = unit.toNanos(timeout);
    final E unmatched = xfer(e, true, TIMED, waitNanos);
    if (unmatched != null) {
        // Not handed over: tell an interrupt apart from a plain timeout.
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        return false;
    }
    return true;
}

xfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
 * Common implementation behind the enqueue, dequeue and transfer methods.
 *
 * Phase 1 walks the list from head looking for the first unmatched node. If that node is
 * of the opposite kind (data vs. request), it is matched by CASing its item to e, head is
 * advanced past matched nodes, the node's waiter is unparked and the node's previous item
 * is returned. Phase 2 runs only when nothing was matched and how != NOW: a node for this
 * call is appended with tryAppend, and for SYNC/TIMED the caller then waits in awaitMatch.
 *
 * @param e the element to hand over; must be non-null when haveData is true
 * @param haveData true for put-style calls, false for take-style calls
 * @param how one of NOW, ASYNC, SYNC or TIMED
 * @param nanos timeout in nanoseconds, forwarded to awaitMatch (used when how == TIMED)
 * @return the matched node's previous item if a match was made here, the result of
 *         awaitMatch for SYNC/TIMED, otherwise e
 * @throws NullPointerException if haveData is true and e is null
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed

retry:
for (;;) { // restart on append race

for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
// A node is unmatched when it is not self-linked through item (item != p) and its
// item's nullness still agrees with its isData flag.
if (item != p && (item != null) == isData) { // unmatched
if (isData == haveData) // can't match
break;
if (p.casItem(item, e)) { // match
// Matched a node other than h: try to move head forward past matched nodes.
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// Wake the thread recorded on the matched node, then hand back the node's old item.
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}

if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
// ASYNC callers stop here and fall through to "return e"; SYNC and TIMED wait.
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}

/**
 * Tries to link node s at the end of the list, starting the search from tail.
 *
 * @param s the node to append
 * @param haveData passed to cannotPrecede to decide whether s may follow the node reached
 * @return null if cannotPrecede rejected the append (xfer then restarts); s itself if
 *         the list was empty and s was installed as head; otherwise the node that s was
 *         linked after
 */
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// Both tail and head are null: the list is empty, so try to install s as head.
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
}
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
// s is linked after p. If p was not the tail snapshot, try to bring tail up to date;
// the loop reuses s as a scratch variable, which is safe because p is what gets returned.
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}

/**
 * Spins, occasionally yields, and finally parks until node s is matched or the wait is
 * cancelled by an interrupt or a timeout.
 *
 * The interrupt flag is only read here (isInterrupted), never cleared; the public callers
 * clear it with Thread.interrupted() when they turn the cancellation into an exception.
 *
 * @param s the waiting node
 * @param pred the value returned by tryAppend for s; used for the spin estimate and for unsplice
 * @param e the value s.item was created with; s.item still equal to e means "not matched yet"
 * @param timed true if nanos should be honoured as a timeout
 * @param nanos how long to wait in nanoseconds; used only when timed is true
 * @return the matched item if s was matched, otherwise e after a cancellation
 */
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed

for (;;) {
Object item = s.item;
if (item != e) { // matched
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
// Cancel by CASing the item to the node itself, so that no other thread can match it.
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}

if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
else if (timed) {
// Recompute the remaining time; when it is used up, the next iteration cancels.
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
LockSupport.park(this);
}
}
}
坚持原创技术分享,您的支持将鼓励我继续创作!