多线程应用 - 阻塞队列LinkedBlockingQueue详解

时间:2022-07-23
本文章向大家介绍多线程应用 - 阻塞队列LinkedBlockingQueue详解,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

一、阻塞队列LinkedBlockingQueue源码分析

直接上源码!

· LinkedBlockingQueue结构

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

与前面分析过的ArrayBlockingQueue和LinkedBlockingDeque一样,继承自AbstractQueue抽象类,并实现了BlockingQueue接口。

· LinkedBlockingQueue基础变量

//内部类 Node结点
static class Node<E> {

    //当前结点中的元素
    E item;
    //下一个结点
    Node<E> next;
    Node(E x) {
        item = x;
    }
}

//队列总容量
private final int capacity;
//队列当前容量 使用了AtomicInteger来保证+1操作的原子性
private final AtomicInteger count = new AtomicInteger();
//头结点
transient Node<E> head;
//尾结点
private transient Node<E> last;
//读锁
private final ReentrantLock takeLock = new ReentrantLock();
//非空条件 下文中会提到 当进行读取操作时如果队列元素不存在,会阻塞
private final Condition notEmpty = takeLock.newCondition();
//写锁
private final ReentrantLock putLock = new ReentrantLock();
//非满条件 下文中会提到 当进行写操作时如果队列元素已满,会阻塞
private final Condition notFull = putLock.newCondition();

可以看到,容量大小的维护使用了AtomicInteger来维护保证容量计算的原子性,AtomicInteger如何保证原子性和他的弊端,可查看下面一篇有做具体分析。

在LinkedBlockingQueue中维护了读写锁,这也是与ArrayBlockingQueue和LinkedBlockingDeque的区别。

· LinkedBlockingQueue构造函数

//无参构造函数 会指定队列最满容量为最大值
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

//带指定队列容量的构造函数 会指定队列最满容量为指定值
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

//带指定集合的初始化构造函数
public LinkedBlockingQueue(Collection<? extends E> c) {
    //先调用无参构造函数 创建一个最满容量为最大值的队列
    this(Integer.MAX_VALUE);
    //获取写锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        int n = 0;
        //遍历元素集合
        for (E e : c) {
//这一段说明 和ArrayLinkedQueueLinkedBlockingDeque一样             ,元素不能为null
            if (e == null)
                throw new NullPointerException();
            //初始化时容量满了也会抛出异常
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //依次放入元素
            enqueue(new Node<E>(e));
            //计数+1
            ++n;
        }
        //构造函数中,如果保证单例或者线程封闭的情况,可以不需要考虑原子性,直接赋值
        count.set(n);
    } finally {
        //解锁
        putLock.unlock();
    }
}

构造函数中体现了在初始化操作时如果未指定队列容量大小,则会默认指定最大值作为容量大小。在指定初始化元素时,元素不能为空否则会抛出异常,并且遍历元素循环放入队列中时当前容量要小于最大容量大小,否则也会抛出异常。

· LinkedBlockingQueue基础方法

//唤醒非空条件 让读操作获取锁 继续执行读操作
private void signalNotEmpty() {
    //获取读锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //唤醒
        notEmpty.signal();
    } finally {
        //解锁
        takeLock.unlock();
    }
}

//唤醒非满条件 让写操作获取锁 继续执行写操作
private void signalNotFull() {
    //获取写锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //唤醒
        notFull.signal();
    } finally {
        //解锁
        putLock.unlock();
    }
}

//依次放入链表中,可以看出这是一个单向链表
private void enqueue(Node<E> node) {
    last = last.next = node;
}

//查询并去除头结点元素
private E dequeue() {
    //获取头结点
    Node<E> h = head;
    //头结点的下一个结点
    Node<E> first = h.next;
    //头结点的下一个结点指向自己,方便gc
    h.next = h;
    //重新设置头结点为原来头结点的下一个结点
    head = first;
    //获取原来头结点元素
    E x = first.item;
    //将原头结点元素置为null
    first.item = null;
    //返回原头结点元素
    return x;
}
//读写锁都加锁,加锁顺序先写锁,再读锁
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

//读写锁都解锁,解锁顺序先读锁,再加锁
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}//获取当前使用容量大小
public int size() {
    return count.get();
}

