多线程应用 - 阻塞队列LinkedBlockingDeque详解

时间:2022-07-23
本文章向大家介绍多线程应用 - 阻塞队列LinkedBlockingDeque详解,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

一、LinkedBlockingDeque简介

在多线程阻塞队列的应用中上一篇已经讲述了ArrayBlockingQueue在这一篇主要介绍思想与他差不多的另一个阻塞队列,基于链表的阻塞队列-LinkedBlockingDeque。基于链表的阻塞队列和基于数组的阻塞队列相同,内部都有一把可重入锁,对于该队列的写操作和读操作都会进行加锁,所以他们都是线程安全的,但是写操作和读操作都会占用锁资源所以在并发量大的情况下会降低性能。另外内部维护了读操作时和写操作时候的Condition,当队列在读取元素时,若发现队列中没有元素,会阻塞读操作,直到队列中有元素被可被读取时才会被唤醒。同理,写操作的Condition,当队列需要进行写入操作时,若发现队列容量满的时候,会阻塞写操作,直到队列中有元素被取出时才会被唤醒。

二、LinkedBlockingDeque源码详解

· 结构

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {

LinkedBlockingDeque继承了父类AbstractQueue,实现了BlockingDeque类。

· 属性

//双向链表节点
static final class Node<E> {

    //节点中的元素
    E item;
    //当前节点上一个元素
    Node<E> prev;
    //当前节点后一个元素
    Node<E> next;
    Node(E x) {
        item = x;
    }
}

//链表中第一个节点(头节点)
transient Node<E> first;
//链表中最后一个节点(尾节点)
transient Node<E> last;
//队列当前容量
private transient int count;
//队列最大容量
private final int capacity;
//可重入锁
final ReentrantLock lock = new ReentrantLock();
//非空条件 - 取出元素时会因为队列为空而阻塞
private final Condition notEmpty = lock.newCondition();
//非满条件 - 放入元素时会因为队列容量满了而阻塞
private final Condition notFull = lock.newCondition();

· 构造函数

//未指定队列最大容量大小,会默认设置大小为Integer.MAX_VALUE
public LinkedBlockingDeque() {
    this(Integer.MAX_VALUE);
}

//指定队列最大容量大小的构造函数,容量大小必须大于0,否则会报错
public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}

//指定需要初始化队列元素的集合,如果通过这个构造器new出来的队列,最大容量会被默认设置为最大值
public LinkedBlockingDeque(Collection<? extends E> c) {
    //调用指定队列容量大小的构造函数,设置大小为Integer.MAX_VALUE
    this(Integer.MAX_VALUE);
    //获取锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        //遍历集合元素
        for (E e : c) {
            if (e == null)
                //元素不能为空,否则抛出异常
                throw new NullPointerException();
            //从尾部插入元素,插入失败抛出异常
            if (!linkLast(new Node<E>(e)))
                throw new IllegalStateException("Deque full");
        }
    } finally {
        //解锁操作
        lock.unlock();
    }
}

从构造函数源码中可以看到一些特点,创建队列对象时,如果指定了最大容量大小则使用指定的数量作为当前队列的最大容量,若没有则使用最大值作为队列的最大容量。如果指定了初始化时候需要放入的元素集合,那么队列最大容量也会被指定为最大值。

和ArrayBlockingQueue相同,元素不能存放null值,否则会报错。

· 头部插入和尾部插入核心方法

//头部插入
private boolean linkFirst(Node<E> node) {
    //当前容量大于队列最大容量时,直接返回插入失败
    if (count >= capacity)
        return false;
    //队列中的头结点
    Node<E> f = first;
    //原来的头结点作为 新插入结点的后一个结点
    node.next = f;
    //替换头结点 为新插入的结点
    first = node;
    //尾结点不存在,将尾结点置为当前新插入的结点
    if (last == null)
        last = node;
    else
        //原来的头结点的上一个结点为当前新插入的结点
        f.prev = node;
    //当前容量增加
    ++count;
    //唤醒读取时因队列中无元素而导致阻塞的线程
    notEmpty.signal();
    return true;
}

//尾部插入
private boolean linkLast(Node<E> node) {
    //当前容量大于队列最大容量时,直接返回插入失败
    if (count >= capacity)
        return false;
    //获取尾节点
    Node<E> l = last;
    //将新插入的前一个节点指向原来的尾节点
    node.prev = l;
    //尾结点设置为新插入的结点
    last = node;
    //头结点为空,新插入的结点作为头节点
    if (first == null)
        first = node;
    else
        //将原尾结点的下一个结点指向新插入的节点
        l.next = node;
    //当前容量增加
    ++count;
    //唤醒读取时因队列中无元素而导致阻塞的线程
    notEmpty.signal();
    return true;
}

· 删除核心方法

