多线程应用 - 超详细的AQS详情

时间:2022-07-23
本文章向大家介绍多线程应用 - 超详细的AQS详情,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

这篇主要是分析下AQS的原理,说实话挺难懂的。写文章的时候也难以下手。先解释下AQS是什么。

JUC,即java.util.concurrent,这些并发包中公平锁和非公平锁是基于AQS原理实现的。实际上AQS就是一个队列同步器,核心数据结构是双向链表+线程结点状态。底层状态的改变的操作是通过CAS。

一、入手AQS

AQS因为是底层原理,所以我们需要借助JUC下的ReentrantLock来理解。这里提醒下,不仅仅是ReentrantLock类是基于AQS,可以查看AQS的引用发现,CountDownLatch、Semaphore这些也使用了AQS原理。

· AQS结构

public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

· ReentrantLock结构

public class ReentrantLock implements Lock, java.io.Serializable {

ReentrantLock类实现了Lock接口。因此RentrantLock类也具有Lock接口中定义的这些锁的核心方法。方法如下:

public interface Lock {

    //获取锁 - 可重入锁
    void lock();
    //获取可响应中断锁
    void lockInterruptibly() throws InterruptedException;
    //尝试获取锁,如果获取成功返回true;获取失败返回false
    boolean tryLock();
    //尝试获取锁,如果获取成功返回true;获取失败则会等待一定时间,如果还是获取失败返回false
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    //解锁操作
    void unlock();
    //用作锁的条件 必须通过lock来构建该对象
    Condition newCondition();
}

到这里单看ReentrantLock类,发现和AQS也没有什么关系。继续查看ReentrantLock的内部类结构。

· ReentrantLock构造函数

//无参构造函数 默认锁为非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}

//fail如果为true-公平锁,false-非公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

构造函数中,可定义使用公平锁还是非公平锁。

· 公平锁内部类

static final class FairSync extends Sync {
    //获取锁,调用AQS中的acquire 尝试获取锁
    final void lock() {
        acquire(1);
    }

    //获取锁
    protected final boolean tryAcquire(int acquires) {
        //当前线程
        final Thread current = Thread.currentThread();
        //锁的状态 - 也为线程计数器
        int c = getState();
        //没有线程获取锁资源
        if (c == 0) {
            //获取锁资源,aqs内部安全判断(AQS中会讲到),cas修改锁状态
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
            //当前线程已经获取锁资源 - 重入锁操作
        } else if (current == getExclusiveOwnerThread()) {
            //锁计数器+1
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            //更新锁状态
            setState(nextc);
            return true;
        }
        return false;
    }
}

· 非公平锁内部类

//非公平锁
static final class NonfairSync extends Sync {

    //获取锁
    final void lock() {
        //cas修改无锁状态0更新为1
        if (compareAndSetState(0, 1))
            //更新成功 说明之前是没有线程占用锁资源,当前占用锁资源的为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    //获取非公平锁
    protected final boolean tryAcquire(int acquires) {
        //调用父类sync中的非公平锁获取方法,返回结果
        return nonfairTryAcquire(acquires);
    }
}

· Sync类

公平锁和非公平锁类都继承自Sync,而Sync则继承自AQS。

//内部类AQS锁的实现
abstract static class Sync extends AbstractQueuedSynchronizer {

    //获取锁
    abstract void lock();
    //获取非公平锁
    final boolean nonfairTryAcquire(int acquires) {
        //当前线程
        final Thread current = Thread.currentThread();
        //获取当前锁的状态
        int c = getState();
        //锁状态为0 当前锁资源没有被其他线程占用
        if (c == 0) {
            //cas修改锁的状态
            if (compareAndSetState(0, acquires)) {
                //设置当前线程 持有锁资源
                setExclusiveOwnerThread(current);
                return true;
            }

            //如果当前线程已经拥有了这个锁资源
        } else if (current == getExclusiveOwnerThread()) {
            //nextc相当于一个锁的计数器,当前状态+acquires(如果重入到另一个方法,一般是1)
            //即重入锁后 计数器+1
            int nextc = c + acquires;
            //计数器小于0,加锁失败
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            //设置状态
            setState(nextc);
            return true;
        }

        //加锁失败
        return false;
    }

    //解锁操作
    protected final boolean tryRelease(int releases) {
        //计数器-释放锁的个数,线程重入锁中释放一般是1
        //即计数器个数-1
        int c = getState() - releases;
        //当前线程并没有获取锁的话,说明有问题
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        //标记 - 是否线程中所有重入锁解锁成功
        boolean free = false;
        //c为0 表示线程解锁成功
        if (c == 0) {
            free = true;
            //设置没有线程获取锁资源
            setExclusiveOwnerThread(null);
        }
        //设置锁的状态
        setState(c);
        return free;
    }

