深入Java并发之阻塞队列-迭代器(一)

时间:2019-02-28
本文章向大家介绍深入Java并发之阻塞队列-迭代器(一),主要包括深入Java并发之阻塞队列-迭代器(一)使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

本篇来分析下阻塞队列里迭代器的实现,以ArrayBlockingQueue源码来分析。

首先在开始前想一想,如何实现阻塞队列的迭代器功能?

在并发下有些线程在读,有些在改,还有些在使用迭代器遍历,怎么确保安全性?用独占锁将这些操作隔离开,我们看 ArrayBlockingQueue 确实是这么做的。

既然安全性得到保障那么还有什么问题是需要考虑的 ?
过时数据问题。假设一个线程从此时的 takeIndex 位置开始使用迭代器往后遍历,每此调用 next 获取下一个数据的操作都需要提前获取锁,有可能很长时间都获取不到锁,这段有其它线程在读取本质上就是增加 takeIndex,那么当轮到这个迭代线程执行时他就应该首先检查下情况,是否自己接下来要读取的内容已经失效了(即位置下标落后于此时 takeIndex值)。还有删除数据时也要考虑到被删除位置对多有迭代器的影响。

并发下不只一个线程在使用迭代器,每个使用者都会创建 Itr 对象,它们构成一条链,Itrs 管理这条链。

迭代器 Itr

当你调用 iterator() 方法时,创建迭代器对象 Itr;

    public Iterator<E> iterator() {
        return new Itr();
    }

1,重要的变量

先来介绍下迭代器 Itr 中非常重要的变量:

		/** Index to look for new nextItem; NONE at end */
        private int cursor;

        /** Element to be returned by next call to next(); null if none */
        private E nextItem;

        /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
        private int nextIndex;

注解很清楚的解释了三者的功能,每此 next() 调用返回的就是 nextItem 对象,而 nextIndex 指的是nextItem对象的下标位置,cursor 是 nextIndex 的下一个位置。在不断 next 获取元素过程中,这三者就是两个在前一个在后的这么往下移动着。

        /** Last element returned; null if none or not detached. */
        private E lastItem;

        /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
        private int lastRet;

lastItem 表示上一次返回的元素;lastRet 就是 lastItem 的下标。
我这里称 cursor,nextIndex,lastRet 为迭代过程的三剑客,当满足 cursor < 0 && nextIndex < 0 && lastRet < 0 时代表迭代器失效。

        /** Previous value of takeIndex, or DETACHED when detached */
        private int prevTakeIndex;

        /** Previous value of iters.cycles */
        private int prevCycles;

这两个变量与处理过时数据问题有关。

prevTakeIndex 代表本次遍历开始的位置,每此 next 都会进行修正;
prevCycles :Itrs 管理 Itr 链,它里面有个变量 cycles 记录 takeIndex 回到 0 位置的次数,迭代器的 prevCycles 存储该值。

迭代器操作需要获取独占锁,得不到就得等待,这就造成了其存储的 prevTakeIndex 与 prevCycles 可能过时,迭代器多处操作前都会通过这两个值来判断数据是否过时,以做相应的处理。

        /** Special index value indicating "not available" or "undefined" */
        private static final int NONE = -1;

        /**
         * Special index value indicating "removed elsewhere", that is,
         * removed by some operation other than a call to this.remove().
         */
        private static final int REMOVED = -2;

        /** Special value for prevTakeIndex indicating "detached mode" */
        private static final int DETACHED = -3;

DETACHED:专门用于preTakeIndex,isDetached方法通过其来判断迭代器状态
NONE:用于三个下标变量:cursor,nextIndex,lastRet;这三个下标变量用于迭代功能的实现。表明该位置数据不可用或未定义。
REMOVED:用于lastRet 与 nextIndex。表明数据过时或被删除。

接下来结合具体实现来看:

2,初始化

