深入Java并发之阻塞队列-迭代器(一)
本篇来分析下阻塞队列里迭代器的实现,以ArrayBlockingQueue源码来分析。
首先在开始前想一想,如何实现阻塞队列的迭代器功能?
在并发下有些线程在读,有些在改,还有些在使用迭代器遍历,怎么确保安全性?用独占锁将这些操作隔离开,我们看 ArrayBlockingQueue 确实是这么做的。
既然安全性得到保障那么还有什么问题是需要考虑的 ?
过时数据问题。假设一个线程从此时的 takeIndex 位置开始使用迭代器往后遍历,每此调用 next 获取下一个数据的操作都需要提前获取锁,有可能很长时间都获取不到锁,这段有其它线程在读取本质上就是增加 takeIndex,那么当轮到这个迭代线程执行时他就应该首先检查下情况,是否自己接下来要读取的内容已经失效了(即位置下标落后于此时 takeIndex值)。还有删除数据时也要考虑到被删除位置对多有迭代器的影响。
并发下不只一个线程在使用迭代器,每个使用者都会创建 Itr
对象,它们构成一条链,Itrs
管理这条链。
迭代器 Itr
当你调用 iterator() 方法时,创建迭代器对象 Itr;
public Iterator<E> iterator() {
return new Itr();
}
1,重要的变量
先来介绍下迭代器 Itr 中非常重要的变量:
/** Index to look for new nextItem; NONE at end */
private int cursor;
/** Element to be returned by next call to next(); null if none */
private E nextItem;
/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;
注解很清楚的解释了三者的功能,每此 next() 调用返回的就是 nextItem 对象,而 nextIndex 指的是nextItem对象的下标位置,cursor 是 nextIndex 的下一个位置。在不断 next 获取元素过程中,这三者就是两个在前一个在后的这么往下移动着。
/** Last element returned; null if none or not detached. */
private E lastItem;
/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;
lastItem 表示上一次返回的元素;lastRet 就是 lastItem 的下标。
我这里称 cursor,nextIndex,lastRet 为迭代过程的三剑客,当满足 cursor < 0 && nextIndex < 0 && lastRet < 0
时代表迭代器失效。
/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;
/** Previous value of iters.cycles */
private int prevCycles;
这两个变量与处理过时数据问题有关。
prevTakeIndex 代表本次遍历开始的位置,每此 next 都会进行修正;
prevCycles :Itrs 管理 Itr 链,它里面有个变量 cycles 记录 takeIndex 回到 0 位置的次数,迭代器的 prevCycles 存储该值。
迭代器操作需要获取独占锁,得不到就得等待,这就造成了其存储的 prevTakeIndex 与 prevCycles 可能过时,迭代器多处操作前都会通过这两个值来判断数据是否过时,以做相应的处理。
/** Special index value indicating "not available" or "undefined" */
private static final int NONE = -1;
/**
* Special index value indicating "removed elsewhere", that is,
* removed by some operation other than a call to this.remove().
*/
private static final int REMOVED = -2;
/** Special value for prevTakeIndex indicating "detached mode" */
private static final int DETACHED = -3;
DETACHED:专门用于preTakeIndex,isDetached方法通过其来判断迭代器状态
NONE:用于三个下标变量:cursor,nextIndex,lastRet;这三个下标变量用于迭代功能的实现。表明该位置数据不可用或未定义。
REMOVED:用于lastRet 与 nextIndex。表明数据过时或被删除。
接下来结合具体实现来看:
2,初始化
Itr 初始化过程:
private class Itr implements Iterator<E> {
Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock(); // 将对迭代器链的操作及变量的操作用锁保护起来
try {
//数组为空
if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
// 初始时prevTakeIndex等于takeIndex
// nextIndex 等于 takeIndex
// nextItem 为takeIndex位置的元素
// cursor 为takeIndex的后一个位置
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
itrs.register(this); // in this order
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
// assert takeIndex >= 0;
// assert prevTakeIndex == takeIndex;
// assert nextIndex >= 0;
// assert nextItem != null;
}
} finally {
lock.unlock();
}
}
可以看出初始化干了三件事:将自身加入迭代器链 , 初始化变量 , 清扫迭代器链。
2.1,初始化变量
- prevTakeIndex等于 takeIndex,记录本次迭代开始的位置,每此next都会对其进行更新。
- prevCycles = itrs.cycles,用于判断接下来数据是否过时。
- nextIndex 等于 takeIndex,这三个变量都是迭代过程中使用到的。
- nextItem 为takeIndex位置的元素
- cursor 为takeIndex的后一个位置
关于 cursor 的增加操作
private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
当遍历到 putIndex ,代表数据遍历结束,应该终止迭代,将 cursor 置为 NONE 标识,cursor 置为 NONE 后会引起迭代器的终止,逻辑在 next 与 hasNext 方法中。
2.2,注册
private Node head; // 标识迭代器链的头节点
void register(Itr itr) {
// assert lock.getHoldCount() == 1;
head = new Node(itr, head);
}
private class Node extends WeakReference<Itr> {
Node next;
Node(Itr iterator, Node next) {
super(iterator);
this.next = next;
}
}
将Node对 Itr
对象的引用设置为弱引用。当迭代线程结束迭代后就只会剩下Node 对 Itr 的弱引用,当 GC 启动后会回收该对象,通过 get 的返回来判断是否被回收。对于迭代器对象被回收的节点会被从迭代器链中删除。
2.3,清扫迭代器链 doSomeSweeping
清扫方法 doSomeSweeping 并非一次将整个链条探测一遍,开始时选择探测的范围4或16,若在范围内未探测到无效迭代器则结束,若是探测到则扩大探测范围,将范围恢复为16,继续往下探测。
先来介绍 Itrs
中的三个相关变量
//记录上次探测的结束位置节点,下次从此开始往后探测。
private Node sweeper = null;
// 探测范围
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;
doSomeSweeping 将清除迭代器链中的无效节点,所谓无效指的是:1,节点持有的 Itr
对象为空,说明被GC回收,能被回收也说明使用它的线程完成了迭代。2,迭代器 Itr
此时是 DETACHED 模式,对于迭代结束的迭代器会被置于 DETACHED 模式。迭代结束指的是迭代到 cursor 等于 putIndex,标志着全部数据迭代完毕。
doSomeSweeping 清扫迭代链但并非从头到尾全部探测一遍。之所以有这种设计我想是为了避免耗时,毕竟此时迭代线程持有着独占锁。
void doSomeSweeping(boolean tryHarder) {
// assert lock.getHoldCount() == 1;
// assert head != null;
//tryHarder 为true则 probes 为 16,否则为 4
// probes 代表本次的探测长度,所以你明白为啥取名叫tryHader了
int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
Node o, p; // o 代表 p 的前一个节点,用于链表中节点的删除
// 它代表了一次探测中到达的最后一个节点
final Node sweeper = this.sweeper;
// 限制最多只遍历一遍
// 若本次遍历从半道开始不是从头开始,当其遍历到尾部但probes仍>0
// 则转到头节点继续;但若是从头开始的遍历,遍历到尾probes>0,没必要再继续,应该终止遍历。
// 该功能的实现靠的就是 passedGo
boolean passedGo;
// 从头开始的遍历,passedGo 设为true,在遇到链表长度小于probes的情况,能够break终止
if (sweeper == null) {
o = null;
p = head;
passedGo = true;
// 从前一个线程终止的位置开始
} else {
o = sweeper;
p = o.next;
passedGo = false;
}
for (; probes > 0; probes--) {
if (p == null) {
if (passedGo) // 这就是passedGo发挥作用的地方
break;
//passedGo为false,说明本次遍历是从中间某位置开始,
//也就是说链表前面有一段是未遍历的,
//遍历到了尾部需要转回到头部继续遍历
o = null;
p = head;
passedGo = true;
}
final Itr it = p.get();
final Node next = p.next;
//节点持有的迭代器对象为null,或是数组为空或数据过时导致的DETACHED模式,
//则删除此节点
if (it == null || it.isDetached()) {
//当发现了一个被抛弃或过时的迭代器,
//则将探测范围probes变为16,相当于延长探测范围。
//这就是探测的本意吧:找不到就按原先的探测范围直到结束,
//若找到了一个就扩大探测范围继续往下找。
//以上的设计可能原因是:无论是由于 被回收 或是 过时
// 发现一个则之后的节点都极有可能存在相同的情况
probes = LONG_SWEEP_PROBES; // "try harder"
// unlink p
p.clear();
p.next = null;
if (o == null) {
head = next;
if (next == null) {
// We've run out of iterators to track; retire
itrs = null;
return;
}
}
else
o.next = next;
} else {
o = p;
}
p = next;
}
// 记录本次遍历结束位置节点
this.sweeper = (p == null) ? null : o;
}
3,迭代操作hasNext与next
3.1,hasNext
public boolean hasNext() {
// assert lock.getHoldCount() == 0;
if (nextItem != null)
return true;
noNext();
return false;
}
nextItem != null 说明仍有元素可以继续遍历,nextItem 代表next方法的返回值。当没有元素可继续遍历时调用 noNext 方法。
noNext
随着 next
的不断调用,cursor 随之增加,最后 cursor 等于 putIndex,表明数组元素全部遍历完,这之后下标变量变为 cursor = NONE ,nextIndex = NONE , nextItem = null
,逻辑再next
方法中。到这里直接退出不就行了 ? 不行,需要将迭代器的状态置为 DETACHED,这样才能被 doSomeSweeping
方法清除。这就是 noNext
方法实现的功能:将迭代器状态置为 DETACHED。
private void noNext() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
//调用该方法时cursor与nextIndex皆为NONE,逻辑在next方法中
// assert cursor == NONE;
// assert nextIndex == NONE;
// detach可能在最后一次next中被调用,所以这里先进行判断
if (!isDetached()) {
//lastRet记录前一个next返回的元素的下标,
//cursor等于putIndex,迭代结束,方法运行到此
//此时的lastRet一定是>= 0的
// assert lastRet >= 0;
//该方法处理数据过时问题,会修正下标变量或直接detach该迭代器
//那么为什么在这里调用该方法,有必要吗?
//因为我们要对 lastItem 进行赋值,可若数据过时了便没有这个必要了
//所以这里调用该方法,若过时则lastRet被置为REMOVED小于0,还会调用detach方法
incorporateDequeues(); // might update lastRet
if (lastRet >= 0) {
lastItem = itemAt(lastRet);
// assert lastItem != null;
detach(); // 调用detach
}
}
// assert isDetached();迭代器处于了DETACHED状态
// assert lastRet < 0 ^ lastItem != null;相当于lastRet < 0 && lastItem == null
} finally {
lock.unlock();
}
}
首先代码的执行是需要先获取锁的。
从代码中可以看出detach
一定会被调用,隐藏在incorporateDequeues中或是直接调用detach。还有需要注意的是代码中声明的几个assert,理解他们有助于理解代码的设计。
3.2,next
public E next() {
// assert lock.getHoldCount() == 0;
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached())
//修正下标,主要是cursor,确保其在当前takeIndex之后(包括等于takeIndex)
//从而保证返回的元素不是过时的数据
incorporateDequeues();
// assert nextIndex != NONE;
// assert lastItem == null;
lastRet = nextIndex;
final int cursor = this.cursor;
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
// assert nextItem != null;
this.cursor = incCursor(cursor);
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}
重要的有两点:1,需要获取锁。2,获取前修正相关下标变量的值。用独占锁保证了安全,可也造成了获取元素前检查数组状态的必要性。
incorporateDequeues方法
该方法主要作用是:修正下标,保证返回数据的有效性。
由于多线程下为了确保安全迭代线程每次next都要先获取独占锁,得不到便需等待,等到被唤醒继续执行就需要对数组此时的状况进行判断,判断当前迭代器要获取的数据是否已经过时,将最新的 takeIndex 赋给迭代器的 cursor,从而确保迭代器不会返回过时的数据。
private void incorporateDequeues() {
// assert lock.getHoldCount() == 1;
// assert itrs != null;
// assert !isDetached();
// assert count > 0;
final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
final int len = items.length;
// how far takeIndex has advanced since the previous
// operation of this iterator
//计算此时takeIndex 与 迭代器存储的prevTakeIndex之间的长度
//接下来要用它来判断迭代器接下来读取的数据是否已过时
long dequeues = (cycles - prevCycles) * len
+ (takeIndex - prevTakeIndex);
//接下来就是检查各个下标变量lastRet,nextIndex,cursor
//查看它们指向的数据是否已过时,所谓过时指的是此时 takeIndex 已在其前
//若过时就将lastRet与nextIndex置为REMOVED,cursor置为此时的takeIndex
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
lastRet = REMOVED;
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
nextIndex = REMOVED;
if (invalidated(cursor, prevTakeIndex, dequeues, len))
cursor = takeIndex;
// 这三个下标变量若都<0,说明该终止此迭代器
// detach会将preTakeIndex置为DETACHED,然后调用doSomeSweeping清扫迭代器链
//在isDetached中就是通过preTakeIndex是否小于0来判断迭代器是否终止
if (cursor < 0 && nextIndex < 0 && lastRet < 0)
detach();
//迭代器没有作废的话,更新prevCycles与prevTakeIndex的值
//回到next方法从takeIndex处开始继续往下遍历
else {
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}
}
invalidated 返回true 说明 index 失效。失效说明该下标变量在 takeIndex 之前。
private boolean invalidated(int index, int prevTakeIndex,
long dequeues, int length) {
// 下标变量小于0返回false,表明该下标变量的值不需要进行更改
// 有三个状态 NONE ,REMOVED,DETACHED 它们皆小于0。
//DETACHED:专门用于preTakeIndex使用,isDetached方法通过其来判断
//NONE:用于三个下标变量:cursor,nextIndex,lastRet;这三个用于迭代功能的实现。
//它们为NONE,表明迭代结束可能因为数组为空或是遍历完。
//REMOVED:用于lastRet 与 nextIndex。表面数据过时。
if (index < 0)
return false;
int distance = index - prevTakeIndex;
if (distance < 0)
distance += length;
return dequeues > distance;
}
detach
private void detach() {
// Switch to detached mode 将迭代器转换为 detached模式
// 下面的这些申明都说明了调用该方法时数组的状态
// assert lock.getHoldCount() == 1; 持有锁
// 下面四个下标变量的状态都表明迭代器
// assert cursor == NONE;
// assert nextIndex < 0;
// assert lastRet < 0 || nextItem == null;
// assert lastRet < 0 ^ lastItem != null;相当于lastRet < 0 && lastItem == null
if (prevTakeIndex >= 0) {
// assert itrs != null;
prevTakeIndex = DETACHED;
// try to unlink from itrs (but not too hard)
itrs.doSomeSweeping(true);
}
}
重点在于最后的四个 assert 声明:
- cursor == NONE
- nextIndex < 0;
- lastRet < 0 || nextItem == null
- lastRet < 0 ^ lastItem != null 相当于lastRet < 0 && lastItem == null
这便是迭代器的 DETACHED 状态,之后会将prevTakeIndex = DETACHED 用来标识该迭代器此时处于DETACHED 状态, isDetached 方法会返回true,从而在清扫方法 doSomeSweeping 中将该节点删除。
detach 方法代码中做了两件事:1,prevTakeIndex = DETACHED; 在 isDetached 中便是通过 prevTakeIndex 值是否小于0来判断迭代器是否处于 DETACHED 状态。2,调用 doSomeSweeping 删除该节点,并清扫迭代链,注意这里传的是true,也就是以最大探测范围 16 开始探测迭代链。为什么传true?为了尽可能删除该节点。
这里有个问题:detach 里调用 doSomeSweeping 一定能删除掉该节点吗?不一定,该方法在上面分析过,它的清扫是从上一个线程清扫结束的地方开始,探测范围为4或16,若在范围内没探测到就结束,若探测到就扩大探测范围,范围恢复为16再继续往下探测。
- linux学习第五十四篇:Tomcat介绍,安装jdk,安装Tomcat
- linux学习第五十九篇:LVS DR模式搭建,keepalived lvs
- linux学习第五十四篇:配置Tomcat监听80端口,配置Tomcat的虚拟主机,Tomcat日志
- linux学习第五十六篇:集群介绍,keepalived介绍,用keepalived配置高可用集群
- linux学习第五十八篇: 负载均衡集群介绍,LVS介绍,LVS的调度算法,LVS NAT模式搭建
- Python中eval带来的潜在风险,你知道吗?
- React Native自定义导航条
- android混淆那些坑
- 微信小程序开发入门篇
- Support Annotation Library使用详解
- React Native之Navigator
- React Native组件生命周期
- React Native使用原生组件
- Android ViewDragHelper及移动处理总结
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- SSH 上传文件及文件夹到linux服务器的方法
- apache tika检测文件是否损坏的方法
- Linux下二进制编译安装MySql centos7的教程
- Linux 6 修改ssh默认远程端口号的操作步骤
- 基于python的Linux系统指定进程性能监控思路详解
- ubuntu下的虚拟环境中安装Django的操作方法
- 详解linux下umask的使用
- Linux下设置每天自动备份数据库的方法
- Linux常用命令之chmod修改文件权限777和754
- 解决CentOS 7升级Python到3.6.6后yum出错问题总结
- Linux下如何挂载磁盘的方法示例
- centos7 PHP环境搭建 GD库 等插件安装方法
- CentOS服务器环境下MySQL主从同步配置方法
- awk命令
- linux下C语言实现写日志功能