    //判断当前线程是否有锁资源
    protected final boolean isHeldExclusively() {
        //即上面加锁、解锁操作中设置的exclusiveOwnerThread线程是否为当前线程
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    //aqs中的条件构造
    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    //获取拥有锁资源的线程
    final Thread getOwner() {
        //不为0,说明有线程拥有锁资源,为exclusiveOwnerThread线程
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    //当前线程是否拥有锁资源
    final int getHoldCount() {
        //先判断拥有锁资源的是否为该线程
        //如果是当前线程判断状态,大于0说明有锁资源
        return isHeldExclusively() ? getState() : 0;
    }

    //是否被锁定
    final boolean isLocked() {
        //判断状态不为0
        return getState() != 0;
    }

    //序列化
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0);
    }
}

源码读到这里,可以看到ReentrantLock还是比较简单的。类内部本质上其实是维护了一个state变量,当获取锁时,state会+1,当释放锁时,state会-1,如果state为0说明线程释放了锁资源。

· ReentrantLock基本方法

//获取锁
public void lock() {
    sync.lock();
}

//获取响应中断的锁
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

//尝试获取锁,直接返回
public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

//尝试获取锁,如果没获取成功,会阻塞指定时间,超时后若还未获取锁 返回false
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

//解锁操作
public void unlock() {
    sync.release(1);
}

//获取锁的条件
public Condition newCondition() {
    return sync.newCondition();
}

//获取当前线程保持锁定的个数
public int getHoldCount() {
    return sync.getHoldCount();
}

//当前线程是否保持锁定
public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();
}

//锁是否被占用着
public boolean isLocked() {
    return sync.isLocked();
}

//锁是公平锁还是非公平锁
public final boolean isFair() {
    return sync instanceof FairSync;
}

//获取拥有该锁资源的线程
protected Thread getOwner() {
    return sync.getOwner();
}

//获取是否有因请求锁资源而阻塞的线程
public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}

//判断线程是否在等待锁队列中
public final boolean hasQueuedThread(Thread thread) {
    return sync.isQueued(thread);
}

//获取等待锁队列的长度
public final int getQueueLength() {
    return sync.getQueueLength();
}

//获取等待锁队列
protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}

//线程有被阻塞 condition.await状态
public boolean hasWaiters(Condition condition) {
    if (condition == null)
        throw new NullPointerException();
    if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
        throw new IllegalArgumentException("not owner");
    return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject) condition);
}

//获取线程阻塞队列的长度
public int getWaitQueueLength(Condition condition) {
    if (condition == null)
        throw new NullPointerException();
    if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
        throw new IllegalArgumentException("not owner");
    return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject) condition);
}

//获取阻塞队列
protected Collection<Thread> getWaitingThreads(Condition condition) {
    if (condition == null)
        throw new NullPointerException();
    if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
        throw new IllegalArgumentException("not owner");
    return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject) condition);
}

二、AQS源码分析

· AQS内部属性

//AQS无参构造函数,没啥好说的
protected AbstractQueuedSynchronizer() {
}
//AQS头结点
private transient volatile Node head;
//AQS尾结点
private transient volatile Node tail;
//状态
private volatile int state;
//获取状态值
protected final int getState() {
    return state;
}

//设置状态值
protected final void setState(int newState) {
    state = newState;
}

//cas修改状态值
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

从内部属性中可以看到,AQS其实是一个双向链表。查看Node结点的内部类属性:

//AQS内部是一个结点
static final class Node {

    //共享结点
    static final Node SHARED = new Node();
    //独占结点
    static final Node EXCLUSIVE = null;
    //当线程等待超时或者被中断,需要从同步队列中取消等待
    //1-取消
    static final int CANCELLED = 1;
    //后继结点若处于等待状态情况下,如果当前结点取消等待,      通知后继结点,使后继结点得以运行
    //-1-通知
    static final int SIGNAL = -1;
    //结点在等待的情况下,其他线程调用了Condition.signal()方法后      ,将结点从等待队列移到同步队列中
    //-2-等待
    static final int CONDITION = -2;
    //下一次的共享状态会被无条件传播下去
    //-3-传播
    static final int PROPAGATE = -3;
    //结点状态
    volatile int waitStatus;
    //上一个结点 即前驱结点
    volatile Node prev;
    //下一个结点 即后继结点
    volatile Node next;
    //结点对应的同步状态的线程
    volatile Thread thread;
    //下一个等待者结点
    Node nextWaiter;
    //当前阶段是否处于共享状态
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //获取前驱结点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            //前驱结点不能为空
            throw new NullPointerException();
        else
            //获取前驱结点
            return p;
    }

    //结点无参构造
    Node() {
    }

    //addWaiter方法会使用该构造函数
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    //Condition会使用该构造函数
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Node结点中的状态属性比较难道,先过个眼熟,理解即可,下面分析会使用到这些状态值。

