【原创】Java并发编程系列29 | ConcurrentLinkedQueue

时间:2022-07-22
本文章向大家介绍【原创】Java并发编程系列29 | ConcurrentLinkedQueue,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

J.U.C 为常用的集合提供了并发安全的版本,前面讲解了 map 的并发安全集合 ConcurrentHashMap,List 并发安全集合 CopyOnWriteArrayList,Set 并发安全集合 CopyOnWriteArraySet,本篇文章就来介绍并发安全的队列 ConcurrentLinkedQueue。

ConcurrentLinkedQueue 是一个基于链接节点的无边界的线程安全队列,采用非阻塞算法实现线程安全。分以下部分讲解:

  1. 类结构
  2. offer()
  3. poll()
  4. 如何保证并发安全性
  5. 总结

1. 类结构

队列由单向链表实现,ConcurrentLinkedQueue 持有头尾指针(head/tail 属性)来管理队列。

队列进行出队入队时对节点的操作都是通过 CAS 实现,保证线程安全。

public class ConcurrentLinkedQueue<E> {
    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;

    /**
     * 节点类
     * CAS保证节点操作安全
     */
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        Node(E item) {
            // 获得item 和 next 的偏移量
            UNSAFE.putObject(this, itemOffset, item);
        }

        /**
         * 更改Node中的数据域item
         */
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        /**
         * 更改Node中的指针域next
         */
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        /**
         * 更改Node中的指针域next
         */
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
    }

    /**入队*/
    public boolean offer(E e)
    /**出队*/
    public E poll()
}

2. 入队 offer()

  1. 将入队节点设置成当前队列尾节点的下一个节点。
  2. 更新 tail 节点,tail 节点不总是尾节点。如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则只入队不更新尾节点。

看下源码:

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    for (Node<E> t = tail, p = t;;) {// 注意这里是个循环
        Node<E> q = p.next;
        if (q == null) {//(1)
            /*
             * q == null 表示p是最后一个节点,设置t.next=newNode
             * 如果失败,说明表示其他线程已经修改了p的指向,循环尝试直到成功
             */
            if (p.casNext(null, newNode)) {// 2)
                // node 加入节点后会导致tail距离最后一个节点相差大于一个,需要更新tail
                if (p != t)//(3)
                    casTail(t, newNode);//(4) casTail:设置tail 尾节点
                return true;
            }
        }
        else if (p == q)//(5)
            /*
             * 多线程环境下,offer(e)和poll()同时执行,
             * 此时p节点被poll(),设置了p.next=p,所以此时q=p.next=p
             * 这里要重新设置p结点
             */
            p = (t != (t = tail)) ? t : head;//(6)
        else
            // tail并没有指向尾节点,重新设置p
            p = (p != t && t != (t = tail)) ? t : q;//(7)
    }
}

单看代码很难理解 offer()过程,不用担心,我们一起从头复现一下 offer()过程:

  1. 初始时,head、tail 都指向同一个 item 为 null 的节点
  1. offer(A):执行步骤(1) (2),将 A 加入队列,但不更新 tail。
  1. offer(B):第一次循环执行(7),设置 p=A 结点。第二次循环执行(1)(2)(3),插入 B 并将 taill 更新为 B。
  1. offer(C):执行步骤(1) (2),将 C 加入队列,但不更新 tail。
  1. 循环....

3. 出队 poll()

  1. 从队列里返回一个节点元素,并清空该节点对元素的引用
  2. 更新 head,并不是每次出队时都更新 head 节点。当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点;只有当 head 节点里没有元素时,出队操作才会更新 head 节点。
public E poll() {
    restartFromHead:// 如果出现p被删除的情况需要从head重新开始
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // item不为null,将item返回并设置为null
            if (item != null && p.casItem(item, null)) {//(1)
                if (p != h)//(2)
                    /*
                     * p != head 则更新head
                     * p.next != null,则将head更新为p.next ,否则更新为p
                     */
                    updateHead(h, ((q = p.next) != null) ? q : p);//(3)
                return item;
            }
            // p.next == null 队列为空
            else if ((q = p.next) == null)//(4)
                updateHead(h, p);
                return null;
            }
            // 另一个线程已经把p从队列中删除并设置p.next = p,p已经被移除不能继续,需要重新开始
            else if (p == q)//(5)
                continue restartFromHead;
            else
                p = q;//(6)
        }
    }
}

/**
 * 更新head
 */
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

同样复现一下 poll()过程,接着上文 offer()之后的数据:

  1. 第一次 poll():第一次循环,执行(6)设置 p=A;第二次循环执行(1)(2)(3),将 A 返回,设置 A.item=null,更新 head=原 A 节点。
  1. 第二次 poll():第一循环,执行(6)设置 p=B;第二次循环执行(1)(2)(3),将 A 返回,设置 B.item=null,更新 head=原 B 节点。
  1. 第三次 poll():第一循环,执行(6)设置 p=C;第二次循环执行(1)(2)(3),将 A 返回,设置 B.item=null,更新 head=原 C 节点。

4. 总结

4.1 如何保证并发安全性

需要保证线程安全的三种情况:

  1. 多个线程同时 offer():

多个线程同时执行到 casNext()设置最后的节点,casNext()通过 CAS 实现,第一个线程执行成功设置了最后一个节点后,其他线程的在 CAS 时发现期望的最后节点和实际上的最后节点不一致,CAS 就会失败,然后继续循环尝试直到成功。

  1. 多个线程同时 poll():

同样是通过 CAS 保证线程安全,多个线程同时执行到 casItem()设置当前节点 item=null,第一个线程执行成功设置了当前节点 item=null 后,其他线程的在 CAS 时发现期望的 item 与实际的 item 不一致,CAS 就会失败,然后继续循环尝试 poll 下一个节点直到成功。

  1. 队列中只有一个元素时,线程 Aoffer()一个线程 Bpoll():

线程 A 要设置 p.next=newNode,但是此时 poll()将 p 删除了。当 poll()将 p 删除时设置了 p.next=p,offer()方法中会检查这种情况,发现有 p.next=p 就重新设置一个合适的 p 节点,以便将 newNode 入队。

4.2 head/tail 为何延迟更新

tail 更新时机:tail 节点不总是尾节点。如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则只入队不更新尾节点。head 更新时机:并不是每次出队时都更新 head 节点。当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点;只有当 head 节点里没有元素时,出队操作才会更新 head 节点。

head 和 tail 的更新总是间隔了一个。如果让 tail 永远作为队列的队尾节点,代码不是逻辑简单容易实现吗?

如果让 tail 永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行 CAS 进行 tail 的更新,汇总起来对性能也会是大大的损耗。如果能减少 CAS 更新的操作,无疑可以大大提升入队的操作效率,所以 doug lea 大师每间隔 1 次(tail 和队尾节点的距离为 1)进行才利用 CAS 更新 tail。 对 head 的更新也是同样的道理。