//删除头结点
private E unlinkFirst() {
    //获取当前头结点
    Node<E> f = first;
    //头结点为空 返回空
    if (f == null)
        return null;
    //获取头结点的下一个结点
    Node<E> n = f.next;
    //获取头结点元素(记录return需要用到的删除了哪个元素)
    E item = f.item;
    //将头结点元素置为null
    f.item = null;
    //将头结点的下一个结点指向自己 方便gc
    f.next = f;
    //设置头结点为原头结点的下一个结点
    first = n;
    //若原头结点的下一个结点不存在(队列中没有了结点)
    if (n == null)
        //将尾结点也置为null
        last = null;
    else
        //新的头结点的前一个结点指向null,因为他已经作为了头结点 所以不需要指向上一个结点
        n.prev = null;
    //当前数量减少
    --count;
    //唤醒因添加元素时队列容量满导致阻塞的线程
    notFull.signal();
    //返回原来的头结点中的元素
    return item;
}

//删除尾结点
private E unlinkLast() {
    //获取当前尾结点
    Node<E> l = last;
    //尾结点不存在 返回null
    if (l == null)
        return null;
    //获取当前尾结点的上一个结点
    Node<E> p = l.prev;
    //获取当前尾结点中的元素,需要返回记录
    E item = l.item;
    //将当前尾结点的元素置为null
    l.item = null;
    //并将当前尾结点的上一个结点指向自己,方便gc
    l.prev = l;
    //设置新的尾结点,为原来尾结点的前一个结点
    last = p;
    //若无新的尾结点,头结点置为空(队列中没有了结点)
    if (p == null)
        first = null;
    else
        //将新的尾结点的下一个结点指向null,因为他已经为尾结点所以不需要指向下一个结点
        p.next = null;
    //数量减少
    --count;
    //唤醒因添加元素时队列容量满导致阻塞的线程
    notFull.signal();
    //返回原来的尾结点元素
    return item;
}

//删除指定结点
void unlink(Node<E> x) {
    //获取x的上一个结点
    Node<E> p = x.prev;
    //获取x的下一个结点
    Node<E> n = x.next;
    if (p == null) {
        //上一个结点不存在,说明x为头结点,调用删除头结点方法
        unlinkFirst();
    } else if (n == null) {
        //下一个结点不存在,说明x为尾结点,调用删除尾结点方法
        unlinkLast();
    } else {
        //这两行其实就是 将需要删除的x结点的上一个结点指向x结点的下一个结点
        p.next = n;
        n.prev = p;
        //将需要删除结点的元素置为null 方便gc
        x.item = null;
        //减少容量
        --count;
        //唤醒因添加元素时队列容量满导致阻塞的线程
        notFull.signal();
    }
}

上图通过unlink这些方法删除的结点比较简单,但是阅读源码后发现他们没有加锁,是因为他们其实是作为被调用方,获得了可重入锁。

· 添加

//添加头结点元素
public void addFirst(E e) {
    //如果添加失败,抛出异常
    if (!offerFirst(e))
        throw new IllegalStateException("Deque full");
}

//添加尾结点元素
public void addLast(E e) {
    //如果添加失败,抛出异常
    if (!offerLast(e))
        throw new IllegalStateException("Deque full");
}

//添加头结点元素
public boolean offerFirst(E e) {
    //添加的元素为空 抛出空指针异常
    if (e == null) throw new NullPointerException();
    //将元素构造为结点
    Node<E> node = new Node<E>(e);
    //这边会加锁,并调用添加头结点插入的核心方法
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return linkFirst(node);
    } finally {
        //解锁
        lock.unlock();
    }
}

//添加尾结点元素
public boolean offerLast(E e) {
    //添加的元素为空 抛出空指针异常
    if (e == null) throw new NullPointerException();
    //将元素构造为结点
    Node<E> node = new Node<E>(e);
    //这边会加锁,并调用添加尾结点插入的核心方法
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return linkLast(node);
    } finally {
        lock.unlock();
    }
}

//头结点插入
public void putFirst(E e) throws InterruptedException {
    //元素不能为空
    if (e == null) throw new NullPointerException();
    //将元素构造为结点
    Node<E> node = new Node<E>(e);
    //这边会加锁,并调用添加头结点插入的核心方法
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
//头结点如果插入失败,会阻塞该方法,直到取出结点          或删除结点时被唤醒
        while (!linkFirst(node))
            notFull.await();
    } finally {
        //解锁
        lock.unlock();
    }
}