Itr 初始化过程:

    private class Itr implements Iterator<E> {
    
            Itr() {
            // assert lock.getHoldCount() == 0;
            lastRet = NONE;
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock(); // 将对迭代器链的操作及变量的操作用锁保护起来
            try {
            	//数组为空
                if (count == 0) {
                    // assert itrs == null;
                    cursor = NONE;
                    nextIndex = NONE;
                    prevTakeIndex = DETACHED;
                } else {
                	// 初始时prevTakeIndex等于takeIndex
                	// nextIndex 等于 takeIndex
                	// nextItem 为takeIndex位置的元素
                	// cursor 为takeIndex的后一个位置
                    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
                    prevTakeIndex = takeIndex;
                    nextItem = itemAt(nextIndex = takeIndex);
                    cursor = incCursor(takeIndex);
                    if (itrs == null) {
                        itrs = new Itrs(this);
                    } else {
                        itrs.register(this); // in this order
                        itrs.doSomeSweeping(false);
                    }
                    prevCycles = itrs.cycles;
                    // assert takeIndex >= 0;
                    // assert prevTakeIndex == takeIndex;
                    // assert nextIndex >= 0;
                    // assert nextItem != null;
                }
            } finally {
                lock.unlock();
            }
        }

可以看出初始化干了三件事:将自身加入迭代器链 , 初始化变量 , 清扫迭代器链。

2.1,初始化变量

  • prevTakeIndex等于 takeIndex,记录本次迭代开始的位置,每此next都会对其进行更新。
  • prevCycles = itrs.cycles,用于判断接下来数据是否过时。
  • nextIndex 等于 takeIndex,这三个变量都是迭代过程中使用到的。
  • nextItem 为takeIndex位置的元素
  • cursor 为takeIndex的后一个位置

关于 cursor 的增加操作

        private int incCursor(int index) {
            // assert lock.getHoldCount() == 1;
            if (++index == items.length)
                index = 0;
            if (index == putIndex)
                index = NONE;
            return index;
        }

当遍历到 putIndex ,代表数据遍历结束,应该终止迭代,将 cursor 置为 NONE 标识,cursor 置为 NONE 后会引起迭代器的终止,逻辑在 next 与 hasNext 方法中。

2.2,注册

        private Node head; // 标识迭代器链的头节点
        
        void register(Itr itr) {
            // assert lock.getHoldCount() == 1;
            head = new Node(itr, head);
        }
        
        private class Node extends WeakReference<Itr> {
            Node next;

            Node(Itr iterator, Node next) {
                super(iterator);
                this.next = next;
            }
        }

将Node对 Itr 对象的引用设置为弱引用。当迭代线程结束迭代后就只会剩下Node 对 Itr 的弱引用,当 GC 启动后会回收该对象,通过 get 的返回来判断是否被回收。对于迭代器对象被回收的节点会被从迭代器链中删除。

2.3,清扫迭代器链 doSomeSweeping

清扫方法 doSomeSweeping 并非一次将整个链条探测一遍,开始时选择探测的范围4或16,若在范围内未探测到无效迭代器则结束,若是探测到则扩大探测范围,将范围恢复为16,继续往下探测。

先来介绍 Itrs 中的三个相关变量

		//记录上次探测的结束位置节点,下次从此开始往后探测。
        private Node sweeper = null;
		// 探测范围
        private static final int SHORT_SWEEP_PROBES = 4;
        private static final int LONG_SWEEP_PROBES = 16;

doSomeSweeping 将清除迭代器链中的无效节点,所谓无效指的是:1,节点持有的 Itr 对象为空,说明被GC回收,能被回收也说明使用它的线程完成了迭代。2,迭代器 Itr 此时是 DETACHED 模式,对于迭代结束的迭代器会被置于 DETACHED 模式。迭代结束指的是迭代到 cursor 等于 putIndex,标志着全部数据迭代完毕。

doSomeSweeping 清扫迭代链但并非从头到尾全部探测一遍。之所以有这种设计我想是为了避免耗时,毕竟此时迭代线程持有着独占锁。

        void doSomeSweeping(boolean tryHarder) {
            // assert lock.getHoldCount() == 1;
            // assert head != null;

			//tryHarder 为true则 probes 为 16,否则为 4
			// probes 代表本次的探测长度,所以你明白为啥取名叫tryHader了
            int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
            Node o, p; // o 代表 p 的前一个节点,用于链表中节点的删除
            // 它代表了一次探测中到达的最后一个节点
            final Node sweeper = this.sweeper;
            // 限制最多只遍历一遍
            // 若本次遍历从半道开始不是从头开始,当其遍历到尾部但probes仍>0
            // 则转到头节点继续;但若是从头开始的遍历,遍历到尾probes>0,没必要再继续,应该终止遍历。
            // 该功能的实现靠的就是 passedGo
            boolean passedGo;
			// 从头开始的遍历,passedGo 设为true,在遇到链表长度小于probes的情况,能够break终止
            if (sweeper == null) {
                o = null;
                p = head;
                passedGo = true;
            // 从前一个线程终止的位置开始
            } else {
                o = sweeper;
                p = o.next;
                passedGo = false;
            }

            for (; probes > 0; probes--) {
                if (p == null) {
                    if (passedGo) // 这就是passedGo发挥作用的地方
                        break;
                    //passedGo为false,说明本次遍历是从中间某位置开始,
                    //也就是说链表前面有一段是未遍历的,
                    //遍历到了尾部需要转回到头部继续遍历
                    o = null;
                    p = head;
                    passedGo = true;
                }
                final Itr it = p.get();
                final Node next = p.next;
                //节点持有的迭代器对象为null,或是数组为空或数据过时导致的DETACHED模式,
                //则删除此节点
                if (it == null || it.isDetached()) {
                    //当发现了一个被抛弃或过时的迭代器,
                    //则将探测范围probes变为16,相当于延长探测范围。
                    //这就是探测的本意吧:找不到就按原先的探测范围直到结束,
                    //若找到了一个就扩大探测范围继续往下找。
                    
                    //以上的设计可能原因是:无论是由于 被回收 或是 过时
                    // 发现一个则之后的节点都极有可能存在相同的情况
                    probes = LONG_SWEEP_PROBES; // "try harder"
                    // unlink p
                    p.clear();
                    p.next = null;
                    if (o == null) {
                        head = next;
                        if (next == null) {
                            // We've run out of iterators to track; retire
                            itrs = null;
                            return;
                        }
                    }
                    else
                        o.next = next;
                } else {
                    o = p;
                }
                p = next;
            }
			// 记录本次遍历结束位置节点
            this.sweeper = (p == null) ? null : o;
        }

3,迭代操作hasNext与next

3.1,hasNext

        public boolean hasNext() {
            // assert lock.getHoldCount() == 0;
            if (nextItem != null)
                return true;
            noNext();
            return false;
        }

nextItem != null 说明仍有元素可以继续遍历,nextItem 代表next方法的返回值。当没有元素可继续遍历时调用 noNext 方法。

noNext
随着 next 的不断调用,cursor 随之增加,最后 cursor 等于 putIndex,表明数组元素全部遍历完,这之后下标变量变为 cursor = NONE ,nextIndex = NONE , nextItem = null,逻辑再next方法中。到这里直接退出不就行了 ? 不行,需要将迭代器的状态置为 DETACHED,这样才能被 doSomeSweeping 方法清除。这就是 noNext 方法实现的功能:将迭代器状态置为 DETACHED

        private void noNext() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
            	//调用该方法时cursor与nextIndex皆为NONE,逻辑在next方法中
                // assert cursor == NONE;
                // assert nextIndex == NONE;
                
                // detach可能在最后一次next中被调用,所以这里先进行判断
                if (!isDetached()) {
                	//lastRet记录前一个next返回的元素的下标,
                	//cursor等于putIndex,迭代结束,方法运行到此
                	//此时的lastRet一定是>= 0的
                	
                    // assert lastRet >= 0; 

					//该方法处理数据过时问题,会修正下标变量或直接detach该迭代器
					//那么为什么在这里调用该方法,有必要吗?
					//因为我们要对 lastItem 进行赋值,可若数据过时了便没有这个必要了
					//所以这里调用该方法,若过时则lastRet被置为REMOVED小于0,还会调用detach方法
                    incorporateDequeues(); // might update lastRet
                    if (lastRet >= 0) {
                        lastItem = itemAt(lastRet);
                        // assert lastItem != null;
                        detach(); // 调用detach
                    }
                }
                // assert isDetached();迭代器处于了DETACHED状态
                // assert lastRet < 0 ^ lastItem != null;相当于lastRet < 0 && lastItem == null
            } finally {
                lock.unlock();
            }
        }