//获取剩余容量大小
public int remainingCapacity() {
    return capacity - count.get();
}

上述方法中的enqueue和dequeue方法是LinkedBlockingQueue中写和读的底层核心方法。可以看出,写操作时是将新元素构造成结点从尾部开始插入,而读取时是从头部开始读取。

· LinkedBlockingQueue写方法

//存放元素
public void put(E e) throws InterruptedException {
    //元素不能为null
    if (e == null) throw new NullPointerException();
    int c = -1;
    //将当前元素构造成node结点
    Node<E> node = new Node<E>(e);
    //获取写锁
    final ReentrantLock putLock = this.putLock;
    //当前容量大小
    final AtomicInteger count = this.count;
    //获取响应中断锁
    putLock.lockInterruptibly();
    try {
        //容量已满
        while (count.get() == capacity) {
            //阻塞当前的写操作
            notFull.await();
        }
        //放入元素
        enqueue(node);
        //将容量+1,并返回操作前的容量
        c = count.getAndIncrement();
        //当前容量未满
        if (c + 1 < capacity)
            //唤醒阻塞的写操作线程
            notFull.signal();
    } finally {
        //解锁
        putLock.unlock();
    }
    if (c == 0)
        //原来队列容量若为0,这一步的写操作需要唤醒读操作
        signalNotEmpty();
}
//存放元素 与上述方法的区别:增加了返回值指定了阻塞时间,到达阻塞时间没被唤醒返回false
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    //总体思想与上述put方法相同
    if (e == null) throw new NullPointerException();
    //获取应阻塞时间
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //容量满
        while (count.get() == capacity) {
            //阻塞时间到期,返回false
            if (nanos <= 0)
                return false;
            //阻塞线程 并指定阻塞时间
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        //原来队列容量若为0,这一步的写操作需要唤醒读操作
        signalNotEmpty();
    return true;
}

//这个方法与put方法相比,不会阻塞线程,队列满则直接返回。
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

写操作时会获取写锁,第一个方法当队列满了导致无法写入时会阻塞写线程,直到被唤醒。第二个方法指定了阻塞超时时间,超过了阻塞超时时间或被唤醒则停止阻塞,继续执行。第三个方法不会阻塞,当写入失败则直接返回。

· LinkedBlockingQueue读方法

//读取元素 会阻塞线程
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    //获取读锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //当前队列元素为null 则阻塞
        while (count.get() == 0) {
            notEmpty.await();
        }
        //获取头结点元素
        x = dequeue();
        //当前容量先返回原容量,再减一写入内存
        c = count.getAndDecrement();
        if (c > 1)
            //当前容量有元素,则唤醒 阻塞的读线程
            notEmpty.signal();
    } finally {
        //解锁
        takeLock.unlock();
    }
    if (c == capacity)
        //原容量若是满的,队列出队后则要唤醒阻塞的写操作
        signalNotFull();
    //返回对首元素
    return x;
}

//取出元素 阻塞设置超时时间,原理与take方法相似
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //当前队列元素为null 则阻塞
        while (count.get() == 0) {
            //阻塞时间到期,返回null
            if (nanos <= 0)
                return null;
            //阻塞设置超长时间
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        //原容量若是满的,队列出队后则要唤醒阻塞的写操作
        signalNotFull();
    return x;
}

//读操作,如果因为队列为空读取失败 则直接返回null,不阻塞
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

读操作时会获取读锁,第一个方法当队列为空导致无法读取时会阻塞读线程,直到被唤醒。第二个方法指定了阻塞超时时间,超过了阻塞超时时间或被唤醒则停止阻塞,继续执行。第三个方法不会阻塞,当读取失败则直接返回。 可以对比看出,读线程的锁操作与写线程的锁操作类似。

· LinkedBlockingQueue获取双锁的方法

