【原创】Java并发编程系列29 | ConcurrentLinkedQueue
J.U.C 为常用的集合提供了并发安全的版本,前面讲解了 map 的并发安全集合 ConcurrentHashMap,List 并发安全集合 CopyOnWriteArrayList,Set 并发安全集合 CopyOnWriteArraySet,本篇文章就来介绍并发安全的队列 ConcurrentLinkedQueue。
ConcurrentLinkedQueue 是一个基于链接节点的无边界的线程安全队列,采用非阻塞算法实现线程安全。分以下部分讲解:
- 类结构
- offer()
- poll()
- 如何保证并发安全性
- 总结
1. 类结构
队列由单向链表实现,ConcurrentLinkedQueue 持有头尾指针(head/tail 属性)来管理队列。
队列进行出队入队时对节点的操作都是通过 CAS 实现,保证线程安全。
public class ConcurrentLinkedQueue<E> {
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
/**
* 节点类
* CAS保证节点操作安全
*/
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
// 获得item 和 next 的偏移量
UNSAFE.putObject(this, itemOffset, item);
}
/**
* 更改Node中的数据域item
*/
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* 更改Node中的指针域next
*/
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
/**
* 更改Node中的指针域next
*/
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
}
/**入队*/
public boolean offer(E e)
/**出队*/
public E poll()
}
2. 入队 offer()
- 将入队节点设置成当前队列尾节点的下一个节点。
- 更新 tail 节点,tail 节点不总是尾节点。如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则只入队不更新尾节点。
看下源码:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {// 注意这里是个循环
Node<E> q = p.next;
if (q == null) {//(1)
/*
* q == null 表示p是最后一个节点,设置t.next=newNode
* 如果失败,说明表示其他线程已经修改了p的指向,循环尝试直到成功
*/
if (p.casNext(null, newNode)) {// 2)
// node 加入节点后会导致tail距离最后一个节点相差大于一个,需要更新tail
if (p != t)//(3)
casTail(t, newNode);//(4) casTail:设置tail 尾节点
return true;
}
}
else if (p == q)//(5)
/*
* 多线程环境下,offer(e)和poll()同时执行,
* 此时p节点被poll(),设置了p.next=p,所以此时q=p.next=p
* 这里要重新设置p结点
*/
p = (t != (t = tail)) ? t : head;//(6)
else
// tail并没有指向尾节点,重新设置p
p = (p != t && t != (t = tail)) ? t : q;//(7)
}
}
单看代码很难理解 offer()过程,不用担心,我们一起从头复现一下 offer()过程:
- 初始时,head、tail 都指向同一个 item 为 null 的节点
- offer(A):执行步骤(1) (2),将 A 加入队列,但不更新 tail。
- offer(B):第一次循环执行(7),设置 p=A 结点。第二次循环执行(1)(2)(3),插入 B 并将 taill 更新为 B。
- offer(C):执行步骤(1) (2),将 C 加入队列,但不更新 tail。
- 循环....
3. 出队 poll()
- 从队列里返回一个节点元素,并清空该节点对元素的引用
- 更新 head,并不是每次出队时都更新 head 节点。当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点;只有当 head 节点里没有元素时,出队操作才会更新 head 节点。
public E poll() {
restartFromHead:// 如果出现p被删除的情况需要从head重新开始
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// item不为null,将item返回并设置为null
if (item != null && p.casItem(item, null)) {//(1)
if (p != h)//(2)
/*
* p != head 则更新head
* p.next != null,则将head更新为p.next ,否则更新为p
*/
updateHead(h, ((q = p.next) != null) ? q : p);//(3)
return item;
}
// p.next == null 队列为空
else if ((q = p.next) == null)//(4)
updateHead(h, p);
return null;
}
// 另一个线程已经把p从队列中删除并设置p.next = p,p已经被移除不能继续,需要重新开始
else if (p == q)//(5)
continue restartFromHead;
else
p = q;//(6)
}
}
}
/**
* 更新head
*/
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
同样复现一下 poll()过程,接着上文 offer()之后的数据:
- 第一次 poll():第一次循环,执行(6)设置 p=A;第二次循环执行(1)(2)(3),将 A 返回,设置 A.item=null,更新 head=原 A 节点。
- 第二次 poll():第一循环,执行(6)设置 p=B;第二次循环执行(1)(2)(3),将 A 返回,设置 B.item=null,更新 head=原 B 节点。
- 第三次 poll():第一循环,执行(6)设置 p=C;第二次循环执行(1)(2)(3),将 A 返回,设置 B.item=null,更新 head=原 C 节点。
4. 总结
4.1 如何保证并发安全性
需要保证线程安全的三种情况:
- 多个线程同时 offer():
多个线程同时执行到 casNext()设置最后的节点,casNext()通过 CAS 实现,第一个线程执行成功设置了最后一个节点后,其他线程的在 CAS 时发现期望的最后节点和实际上的最后节点不一致,CAS 就会失败,然后继续循环尝试直到成功。
- 多个线程同时 poll():
同样是通过 CAS 保证线程安全,多个线程同时执行到 casItem()设置当前节点 item=null,第一个线程执行成功设置了当前节点 item=null 后,其他线程的在 CAS 时发现期望的 item 与实际的 item 不一致,CAS 就会失败,然后继续循环尝试 poll 下一个节点直到成功。
- 队列中只有一个元素时,线程 Aoffer()一个线程 Bpoll():
线程 A 要设置 p.next=newNode,但是此时 poll()将 p 删除了。当 poll()将 p 删除时设置了 p.next=p,offer()方法中会检查这种情况,发现有 p.next=p 就重新设置一个合适的 p 节点,以便将 newNode 入队。
4.2 head/tail 为何延迟更新
tail 更新时机:tail 节点不总是尾节点。如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则只入队不更新尾节点。head 更新时机:并不是每次出队时都更新 head 节点。当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点;只有当 head 节点里没有元素时,出队操作才会更新 head 节点。
head 和 tail 的更新总是间隔了一个。如果让 tail 永远作为队列的队尾节点,代码不是逻辑简单容易实现吗?
如果让 tail 永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行 CAS 进行 tail 的更新,汇总起来对性能也会是大大的损耗。如果能减少 CAS 更新的操作,无疑可以大大提升入队的操作效率,所以 doug lea 大师每间隔 1 次(tail 和队尾节点的距离为 1)进行才利用 CAS 更新 tail。 对 head 的更新也是同样的道理。
- 分页解决方案 之 QuickPager的使用方法(URL分页、自动获取数据)
- 分页解决方案 之 QuickPager的使用方法(PostBack分页、自定义获取数据)
- QuickPager asp.net 分页控件、表单控件等自定义控件下载 和介绍 【2009.09.07更新】
- 分页解决方案 之 QuickPager的使用方法(PostBack分页、自动获取数据)
- 【自然框架】之鼠标点功能现(二):表单控件的“应用”—— 代码?只写需要的!
- 基于Docker环境中源码部署容器Nginx
- 使用Ansible playbooks快速构建etcd集群
- 使用系统内置script和scriptreplay命令来记录操作记录
- 【机器学习】我在面试机器学习、大数据岗位时遇到的各种问题
- 【机器学习】机器学习编程语言之争狼烟再起,Python称霸?
- TiDB 1.1 Beta Release
- 【Python环境】Python面试题汇总(二)
- 【Python环境】Python性能优化的20条建议
- 【Python环境】Python面试题汇总(一)
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释