//尾结点插入
public void putLast(E e) throws InterruptedException {
    //元素不能为空
    if (e == null) throw new NullPointerException();
    //将元素构造为结点
    Node<E> node = new Node<E>(e);
    //这边会加锁,并调用添加尾结点插入的核心方法
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
//尾结点如果插入失败,会阻塞该方法,          直到取出结点或删除结点时被唤醒
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

//头结点插入 可指定阻塞时间
public boolean offerFirst(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    //元素不能为空
    if (e == null) throw new NullPointerException();
    //将元素构造为结点
    Node<E> node = new Node<E>(e);
    //计算剩余应阻塞时间
    long nanos = unit.toNanos(timeout);
    //这边会加锁,并调用添加头结点插入的核心方法
    final ReentrantLock lock = this.lock;
    //获取可响应中断的锁,保证阻塞时间到期后可重新获得锁
    lock.lockInterruptibly();
    try {
//头结点如果插入失败,会阻塞该方法,          直到取出结点或删除结点时被唤醒
        //或者阻塞时间到期直接返回失败
        while (!linkFirst(node)) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        //解锁
        lock.unlock();
    }
}

//头结点插入 可指定阻塞时间
public boolean offerLast(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    //元素不能为空
    if (e == null) throw new NullPointerException();
    //将元素构造为结点
    Node<E> node = new Node<E>(e);
    //计算剩余应阻塞时间
    long nanos = unit.toNanos(timeout);
    //这边会加锁,并调用添加尾结点插入的核心方法
    final ReentrantLock lock = this.lock;
    //获取可响应中断的锁,保证阻塞时间到期后可重新获得锁
    try {
//尾结点如果插入失败,会阻塞该方法,          直到取出结点或删除结点时被唤醒
        //或者阻塞时间到期直接返回失败
        while (!linkLast(node)) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        //解锁
        lock.unlock();
    }
}

LinkedBlockingDeque提供了三种头部/尾部的插入方法,区别在于第一种在于插入时会加锁,并直接返回执行结果,不会阻塞。第二种当发现队列满时,会一直阻塞等待,直到被唤醒。第三种与第二种类似只不过可以指定阻塞等待时间,当发现队列满时,会阻塞一定时间,直到被唤醒执行插入方法或阻塞时间过期而返回false。另外如果插入方法失败,则会抛出异常。并且插入的元素不能为null,否则也会抛出异常。

下面的删除方法也与插入方法的思想类似。

· 删除

//删除头结点
public E removeFirst() {
    //删除头结点失败抛出异常
    E x = pollFirst();
    if (x == null) throw new NoSuchElementException();
    //返回被删除的元素
    return x;
}



//删除尾结点
public E removeLast() {
    //删除尾结点失败抛出异常
    E x = pollLast();
    if (x == null) throw new NoSuchElementException();
    //返回被删除的元素
    return x;
}

//删除头结点 - 加锁 直接返回结果
public E pollFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //调用删除头结点核心方法
        return unlinkFirst();
    } finally {
        lock.unlock();
    }
}

//删除尾结点 - 加锁 直接返回结果
public E pollLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //调用删除尾结点核心方法
        return unlinkLast();
    } finally {
        lock.unlock();
    }
}

//删除头结点 - 加锁,如果删除失败则阻塞
public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        //调用删除头结点核心方法
        while ((x = unlinkFirst()) == null)
            //阻塞
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

//删除尾结点 - 加锁,如果删除失败则阻塞
public E takeLast() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        //调用删除尾结点核心方法
        while ((x = unlinkLast()) == null)
            //阻塞
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

//删除头结点 - 加锁,如果删除失败则阻塞 可以指定阻塞时间
public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
    //计算应阻塞时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        //调用删除头结点核心方法
        while ((x = unlinkFirst()) == null) {
            if (nanos <= 0)
                //阻塞时间过期,返回结果
                return null;
            //阻塞 并指定阻塞时间
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

//删除尾结点 - 加锁,如果删除失败则阻塞 可以指定阻塞时间
public E pollLast(long timeout, TimeUnit unit)
        throws InterruptedException {
    //计算应阻塞时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        //调用删除尾结点核心方法
        while ((x = unlinkLast()) == null) {
            if (nanos <= 0)
                //阻塞时间过期,返回结果
                return null;
            //阻塞 并指定阻塞时间
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

· 查找

//获取头结点的元素
public E getFirst() {
    E x = peekFirst();
    //元素为空 抛出异常
    if (x == null) throw new NoSuchElementException();
    return x;
}

//获取尾结点的元素
public E getLast() {
    E x = peekLast();
    //元素为空 抛出异常
    if (x == null) throw new NoSuchElementException();
    return x;
}

//获取头结点元素方法
public E peekFirst() {
    //读取操作也会加锁哦
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //头结点为空返回null,否则返回头结点的元素
        return (first == null) ? null : first.item;
    } finally {
        lock.unlock();
    }
}

//获取尾结点元素方法
public E peekLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //尾结点为空返回null,否则返回尾结点的元素
        return (last == null) ? null : last.item;
    } finally {
        lock.unlock();
    }
}

LinkedBlockingDeque的源码和ArrayBlockingQueue一样比较简单,而且可以看出他们总体的思想还是差不多的,只不过LinkedBlockingDeque是基于双向链表的,因此在操作时会将元素转换成Node会增大开销。

接下来会分析下LinkedBlockingQueue,他的思想就与LinkedBlockingDeque和ArrayBlockingQueue会有些差别,比如读和写的加锁方式。