首先代码的执行是需要先获取锁的。
从代码中可以看出detach一定会被调用,隐藏在incorporateDequeues中或是直接调用detach。还有需要注意的是代码中声明的几个assert,理解他们有助于理解代码的设计。

3.2,next

        public E next() {
            // assert lock.getHoldCount() == 0;
            final E x = nextItem;
            if (x == null)
                throw new NoSuchElementException();
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (!isDetached())
                	//修正下标,主要是cursor,确保其在当前takeIndex之后(包括等于takeIndex)
                	//从而保证返回的元素不是过时的数据
                    incorporateDequeues();
                // assert nextIndex != NONE;
                // assert lastItem == null;
                lastRet = nextIndex;
                final int cursor = this.cursor;
                if (cursor >= 0) {
                    nextItem = itemAt(nextIndex = cursor);
                    // assert nextItem != null;
                    this.cursor = incCursor(cursor);
                } else {
                    nextIndex = NONE;
                    nextItem = null;
                }
            } finally {
                lock.unlock();
            }
            return x;
        }

重要的有两点:1,需要获取锁。2,获取前修正相关下标变量的值。用独占锁保证了安全,可也造成了获取元素前检查数组状态的必要性。

incorporateDequeues方法
该方法主要作用是:修正下标,保证返回数据的有效性。
由于多线程下为了确保安全迭代线程每次next都要先获取独占锁,得不到便需等待,等到被唤醒继续执行就需要对数组此时的状况进行判断,判断当前迭代器要获取的数据是否已经过时,将最新的 takeIndex 赋给迭代器的 cursor,从而确保迭代器不会返回过时的数据。

        private void incorporateDequeues() {
            // assert lock.getHoldCount() == 1;
            // assert itrs != null;
            // assert !isDetached();
            // assert count > 0;

            final int cycles = itrs.cycles;
            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
            final int prevCycles = this.prevCycles;
            final int prevTakeIndex = this.prevTakeIndex;

            if (cycles != prevCycles || takeIndex != prevTakeIndex) {
                final int len = items.length;
                // how far takeIndex has advanced since the previous
                // operation of this iterator
                //计算此时takeIndex 与 迭代器存储的prevTakeIndex之间的长度
                //接下来要用它来判断迭代器接下来读取的数据是否已过时
                long dequeues = (cycles - prevCycles) * len
                    + (takeIndex - prevTakeIndex);

                //接下来就是检查各个下标变量lastRet,nextIndex,cursor
                //查看它们指向的数据是否已过时,所谓过时指的是此时 takeIndex 已在其前
                //若过时就将lastRet与nextIndex置为REMOVED,cursor置为此时的takeIndex
                if (invalidated(lastRet, prevTakeIndex, dequeues, len))
                    lastRet = REMOVED;
                if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
                    nextIndex = REMOVED;
                if (invalidated(cursor, prevTakeIndex, dequeues, len))
                    cursor = takeIndex;
				
				// 这三个下标变量若都<0,说明该终止此迭代器
				// detach会将preTakeIndex置为DETACHED,然后调用doSomeSweeping清扫迭代器链
				//在isDetached中就是通过preTakeIndex是否小于0来判断迭代器是否终止
                if (cursor < 0 && nextIndex < 0 && lastRet < 0)
                    detach();
                //迭代器没有作废的话,更新prevCycles与prevTakeIndex的值
                //回到next方法从takeIndex处开始继续往下遍历
                else {
                    this.prevCycles = cycles;
                    this.prevTakeIndex = takeIndex;
                }
            }
        }

