多线程应用 - 基于AQS的Condition
Condition类其实已经在之前的阻塞队列中有过分析。除了使用Synchronized关键字来作为同步锁外,ReentrantLock也可以代替Synchronized来作为同步锁。另外在Object方法中,可以发现有wait()方法和notify()方法来实现多线程中的等待/通知模式,相对于ReentrantLock,也可以使用Condition类中的await()和signal()方法来实现等待/通知模式。
Condition类底层的实现是基于AQS,所以有必要写一篇来相互学习一下。
一、Condition类
public interface Condition {
//通用阻塞方法 即 当前线程被唤醒之前或中断之前一直处于阻塞状态
//之前文中分析的阻塞队列使用较多
void await() throws InterruptedException;
//阻塞方法,不可中断,当线程被唤醒之前一直处于阻塞状态 - 用的不多
void awaitUninterruptibly();
//阻塞方法,即 当前线程被唤醒、被中断、或阻塞时间未超时情况下 一直处于阻塞状态
//返回值为阻塞剩余时间
long awaitNanos(long nanosTimeout) throws InterruptedException;
//阻塞方法,即 当前线程被唤醒、被中断、或阻塞时间未超时情况下 一直处于阻塞状态
boolean await(long time, TimeUnit unit) throws InterruptedException;
//阻塞方法,即 当前线程被唤醒、被中断、或阻塞时间未到达指定期限情况下 一直处于阻塞状态
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个等待线程 - 与阻塞方法相对应
void signal();
//唤醒所有等待线程 - 与阻塞方法相对应
void signalAll();
}
可以看到Condition其实是一个接口,并且方法中提供了阻塞、指定超时时间的阻塞、唤醒队列等方法。
查看这些方法的引用,可以看到这些方法的实现是在AQS中的。
二、Condition示例
public static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
new AwaitTest().start();
new SignalTest().start();
}
static class AwaitTest extends Thread {
public void run() {
System.out.println("awaitTest=====线程开始");
lock.lock();
try {
System.out.println("awaitTest线程调用await方法之前");
condition.await();
System.out.println("awaitTest线程调用await方法之后");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
static class SignalTest extends Thread {
public void run() {
System.out.println("signalTest=====线程开始");
lock.lock();
try {
System.out.println("signalTest线程调用signal方法之前");
condition.signal();
System.out.println("signalTest线程调用signal方法之后");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
执行结果:
awaitTest=====线程开始
awaitTest线程调用await方法之前
signalTest=====线程开始
signalTest线程调用signal方法之前
signalTest线程调用signal方法之后
awaitTest线程调用await方法之后
上述结果发现,调用await的线程会让出锁资源让其他线程来执行,其他线程调用signal方法并执行完后,线程一才会恢复,继续执行。
三、基于AQS的Condition类
前面发布的文章中,分析了基于AQS实现的ReentrantLock,这一篇在前文AQS的基础上来分析AQS中await和signal的核心方法的实现。
· Condition类
在AQS中存在内部类,ConditionObject实现Condition,因此这也是为什么Condition需要和ReentrantLock类绑定的原因。
public class ConditionObject implements Condition, java.io.Serializable {
· Condition类内部变量
//队列中第一个结点
private transient Node firstWaiter;
//队列中最后一个结点
private transient Node lastWaiter;
//无参构造函数
public ConditionObject()
//标识-发生了中断但是不需要破抛出异常
private static final int REINTERRUPT = 1;
//标识-发生了中断 需要抛出异常
private static final int THROW_IE = -1;
可以看到Condition中也维护了一个自己的队列,用来维护线程状态。
· Condition类的await()
//await方法
public final void await() throws InterruptedException {
//当前线程已经被中断了,抛出异常 不允许阻塞
if (Thread.interrupted())
throw new InterruptedException();
//该方法的作用是将当前线程加入到阻塞的Condition队列中
Node node = addConditionWaiter();
//这这里因为线程调用了await方法,所以会释放锁资源
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//挂起当前线程
LockSupport.park(this);
//线程恢复后,修改线程一结点状态 -2 —> 0 ,并调用enq方法,放到aqs队列中
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//尝试获取锁资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//移除Condition队列中的失效队列
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
//若当前线程没被挂起,中断当前线程
reportInterruptAfterWait(interruptMode);
}
基于上述线程一和线程二的例子举例。线程一获取锁资源后线程二暂时无法获得锁资源,所以会进入aqs等待队列中等待锁资源(AQS原理分析过)。
然后线程一调用了await方法,先将自己放入到Condition等待队列尾部,并释放线程一锁资源,此时线程一会被挂起。直到被线程二调用signal()方法唤醒。
唤醒后操作:线程一结点状态由-2变为0,并通过enq方法加入到aqs锁等待队列中。并且尝试获取锁资源。此时的线程一状态已经变为0,其他线程进入Condition队列后线程一会被移除Condition队列中。
加入Condition队列中的方法,尾插法:
private Node addConditionWaiter() {
//获取最后一个结点
Node t = lastWaiter;
//这里有个清楚结点操作,如果队尾结点不是处于Condition-等待状态
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
//获取真正的最后一个状态为-2的结点
t = lastWaiter;
}
//将当前线程构造为结点,同时指定状态为-2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//当前队列中无结点,那么该线程为头结点
if (t == null)
firstWaiter = node;
else
//否则插入到尾部 也是尾插法
t.nextWaiter = node;
//指定当前结点为尾结点
lastWaiter = node;
return node;
}
释放线程一锁资源方法:
//释放当前结点锁资源
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取当前线程锁资源state计数器
int savedState = getState();
//释放锁资源,唤醒下一个线程
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
挂起线程后的方法:
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//修改状态为0,加入到aqs阻塞队列中
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
线程一调用await()方法后,线程一先会被插入到Condition队列,挂起等待。
被唤醒后会尝试获取锁资源,因此会被放入aqs锁等待队列。
· Condition类的signal()
//唤醒锁资源方法
public final void signal() {
//这个判断是因为 线程调用signal是建立在线程有锁资源的情况
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//等待队列中的第一个结点
Node first = firstWaiter;
if (first != null)
//唤醒方法
doSignal(first);
}
上图为线程二调用signal方法后唤醒线程一的方法。此时队列中firstWaiter为线程一。
doSignal方法,将Condition中的线程一从队列中移除:
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//将头结点从等待队列中移除
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
唤醒线程一结点,核心方法:
//头结点处理 核心方法
final boolean transferForSignal(Node node) {
//cas更新结点状态为-2更新为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//线程一结点被添加到aqs队列中
Node p = enq(node);
int ws = p.waitStatus;
//修改原尾结点的结点状态为-1 SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//唤醒线程一
LockSupport.unpark(node.thread);
return true;
}
signal方法比较好理解,先移除Condition队列中的首节点,更新首节点中线程的waitStatus,唤醒线程。线程唤醒后又会进入await方法原先被中断的方法,继续向下执行,获取线程一的锁资源,执行线程一方法。
四、Condition注意点
总体来说,有了上一篇AQS的基础,理解Condition并不难。在Condition中新认识了-2Condition,和-1Signal两种状态,-2Condition即进入Condition队列的等待状态,-1Signal即被唤醒状态。
Condition类结构中可以看出,他是基于AQS实现的,因此需要与ReentrantLock相绑定,调用await方法时会释放锁资源并将阻塞线程加入到Condition队列中等待直到被其他线程唤醒。其他线程可通过Signal方法唤醒调用await方法而阻塞中断的线程,之前被中断的线程会从Condition队列中被移除,然后加入到aqs等待队列中尝试获取锁资源,因此其他线程需要完成释放锁资源后aqs等待队列中的线程才可以重新获得锁资源(并不是一旦其他线程调用signal方法就可以重新获得锁资源)。
Condition队列也与AQS队列不同,Condition队列只会按照FIFO顺序,并不像AQS一样具有公平锁和非公平锁来获取锁资源。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 01Python的基本的数据结构之List
- 02Python数据结构之再谈List的常用操作
- 03Python List不得不知的操作之改、查
- 04Python基础之字符串Str
- 05Python元组tuple的个性
- Tkinter Canvas
- Python实现最小二乘法
- 一个简单的例子学明白用Python插值
- python 类class基础简明笔记
- 数据离散化及其KMeans算法实现的理解
- [tensorflow损失函数系列]sparse_softmax_cross_entropy_with_logits
- 怎样将Anaconda设置为国内的镜像
- Python实现KMeans算法
- Python面向对象编程
- HTML和CSS常见问题整理