多线程应用 - 阻塞队列LinkedBlockingQueue详解
一、阻塞队列LinkedBlockingQueue源码分析
直接上源码!
· LinkedBlockingQueue结构
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
与前面分析过的ArrayBlockingQueue和LinkedBlockingDeque一样,继承自AbstractQueue抽象类,并实现了BlockingQueue接口。
· LinkedBlockingQueue基础变量
//内部类 Node结点
static class Node<E> {
//当前结点中的元素
E item;
//下一个结点
Node<E> next;
Node(E x) {
item = x;
}
}
//队列总容量
private final int capacity;
//队列当前容量 使用了AtomicInteger来保证+1操作的原子性
private final AtomicInteger count = new AtomicInteger();
//头结点
transient Node<E> head;
//尾结点
private transient Node<E> last;
//读锁
private final ReentrantLock takeLock = new ReentrantLock();
//非空条件 下文中会提到 当进行读取操作时如果队列元素不存在,会阻塞
private final Condition notEmpty = takeLock.newCondition();
//写锁
private final ReentrantLock putLock = new ReentrantLock();
//非满条件 下文中会提到 当进行写操作时如果队列元素已满,会阻塞
private final Condition notFull = putLock.newCondition();
可以看到,容量大小的维护使用了AtomicInteger来维护保证容量计算的原子性,AtomicInteger如何保证原子性和他的弊端,可查看下面一篇有做具体分析。
在LinkedBlockingQueue中维护了读写锁,这也是与ArrayBlockingQueue和LinkedBlockingDeque的区别。
· LinkedBlockingQueue构造函数
//无参构造函数 会指定队列最满容量为最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//带指定队列容量的构造函数 会指定队列最满容量为指定值
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//带指定集合的初始化构造函数
public LinkedBlockingQueue(Collection<? extends E> c) {
//先调用无参构造函数 创建一个最满容量为最大值的队列
this(Integer.MAX_VALUE);
//获取写锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
//遍历元素集合
for (E e : c) {
//这一段说明 和ArrayLinkedQueueLinkedBlockingDeque一样 ,元素不能为null
if (e == null)
throw new NullPointerException();
//初始化时容量满了也会抛出异常
if (n == capacity)
throw new IllegalStateException("Queue full");
//依次放入元素
enqueue(new Node<E>(e));
//计数+1
++n;
}
//构造函数中,如果保证单例或者线程封闭的情况,可以不需要考虑原子性,直接赋值
count.set(n);
} finally {
//解锁
putLock.unlock();
}
}
构造函数中体现了在初始化操作时如果未指定队列容量大小,则会默认指定最大值作为容量大小。在指定初始化元素时,元素不能为空否则会抛出异常,并且遍历元素循环放入队列中时当前容量要小于最大容量大小,否则也会抛出异常。
· LinkedBlockingQueue基础方法
//唤醒非空条件 让读操作获取锁 继续执行读操作
private void signalNotEmpty() {
//获取读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//唤醒
notEmpty.signal();
} finally {
//解锁
takeLock.unlock();
}
}
//唤醒非满条件 让写操作获取锁 继续执行写操作
private void signalNotFull() {
//获取写锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//唤醒
notFull.signal();
} finally {
//解锁
putLock.unlock();
}
}
//依次放入链表中,可以看出这是一个单向链表
private void enqueue(Node<E> node) {
last = last.next = node;
}
//查询并去除头结点元素
private E dequeue() {
//获取头结点
Node<E> h = head;
//头结点的下一个结点
Node<E> first = h.next;
//头结点的下一个结点指向自己,方便gc
h.next = h;
//重新设置头结点为原来头结点的下一个结点
head = first;
//获取原来头结点元素
E x = first.item;
//将原头结点元素置为null
first.item = null;
//返回原头结点元素
return x;
}
//读写锁都加锁,加锁顺序先写锁,再读锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}
//读写锁都解锁,解锁顺序先读锁,再加锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}//获取当前使用容量大小
public int size() {
return count.get();
}
//获取剩余容量大小
public int remainingCapacity() {
return capacity - count.get();
}
上述方法中的enqueue和dequeue方法是LinkedBlockingQueue中写和读的底层核心方法。可以看出,写操作时是将新元素构造成结点从尾部开始插入,而读取时是从头部开始读取。
· LinkedBlockingQueue写方法
//存放元素
public void put(E e) throws InterruptedException {
//元素不能为null
if (e == null) throw new NullPointerException();
int c = -1;
//将当前元素构造成node结点
Node<E> node = new Node<E>(e);
//获取写锁
final ReentrantLock putLock = this.putLock;
//当前容量大小
final AtomicInteger count = this.count;
//获取响应中断锁
putLock.lockInterruptibly();
try {
//容量已满
while (count.get() == capacity) {
//阻塞当前的写操作
notFull.await();
}
//放入元素
enqueue(node);
//将容量+1,并返回操作前的容量
c = count.getAndIncrement();
//当前容量未满
if (c + 1 < capacity)
//唤醒阻塞的写操作线程
notFull.signal();
} finally {
//解锁
putLock.unlock();
}
if (c == 0)
//原来队列容量若为0,这一步的写操作需要唤醒读操作
signalNotEmpty();
}
//存放元素 与上述方法的区别:增加了返回值指定了阻塞时间,到达阻塞时间没被唤醒返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//总体思想与上述put方法相同
if (e == null) throw new NullPointerException();
//获取应阻塞时间
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//容量满
while (count.get() == capacity) {
//阻塞时间到期,返回false
if (nanos <= 0)
return false;
//阻塞线程 并指定阻塞时间
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
//原来队列容量若为0,这一步的写操作需要唤醒读操作
signalNotEmpty();
return true;
}
//这个方法与put方法相比,不会阻塞线程,队列满则直接返回。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
写操作时会获取写锁,第一个方法当队列满了导致无法写入时会阻塞写线程,直到被唤醒。第二个方法指定了阻塞超时时间,超过了阻塞超时时间或被唤醒则停止阻塞,继续执行。第三个方法不会阻塞,当写入失败则直接返回。
· LinkedBlockingQueue读方法
//读取元素 会阻塞线程
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//获取读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//当前队列元素为null 则阻塞
while (count.get() == 0) {
notEmpty.await();
}
//获取头结点元素
x = dequeue();
//当前容量先返回原容量,再减一写入内存
c = count.getAndDecrement();
if (c > 1)
//当前容量有元素,则唤醒 阻塞的读线程
notEmpty.signal();
} finally {
//解锁
takeLock.unlock();
}
if (c == capacity)
//原容量若是满的,队列出队后则要唤醒阻塞的写操作
signalNotFull();
//返回对首元素
return x;
}
//取出元素 阻塞设置超时时间,原理与take方法相似
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//当前队列元素为null 则阻塞
while (count.get() == 0) {
//阻塞时间到期,返回null
if (nanos <= 0)
return null;
//阻塞设置超长时间
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
//原容量若是满的,队列出队后则要唤醒阻塞的写操作
signalNotFull();
return x;
}
//读操作,如果因为队列为空读取失败 则直接返回null,不阻塞
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
读操作时会获取读锁,第一个方法当队列为空导致无法读取时会阻塞读线程,直到被唤醒。第二个方法指定了阻塞超时时间,超过了阻塞超时时间或被唤醒则停止阻塞,继续执行。第三个方法不会阻塞,当读取失败则直接返回。 可以对比看出,读线程的锁操作与写线程的锁操作类似。
· LinkedBlockingQueue获取双锁的方法
//1、删除元素
public boolean remove(Object o) {
if (o == null) return false;
//获取写锁和读锁
fullyLock();
try {
//从头结点遍历队列结点
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//获取第一个相同的元素
if (o.equals(p.item)) {
//删除替换操作
unlink(p, trail);
return true;
}
}
return false;
} finally {
//读锁和写锁解锁
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
//删除操作 trial的下一个结点指向原p结点的下一个结点
trail.next = p.next;
if (last == p)
//如果原来p是最后一个结点,那么新的尾结点为trial
last = trail;
if (count.getAndDecrement() == capacity)
//原容量满则唤醒写操作线程
notFull.signal();
}
//2、校验是否包含,思想与删除一样,并且也需要获取读写锁
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
//3、转换为数组,也需要获得读写锁
public Object[] toArray() {
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
}
//4、队列元素拼接成String 也需要获得读写锁
public String toString() {
fullyLock();
try {
Node<E> p = head.next;
if (p == null)
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (; ; ) {
E e = p.item;
sb.append(e == this ? "(this Collection)" : e);
p = p.next;
if (p == null)
return sb.append(']').toString();
sb.append(',').append(' ');
}
} finally {
fullyUnlock();
}
}
//5、清除队列,也需要获取读写锁
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}
以上方法中都需要同时获取读写锁,目的是为了再对以上操作时,防止出现其他的读或写操作仍在进行导致以上方法操作失败。
二、阻塞队列LinkedBlockingQueue总结
从上述源码分析中可以看出LinkedBlockingQueue是基于单向链表的,并且写入时会依次插入到尾部,读取时是从头部开始读取。他也是线程安全的阻塞队列,阻塞条件为读操作时如果队列为空则阻塞、写操作时如果队列满则阻塞。与ArrayBlockingQueue和LinkedBlockingDeque不同的是,他维护了两个锁,分别是读锁和写锁,当读操作时则获取读锁资源,当操作写时则获取写锁资源,在高并发情况下,读写操作占用不同的锁资源进行不同的操作,可以提高性能。只有一些需要同时占用读写锁时的动作才会同时占用读写锁资源,如,删除操作、清除操作等等。
三、阻塞队列LinkedBlockingQueue、ArrayBlockingQueue、LinkedBlockingDeque区别
· LinkedBlockingQueue和ArrayBlockingQueue的区别
(1)存储形式不同,LinkedBlockingQueue是基于链表的单向队列,所以他会将放入队列中的元素先构造成Node,ArrayBlockingQueue是基于数组的队列,在存放元素时可以直接将元素放入到队列中。
他们都是基于队列的先进先出来存放元素和读取元素。
(2)LinkedBlockingQueue拥有读写锁,ArrayBlockingQueue在读写操作时都是一把锁,所以在读写同时进行时,LinkedBlockingQueue的性能会比ArrayBlockingQueue要好。
他们在读操作遇到队列为空或者写操作队列为满时都会阻塞线程。
(3)构造函数不同,LinkedBlockingQueue可以指定最大容量的小或者不指定,不指定时他的容量大小为最大值。而ArrayBlockingQueue在初始化时必须要指定容量大小。
他们插入的元素都不能为空。
上述测试代码中比较执行100w次的插入元素操作,比较性能。
结果如下:
arrayBlockingQueue执行时间:247
linkedBlockingQueue执行时间:1239
可以看到,linkedBlockingQueue中进行100w次的将元素构造为Node结点是非常消耗性能时间的。
· LinkedBlockingQueue和LinkedBlockingDeque的区别
(1)LinkedBlockingQueue拥有读写锁,LinkedBlockingDeque在读写操作时都是一把锁。
(2)LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表,即两边都可以进入进行读写操作。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法