ConcurrentLinkedQueue

ConcurrentLinkedQueue是基于链表的无界队列,队列中数据遵循FIFO,不允许使用null值。从ConcurrentLinkedQueue的类继承关系如下图:

可看出其实现了Queue接口但没有实现BlockingQueueConcurrentLinkedQueue不是阻塞队列,不能用于线程池中。

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private static final long serialVersionUID = 196745693267521676L;
//典型的单链表结构
private static class Node<E> {
volatile E item;
volatile Node<E> next;

/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
... ...
}
//队列中第一个存活的节点
private transient volatile Node<E> head;
//队列中最后一个节点
private transient volatile Node<E> tail;

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}

入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean add(E e) {
return offer(e);
}

public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {//说明已经是链表的尾部 直接入队 更新其next指向
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else//t并非tail 则更新p的值
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
//如果节点值不为空 则通过CAS更新为null
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {//
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}

总结

ConcurrentLinkedQueue使用CAS + spin更新头尾节点来完成出队、入队操作。

特性 ConcurrentLinkedQueue LinkedBlockingQueue
线程安全
取元素 队列为空获取元素直接返回null poll()方法可以实现此功能
无锁 通过CAS+Spin实现 使用重入锁
效率 较低
阻塞 非阻塞 阻塞
是否可在线程池使用
坚持原创技术分享,您的支持将鼓励我继续创作!