· 加锁操作

//获取锁资源
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        //线程中断
        selfInterrupt();
    }
}

线程获取锁资源方法,我们发现他在AQS中并没有具体实现,具体的实现会在其他类中,可以看下该方法的引用。

//在AQS中没有具体实现,实现实现在其他类中
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

因为这篇是基于ReentrantLock分析的AQS,所以可以看到在ReetrantLock中做的事情,上文其实已经有源码分析过。公平锁中首先会判断锁的state数值如果为0,表示锁没有被任何线程所占用,可以让线程获取锁资源,通过cas修改state,并设置锁资源的拥有者为当前线程。如果锁的state数值不为0,需要判断获取锁资源的是否为当前线程,如果不是直接返回失败,如果是,表示遇到了可重入情况,将锁的计数器+1。

而在AQS中,会先判断ReentrantLock的tryAcquire方法是否成功,如果不成功,假设第一个线程加锁成功,第二个线程调用获取锁资源方法,则第二个线程会加入到等待队列中。执行代码首先如下:

//构建结点
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //尾结点
    Node pred = tail;
    //尾结点不为空
    if (pred != null) {
//原来的尾结点作为新插入的结点的前驱结点,          新插入的结点作为原插入结点的后继结点
        node.prev = pred;
        //cas修改尾结点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //尾结点如果为空,则调用enq方法 会自旋插入到尾结点
    enq(node);
    return node;
}

这里可以看到,新来的结点的插入都会被插到尾结点中。如果尾结点为空,需要调用enq方法。 enq代码如下:

private Node enq(final Node node) {
    //自旋
    for (; ; ) {
        //获取尾结点
        Node t = tail;
        //尾结点为空,必须设置尾结点
        if (t == null) {
            //cas设置头结点,并且尾结点为头结点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //当前结点的前驱结点 指向尾结点
            node.prev = t;
            //cas修改尾结点 插入的node结点作为尾结点
            if (compareAndSetTail(t, node)) {
                //原未结点的后继结点指向插入结点
                t.next = node;
                //返回原来的尾结点,这里的返回没什么用
                return t;
            }
        }
    }
}

enq中的目的其实就是为了不让尾结点为空,初始化尾结点,插入都是按照尾插法。迷糊的同学看下图,画图说明:

上图中发生的操作都是在enq方法中,假如并发情况下,有第三个线程争抢获取锁资源,仍会调用addWaiter方法,并且此时尾结点已经不为空。因此将线程结点插入到尾部,并通过CAS修改尾结点。画图说明:

如果后面还有线程需要获取锁,也会进入等待队列中,并且依次插入到尾部。

addWaiter方法执行完后,会继续调用acquireQueued方法,源码如下:

