多线程应用 - 超详细的AQS详情
这篇主要是分析下AQS的原理,说实话挺难懂的。写文章的时候也难以下手。先解释下AQS是什么。
JUC,即java.util.concurrent,这些并发包中公平锁和非公平锁是基于AQS原理实现的。实际上AQS就是一个队列同步器,核心数据结构是双向链表+线程结点状态。底层状态的改变的操作是通过CAS。
一、入手AQS
AQS因为是底层原理,所以我们需要借助JUC下的ReentrantLock来理解。这里提醒下,不仅仅是ReentrantLock类是基于AQS,可以查看AQS的引用发现,CountDownLatch、Semaphore这些也使用了AQS原理。
· AQS结构
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
· ReentrantLock结构
public class ReentrantLock implements Lock, java.io.Serializable {
ReentrantLock类实现了Lock接口。因此RentrantLock类也具有Lock接口中定义的这些锁的核心方法。方法如下:
public interface Lock {
//获取锁 - 可重入锁
void lock();
//获取可响应中断锁
void lockInterruptibly() throws InterruptedException;
//尝试获取锁,如果获取成功返回true;获取失败返回false
boolean tryLock();
//尝试获取锁,如果获取成功返回true;获取失败则会等待一定时间,如果还是获取失败返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解锁操作
void unlock();
//用作锁的条件 必须通过lock来构建该对象
Condition newCondition();
}
到这里单看ReentrantLock类,发现和AQS也没有什么关系。继续查看ReentrantLock的内部类结构。
· ReentrantLock构造函数
//无参构造函数 默认锁为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//fail如果为true-公平锁,false-非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
构造函数中,可定义使用公平锁还是非公平锁。
· 公平锁内部类
static final class FairSync extends Sync {
//获取锁,调用AQS中的acquire 尝试获取锁
final void lock() {
acquire(1);
}
//获取锁
protected final boolean tryAcquire(int acquires) {
//当前线程
final Thread current = Thread.currentThread();
//锁的状态 - 也为线程计数器
int c = getState();
//没有线程获取锁资源
if (c == 0) {
//获取锁资源,aqs内部安全判断(AQS中会讲到),cas修改锁状态
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
//当前线程已经获取锁资源 - 重入锁操作
} else if (current == getExclusiveOwnerThread()) {
//锁计数器+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//更新锁状态
setState(nextc);
return true;
}
return false;
}
}
· 非公平锁内部类
//非公平锁
static final class NonfairSync extends Sync {
//获取锁
final void lock() {
//cas修改无锁状态0更新为1
if (compareAndSetState(0, 1))
//更新成功 说明之前是没有线程占用锁资源,当前占用锁资源的为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//获取非公平锁
protected final boolean tryAcquire(int acquires) {
//调用父类sync中的非公平锁获取方法,返回结果
return nonfairTryAcquire(acquires);
}
}
· Sync类
公平锁和非公平锁类都继承自Sync,而Sync则继承自AQS。
//内部类AQS锁的实现
abstract static class Sync extends AbstractQueuedSynchronizer {
//获取锁
abstract void lock();
//获取非公平锁
final boolean nonfairTryAcquire(int acquires) {
//当前线程
final Thread current = Thread.currentThread();
//获取当前锁的状态
int c = getState();
//锁状态为0 当前锁资源没有被其他线程占用
if (c == 0) {
//cas修改锁的状态
if (compareAndSetState(0, acquires)) {
//设置当前线程 持有锁资源
setExclusiveOwnerThread(current);
return true;
}
//如果当前线程已经拥有了这个锁资源
} else if (current == getExclusiveOwnerThread()) {
//nextc相当于一个锁的计数器,当前状态+acquires(如果重入到另一个方法,一般是1)
//即重入锁后 计数器+1
int nextc = c + acquires;
//计数器小于0,加锁失败
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//设置状态
setState(nextc);
return true;
}
//加锁失败
return false;
}
//解锁操作
protected final boolean tryRelease(int releases) {
//计数器-释放锁的个数,线程重入锁中释放一般是1
//即计数器个数-1
int c = getState() - releases;
//当前线程并没有获取锁的话,说明有问题
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//标记 - 是否线程中所有重入锁解锁成功
boolean free = false;
//c为0 表示线程解锁成功
if (c == 0) {
free = true;
//设置没有线程获取锁资源
setExclusiveOwnerThread(null);
}
//设置锁的状态
setState(c);
return free;
}
//判断当前线程是否有锁资源
protected final boolean isHeldExclusively() {
//即上面加锁、解锁操作中设置的exclusiveOwnerThread线程是否为当前线程
return getExclusiveOwnerThread() == Thread.currentThread();
}
//aqs中的条件构造
final ConditionObject newCondition() {
return new ConditionObject();
}
//获取拥有锁资源的线程
final Thread getOwner() {
//不为0,说明有线程拥有锁资源,为exclusiveOwnerThread线程
return getState() == 0 ? null : getExclusiveOwnerThread();
}
//当前线程是否拥有锁资源
final int getHoldCount() {
//先判断拥有锁资源的是否为该线程
//如果是当前线程判断状态,大于0说明有锁资源
return isHeldExclusively() ? getState() : 0;
}
//是否被锁定
final boolean isLocked() {
//判断状态不为0
return getState() != 0;
}
//序列化
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
源码读到这里,可以看到ReentrantLock还是比较简单的。类内部本质上其实是维护了一个state变量,当获取锁时,state会+1,当释放锁时,state会-1,如果state为0说明线程释放了锁资源。
· ReentrantLock基本方法
//获取锁
public void lock() {
sync.lock();
}
//获取响应中断的锁
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//尝试获取锁,直接返回
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
//尝试获取锁,如果没获取成功,会阻塞指定时间,超时后若还未获取锁 返回false
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//解锁操作
public void unlock() {
sync.release(1);
}
//获取锁的条件
public Condition newCondition() {
return sync.newCondition();
}
//获取当前线程保持锁定的个数
public int getHoldCount() {
return sync.getHoldCount();
}
//当前线程是否保持锁定
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
//锁是否被占用着
public boolean isLocked() {
return sync.isLocked();
}
//锁是公平锁还是非公平锁
public final boolean isFair() {
return sync instanceof FairSync;
}
//获取拥有该锁资源的线程
protected Thread getOwner() {
return sync.getOwner();
}
//获取是否有因请求锁资源而阻塞的线程
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
//判断线程是否在等待锁队列中
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
//获取等待锁队列的长度
public final int getQueueLength() {
return sync.getQueueLength();
}
//获取等待锁队列
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
//线程有被阻塞 condition.await状态
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject) condition);
}
//获取线程阻塞队列的长度
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject) condition);
}
//获取阻塞队列
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject) condition);
}
二、AQS源码分析
· AQS内部属性
//AQS无参构造函数,没啥好说的
protected AbstractQueuedSynchronizer() {
}
//AQS头结点
private transient volatile Node head;
//AQS尾结点
private transient volatile Node tail;
//状态
private volatile int state;
//获取状态值
protected final int getState() {
return state;
}
//设置状态值
protected final void setState(int newState) {
state = newState;
}
//cas修改状态值
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
从内部属性中可以看到,AQS其实是一个双向链表。查看Node结点的内部类属性:
//AQS内部是一个结点
static final class Node {
//共享结点
static final Node SHARED = new Node();
//独占结点
static final Node EXCLUSIVE = null;
//当线程等待超时或者被中断,需要从同步队列中取消等待
//1-取消
static final int CANCELLED = 1;
//后继结点若处于等待状态情况下,如果当前结点取消等待, 通知后继结点,使后继结点得以运行
//-1-通知
static final int SIGNAL = -1;
//结点在等待的情况下,其他线程调用了Condition.signal()方法后 ,将结点从等待队列移到同步队列中
//-2-等待
static final int CONDITION = -2;
//下一次的共享状态会被无条件传播下去
//-3-传播
static final int PROPAGATE = -3;
//结点状态
volatile int waitStatus;
//上一个结点 即前驱结点
volatile Node prev;
//下一个结点 即后继结点
volatile Node next;
//结点对应的同步状态的线程
volatile Thread thread;
//下一个等待者结点
Node nextWaiter;
//当前阶段是否处于共享状态
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取前驱结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
//前驱结点不能为空
throw new NullPointerException();
else
//获取前驱结点
return p;
}
//结点无参构造
Node() {
}
//addWaiter方法会使用该构造函数
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//Condition会使用该构造函数
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Node结点中的状态属性比较难道,先过个眼熟,理解即可,下面分析会使用到这些状态值。
· 加锁操作
//获取锁资源
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
//线程中断
selfInterrupt();
}
}
线程获取锁资源方法,我们发现他在AQS中并没有具体实现,具体的实现会在其他类中,可以看下该方法的引用。
//在AQS中没有具体实现,实现实现在其他类中
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
因为这篇是基于ReentrantLock分析的AQS,所以可以看到在ReetrantLock中做的事情,上文其实已经有源码分析过。公平锁中首先会判断锁的state数值如果为0,表示锁没有被任何线程所占用,可以让线程获取锁资源,通过cas修改state,并设置锁资源的拥有者为当前线程。如果锁的state数值不为0,需要判断获取锁资源的是否为当前线程,如果不是直接返回失败,如果是,表示遇到了可重入情况,将锁的计数器+1。
而在AQS中,会先判断ReentrantLock的tryAcquire方法是否成功,如果不成功,假设第一个线程加锁成功,第二个线程调用获取锁资源方法,则第二个线程会加入到等待队列中。执行代码首先如下:
//构建结点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//尾结点
Node pred = tail;
//尾结点不为空
if (pred != null) {
//原来的尾结点作为新插入的结点的前驱结点, 新插入的结点作为原插入结点的后继结点
node.prev = pred;
//cas修改尾结点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//尾结点如果为空,则调用enq方法 会自旋插入到尾结点
enq(node);
return node;
}
这里可以看到,新来的结点的插入都会被插到尾结点中。如果尾结点为空,需要调用enq方法。 enq代码如下:
private Node enq(final Node node) {
//自旋
for (; ; ) {
//获取尾结点
Node t = tail;
//尾结点为空,必须设置尾结点
if (t == null) {
//cas设置头结点,并且尾结点为头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//当前结点的前驱结点 指向尾结点
node.prev = t;
//cas修改尾结点 插入的node结点作为尾结点
if (compareAndSetTail(t, node)) {
//原未结点的后继结点指向插入结点
t.next = node;
//返回原来的尾结点,这里的返回没什么用
return t;
}
}
}
}
enq中的目的其实就是为了不让尾结点为空,初始化尾结点,插入都是按照尾插法。迷糊的同学看下图,画图说明:
上图中发生的操作都是在enq方法中,假如并发情况下,有第三个线程争抢获取锁资源,仍会调用addWaiter方法,并且此时尾结点已经不为空。因此将线程结点插入到尾部,并通过CAS修改尾结点。画图说明:
如果后面还有线程需要获取锁,也会进入等待队列中,并且依次插入到尾部。
addWaiter方法执行完后,会继续调用acquireQueued方法,源码如下:
//AQS获取资源核心操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中断位标记
boolean interrupted = false;
//自旋
for (; ; ) {
//获取传入结点的前驱结点
final Node p = node.predecessor();
//如果是传入结点是头结点,会尝试获取锁资源
if (p == head && tryAcquire(arg)) {
//当前结点尝试获取锁资源成功,头结点为当前结点
setHead(node);
//帮助回收原头结点
p.next = null;
failed = false;
//获得锁资源 不需要中断线程
return interrupted;
}
//判断是否可以休眠
if (shouldParkAfterFailedAcquire(p, node) &&
//线程等待 并校验是否中断
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued中是获取锁资源的核心方法,方法内部会发生自旋,直到头结点获取锁资源成功。判断逻辑为,需要获取锁的结点是头结点,那么将该头结点对应的线程获取锁资源,并将原来的头结点回收掉,自旋结束。如果不是头结点,则需要判断是否需要挂起线程。
shouldParkAfterFailedAcquire源码如下:
private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) {//前驱结点的结点状态
int ws = pred.waitStatus;
//前驱结点状态为SIGNAL状态,可以休眠
if (ws == Node.SIGNAL)
return true;
//前驱结点为被取消状态
if (ws > 0) {
do {
//依次过滤掉node前驱结点状态为无效状态的结点
node.prev = (pred = pred.prev);
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驱结点修改为SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//不需要休眠
return false;
}
线程二执行该方法时,第一次自旋,将线程一前驱结点的状态修改为-1-SIGNAL,方法返回为false。第二次自旋,线程一的前驱结点为SIGNAL,所以此时方法返回为true。
接下来执行parkAndCheckInterrupt方法,代码如下:
private final boolean parkAndCheckInterrupt() {
//让线程进入等待状态
LockSupport.park(this);
//线程中断
return Thread.interrupted();
}
线程二调用acquireQueued方法画图所示如下:
后续的线程三、线程四调用acquireQueued方法也类似,将线程三的前结点这个场景是线程二的waitStatus修改为-1,并且挂起线程三,等待其他线程来唤醒。
综上所述,获取锁的核心方法就是这样,线程一获取锁时,会将state从0修改+1,线程二、线程三尝试获取锁时,就会被挂起,结点中的状态会由0更新为-1,进入aqs等待队列中,等待被其他线程唤醒。在入队列时还会顺便清理已经被取消的线程。这里要注意几个自旋的地方,待会线程一锁释放的时候会有联系。
· 锁释放
线程一锁释放的过程,结合上文的分析,理论情况下应该是此时锁的state会被更新为0,头结点会被更新为线程二,经过上面获取锁方法中的自旋,来真正的获取线程二的锁。那么实际情况会是这样的吗,结合锁释放的代码来进行分析。
public final boolean release(int arg) {
//尝试直接释放锁资源 具体可以看ReentraantLock释放锁资源的实现
if (tryRelease(arg)) {
Node h = head;
//头结点不为空 并且头结点状态不为0
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先尝试释放锁资源,这里会调用ReentrantLock中的tryRelease方法,过程与加锁相反,修改state-1,如果state=0,则清空拥有锁资源的线程。上文有ReentrantLock的具体源码。
这里在头结点不为空并且头结点的waitStatus不为0(一般都为-1)的情况下,调用unparkSuccessor-释放锁的核心方法。具体源码如下:
private void unparkSuccessor(Node node) {
//被释放结点的状态
int ws = node.waitStatus;
if (ws < 0)
//清除结点状态,变为初始状态
compareAndSetWaitStatus(node, ws, 0);
//释放结点的下一个结点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果s是无效的
s = null;
//从尾结点向前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
//筛选状态值<0的,即有效的结点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒第一个有效结点对应的线程
LockSupport.unpark(s.thread);
}
这块代码逻辑主要是释放头结点,唤醒原头结点的下一个结点,即线程二结点。
这里需要注意在acquireQueued方法中,存在自旋,即当发现头结点为线程二时,会尝试获取锁,这个时候,因为锁资源没有被其他线程所占用,因此线程二也可以获取到锁。如下图所示:
最后,这里有个细节补充,
LockSupport.unpark(s.thread);
这个方法可以唤醒被中断的线程,只有线程被唤醒后才会继续发生自旋。在中断情况下,不会发生自旋,不然假设极端下有1w多个线程中断阻塞着等待着被唤醒,那么就有1w个自旋操作了,那是不可能的。
三、AQS总结
总结下AQS,jUC中的一些锁是基于AQS的,他们基本的操作是维护了一个volatile的state作为锁资源被占用的变量,线程获取锁时会使用CAS来对state进行+1操作,解锁时会对state进行减一操作,如果state为0说明锁资源被完全释放。
在AQS中,会有一个基于双向链表的先进先出的阻塞队列,未获得锁的线程就会进入到该队列中进行响应中断的阻塞等待,直到前面的线程释放锁资源,被唤醒。加锁的核心方法为addWaiter将线程转换为Node构造队列,acquireQueued会自旋,直到该结点作为头结点了才会尝试获取锁。解锁的核心方法为unparkSuccessor作用为释放原来的头结点,唤醒新的头结点线程,唤醒后进入自旋,获取锁资源。
AQS是学习多线程锁知识必须要掌握的原理,个人觉得要掌握AQS还是非常有难度的,学习这个的时候一定要耐心,多画图。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 解决Kotlin 类在实现多个接口,覆写多个接口中相同方法冲突的问题
- Kotlin 匿名类实现接口和抽象类的区别详解
- android实现微信朋友圈发布动态功能
- 基于Android studio3.6的JNI教程之helloworld思路详解
- 基于Android studio3.6的JNI教程之opencv实例详解
- AndroidStudio代码达到指定字符长度时自动换行实例
- android studio 新建项目报错的解决之路
- Android Studio 3.6中使用视图绑定替代 findViewById的方法
- Android 使用View Binding的方法详解
- CentOS一键安装Resilio Sync脚本
- Python从URL获取图片、读取图片格式并保存到本地
- Linux一键屏蔽指定国家所有的IP访问
- React基础(9)-React中发送Ajax请求以及Mock数据
- 一个快速引入CDN的workflow
- 在Ryzen平台上安装macOS High Sierra苹果系统