invalidated 返回true 说明 index 失效。失效说明该下标变量在 takeIndex 之前。

        private boolean invalidated(int index, int prevTakeIndex,
                                    long dequeues, int length) {
// 下标变量小于0返回false,表明该下标变量的值不需要进行更改
// 有三个状态 NONE ,REMOVED,DETACHED 它们皆小于0。
//DETACHED:专门用于preTakeIndex使用,isDetached方法通过其来判断
//NONE:用于三个下标变量:cursor,nextIndex,lastRet;这三个用于迭代功能的实现。
//它们为NONE,表明迭代结束可能因为数组为空或是遍历完。
//REMOVED:用于lastRet 与 nextIndex。表面数据过时。
            if (index < 0) 
                return false;
            int distance = index - prevTakeIndex;
            if (distance < 0)
                distance += length;
            return dequeues > distance;
        }

detach

        private void detach() {
            // Switch to detached mode 将迭代器转换为 detached模式
            // 下面的这些申明都说明了调用该方法时数组的状态
            // assert lock.getHoldCount() == 1; 持有锁
            // 下面四个下标变量的状态都表明迭代器
            // assert cursor == NONE;
            // assert nextIndex < 0; 
            // assert lastRet < 0 || nextItem == null;
            // assert lastRet < 0 ^ lastItem != null;相当于lastRet < 0 && lastItem == null
            if (prevTakeIndex >= 0) {
                // assert itrs != null;
                prevTakeIndex = DETACHED;
                // try to unlink from itrs (but not too hard)
                itrs.doSomeSweeping(true);
            }
        }

重点在于最后的四个 assert 声明:

  • cursor == NONE
  • nextIndex < 0;
  • lastRet < 0 || nextItem == null
  • lastRet < 0 ^ lastItem != null 相当于lastRet < 0 && lastItem == null

这便是迭代器的 DETACHED 状态,之后会将prevTakeIndex = DETACHED 用来标识该迭代器此时处于DETACHED 状态, isDetached 方法会返回true,从而在清扫方法 doSomeSweeping 中将该节点删除。

detach 方法代码中做了两件事:1,prevTakeIndex = DETACHED; 在 isDetached 中便是通过 prevTakeIndex 值是否小于0来判断迭代器是否处于 DETACHED 状态。2,调用 doSomeSweeping 删除该节点,并清扫迭代链,注意这里传的是true,也就是以最大探测范围 16 开始探测迭代链。为什么传true?为了尽可能删除该节点。

这里有个问题:detach 里调用 doSomeSweeping 一定能删除掉该节点吗?不一定,该方法在上面分析过,它的清扫是从上一个线程清扫结束的地方开始,探测范围为4或16,若在范围内没探测到就结束,若探测到就扩大探测范围,范围恢复为16再继续往下探测。