//AQS获取资源核心操作
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        //中断位标记
        boolean interrupted = false;
        //自旋
        for (; ; ) {
            //获取传入结点的前驱结点
            final Node p = node.predecessor();
            //如果是传入结点是头结点,会尝试获取锁资源
            if (p == head && tryAcquire(arg)) {
                //当前结点尝试获取锁资源成功,头结点为当前结点
                setHead(node);
                //帮助回收原头结点
                p.next = null;
                failed = false;
                //获得锁资源 不需要中断线程
                return interrupted;
            }
            //判断是否可以休眠
            if (shouldParkAfterFailedAcquire(p, node) &&
                    //线程等待 并校验是否中断
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued中是获取锁资源的核心方法,方法内部会发生自旋,直到头结点获取锁资源成功。判断逻辑为,需要获取锁的结点是头结点,那么将该头结点对应的线程获取锁资源,并将原来的头结点回收掉,自旋结束。如果不是头结点,则需要判断是否需要挂起线程。

shouldParkAfterFailedAcquire源码如下:

private static boolean shouldParkAfterFailedAcquire                    (Node pred, Node node) {//前驱结点的结点状态
    int ws = pred.waitStatus;
    //前驱结点状态为SIGNAL状态,可以休眠
    if (ws == Node.SIGNAL)
        return true;
    //前驱结点为被取消状态
    if (ws > 0) {
        do {
            //依次过滤掉node前驱结点状态为无效状态的结点
            node.prev = (pred = pred.prev);
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //前驱结点修改为SIGNAL状态
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    //不需要休眠
    return false;
}

线程二执行该方法时,第一次自旋,将线程一前驱结点的状态修改为-1-SIGNAL,方法返回为false。第二次自旋,线程一的前驱结点为SIGNAL,所以此时方法返回为true。

接下来执行parkAndCheckInterrupt方法,代码如下:

private final boolean parkAndCheckInterrupt() {
    //让线程进入等待状态
    LockSupport.park(this);
    //线程中断
    return Thread.interrupted();
}

线程二调用acquireQueued方法画图所示如下:

后续的线程三、线程四调用acquireQueued方法也类似,将线程三的前结点这个场景是线程二的waitStatus修改为-1,并且挂起线程三,等待其他线程来唤醒。

综上所述,获取锁的核心方法就是这样,线程一获取锁时,会将state从0修改+1,线程二、线程三尝试获取锁时,就会被挂起,结点中的状态会由0更新为-1,进入aqs等待队列中,等待被其他线程唤醒。在入队列时还会顺便清理已经被取消的线程。这里要注意几个自旋的地方,待会线程一锁释放的时候会有联系。

· 锁释放

线程一锁释放的过程,结合上文的分析,理论情况下应该是此时锁的state会被更新为0,头结点会被更新为线程二,经过上面获取锁方法中的自旋,来真正的获取线程二的锁。那么实际情况会是这样的吗,结合锁释放的代码来进行分析。

public final boolean release(int arg) {
    //尝试直接释放锁资源 具体可以看ReentraantLock释放锁资源的实现
    if (tryRelease(arg)) {
        Node h = head;
        //头结点不为空 并且头结点状态不为0
        if (h != null && h.waitStatus != 0)

            unparkSuccessor(h);
        return true;
    }
    return false;
}

首先尝试释放锁资源,这里会调用ReentrantLock中的tryRelease方法,过程与加锁相反,修改state-1,如果state=0,则清空拥有锁资源的线程。上文有ReentrantLock的具体源码。

这里在头结点不为空并且头结点的waitStatus不为0(一般都为-1)的情况下,调用unparkSuccessor-释放锁的核心方法。具体源码如下:

private void unparkSuccessor(Node node) {
    //被释放结点的状态
    int ws = node.waitStatus;
    if (ws < 0)
        //清除结点状态,变为初始状态
        compareAndSetWaitStatus(node, ws, 0);
    //释放结点的下一个结点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        //如果s是无效的
        s = null;
        //从尾结点向前遍历
        for (Node t = tail; t != null && t != node; t = t.prev)
            //筛选状态值<0的,即有效的结点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //唤醒第一个有效结点对应的线程
        LockSupport.unpark(s.thread);
}

这块代码逻辑主要是释放头结点,唤醒原头结点的下一个结点,即线程二结点。

这里需要注意在acquireQueued方法中,存在自旋,即当发现头结点为线程二时,会尝试获取锁,这个时候,因为锁资源没有被其他线程所占用,因此线程二也可以获取到锁。如下图所示:

最后,这里有个细节补充,

LockSupport.unpark(s.thread);

这个方法可以唤醒被中断的线程,只有线程被唤醒后才会继续发生自旋。在中断情况下,不会发生自旋,不然假设极端下有1w多个线程中断阻塞着等待着被唤醒,那么就有1w个自旋操作了,那是不可能的。

三、AQS总结

总结下AQS,jUC中的一些锁是基于AQS的,他们基本的操作是维护了一个volatile的state作为锁资源被占用的变量,线程获取锁时会使用CAS来对state进行+1操作,解锁时会对state进行减一操作,如果state为0说明锁资源被完全释放。

在AQS中,会有一个基于双向链表的先进先出的阻塞队列,未获得锁的线程就会进入到该队列中进行响应中断的阻塞等待,直到前面的线程释放锁资源,被唤醒。加锁的核心方法为addWaiter将线程转换为Node构造队列,acquireQueued会自旋,直到该结点作为头结点了才会尝试获取锁。解锁的核心方法为unparkSuccessor作用为释放原来的头结点,唤醒新的头结点线程,唤醒后进入自旋,获取锁资源。

AQS是学习多线程锁知识必须要掌握的原理,个人觉得要掌握AQS还是非常有难度的,学习这个的时候一定要耐心,多画图。