//1、删除元素
public boolean remove(Object o) {
    if (o == null) return false;
    //获取写锁和读锁
    fullyLock();
    try {
        //从头结点遍历队列结点
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            //获取第一个相同的元素
            if (o.equals(p.item)) {
                //删除替换操作
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        //读锁和写锁解锁
        fullyUnlock();
    }
}
void unlink(Node<E> p, Node<E> trail) {
    p.item = null;
    //删除操作 trial的下一个结点指向原p结点的下一个结点
    trail.next = p.next;
    if (last == p)
        //如果原来p是最后一个结点,那么新的尾结点为trial
        last = trail;
    if (count.getAndDecrement() == capacity)
        //原容量满则唤醒写操作线程
        notFull.signal();
}
//2、校验是否包含,思想与删除一样,并且也需要获取读写锁
public boolean contains(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> p = head.next; p != null; p = p.next)
            if (o.equals(p.item))
                return true;
        return false;
    } finally {
        fullyUnlock();
    }
}

//3、转换为数组,也需要获得读写锁
public Object[] toArray() {
    fullyLock();
    try {
        int size = count.get();
        Object[] a = new Object[size];
        int k = 0;
        for (Node<E> p = head.next; p != null; p = p.next)
            a[k++] = p.item;
        return a;
    } finally {
        fullyUnlock();
    }
}
//4、队列元素拼接成String 也需要获得读写锁
public String toString() {
    fullyLock();
    try {
        Node<E> p = head.next;
        if (p == null)
            return "[]";
        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (; ; ) {
            E e = p.item;
            sb.append(e == this ? "(this Collection)" : e);
            p = p.next;
            if (p == null)
                return sb.append(']').toString();
            sb.append(',').append(' ');
        }
    } finally {
        fullyUnlock();
    }
}

//5、清除队列,也需要获取读写锁
public void clear() {
    fullyLock();
    try {
        for (Node<E> p, h = head; (p = h.next) != null; h = p) {
            h.next = h;
            p.item = null;
        }
        head = last;
        // assert head.item == null && head.next == null;
        if (count.getAndSet(0) == capacity)
            notFull.signal();
    } finally {
        fullyUnlock();
    }
}

以上方法中都需要同时获取读写锁,目的是为了再对以上操作时,防止出现其他的读或写操作仍在进行导致以上方法操作失败。

二、阻塞队列LinkedBlockingQueue总结

从上述源码分析中可以看出LinkedBlockingQueue是基于单向链表的,并且写入时会依次插入到尾部,读取时是从头部开始读取。他也是线程安全的阻塞队列,阻塞条件为读操作时如果队列为空则阻塞、写操作时如果队列满则阻塞。与ArrayBlockingQueue和LinkedBlockingDeque不同的是,他维护了两个锁,分别是读锁和写锁,当读操作时则获取读锁资源,当操作写时则获取写锁资源,在高并发情况下,读写操作占用不同的锁资源进行不同的操作,可以提高性能。只有一些需要同时占用读写锁时的动作才会同时占用读写锁资源,如,删除操作、清除操作等等。

三、阻塞队列LinkedBlockingQueue、ArrayBlockingQueue、LinkedBlockingDeque区别

· LinkedBlockingQueue和ArrayBlockingQueue的区别

(1)存储形式不同,LinkedBlockingQueue是基于链表的单向队列,所以他会将放入队列中的元素先构造成Node,ArrayBlockingQueue是基于数组的队列,在存放元素时可以直接将元素放入到队列中。

他们都是基于队列的先进先出来存放元素和读取元素。

(2)LinkedBlockingQueue拥有读写锁,ArrayBlockingQueue在读写操作时都是一把锁,所以在读写同时进行时,LinkedBlockingQueue的性能会比ArrayBlockingQueue要好。

他们在读操作遇到队列为空或者写操作队列为满时都会阻塞线程。

(3)构造函数不同,LinkedBlockingQueue可以指定最大容量的小或者不指定,不指定时他的容量大小为最大值。而ArrayBlockingQueue在初始化时必须要指定容量大小。

他们插入的元素都不能为空。

上述测试代码中比较执行100w次的插入元素操作,比较性能。

结果如下:

arrayBlockingQueue执行时间:247

linkedBlockingQueue执行时间:1239

可以看到,linkedBlockingQueue中进行100w次的将元素构造为Node结点是非常消耗性能时间的。

· LinkedBlockingQueue和LinkedBlockingDeque的区别

(1)LinkedBlockingQueue拥有读写锁,LinkedBlockingDeque在读写操作时都是一把锁。

(2)LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表,即两边都可以进入进行读写操作。