多线程应用 - 基于AQS的Condition

时间:2022-07-23
本文章向大家介绍多线程应用 - 基于AQS的Condition,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Condition类其实已经在之前的阻塞队列中有过分析。除了使用Synchronized关键字来作为同步锁外,ReentrantLock也可以代替Synchronized来作为同步锁。另外在Object方法中,可以发现有wait()方法和notify()方法来实现多线程中的等待/通知模式,相对于ReentrantLock,也可以使用Condition类中的await()和signal()方法来实现等待/通知模式。

Condition类底层的实现是基于AQS,所以有必要写一篇来相互学习一下。

一、Condition类

public interface Condition {

    //通用阻塞方法 即 当前线程被唤醒之前或中断之前一直处于阻塞状态
    //之前文中分析的阻塞队列使用较多
    void await() throws InterruptedException;
    //阻塞方法,不可中断,当线程被唤醒之前一直处于阻塞状态 - 用的不多
    void awaitUninterruptibly();
    //阻塞方法,即 当前线程被唤醒、被中断、或阻塞时间未超时情况下 一直处于阻塞状态
    //返回值为阻塞剩余时间
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    //阻塞方法,即 当前线程被唤醒、被中断、或阻塞时间未超时情况下 一直处于阻塞状态
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    //阻塞方法,即 当前线程被唤醒、被中断、或阻塞时间未到达指定期限情况下 一直处于阻塞状态
    boolean awaitUntil(Date deadline) throws InterruptedException;
    //唤醒一个等待线程 - 与阻塞方法相对应
    void signal();
    //唤醒所有等待线程 - 与阻塞方法相对应
    void signalAll();
}

可以看到Condition其实是一个接口,并且方法中提供了阻塞、指定超时时间的阻塞、唤醒队列等方法。

查看这些方法的引用,可以看到这些方法的实现是在AQS中的。

二、Condition示例

public static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {

    new AwaitTest().start();
    new SignalTest().start();
}


static class AwaitTest extends Thread {
    public void run() {
        System.out.println("awaitTest=====线程开始");
        lock.lock();
        try {
            System.out.println("awaitTest线程调用await方法之前");
            condition.await();
            System.out.println("awaitTest线程调用await方法之后");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

static class SignalTest extends Thread {
    public void run() {
        System.out.println("signalTest=====线程开始");
        lock.lock();
        try {
            System.out.println("signalTest线程调用signal方法之前");
            condition.signal();
            System.out.println("signalTest线程调用signal方法之后");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

执行结果:

awaitTest=====线程开始

awaitTest线程调用await方法之前

signalTest=====线程开始

signalTest线程调用signal方法之前

signalTest线程调用signal方法之后

awaitTest线程调用await方法之后

上述结果发现,调用await的线程会让出锁资源让其他线程来执行,其他线程调用signal方法并执行完后,线程一才会恢复,继续执行。

三、基于AQS的Condition类

前面发布的文章中,分析了基于AQS实现的ReentrantLock,这一篇在前文AQS的基础上来分析AQS中await和signal的核心方法的实现。

· Condition类

在AQS中存在内部类,ConditionObject实现Condition,因此这也是为什么Condition需要和ReentrantLock类绑定的原因。

public class ConditionObject implements Condition, java.io.Serializable {

· Condition类内部变量

//队列中第一个结点
private transient Node firstWaiter;
//队列中最后一个结点
private transient Node lastWaiter;
//无参构造函数
public ConditionObject()
//标识-发生了中断但是不需要破抛出异常
private static final int REINTERRUPT = 1;
//标识-发生了中断 需要抛出异常
private static final int THROW_IE = -1;

可以看到Condition中也维护了一个自己的队列,用来维护线程状态。

· Condition类的await()

//await方法
public final void await() throws InterruptedException {
    //当前线程已经被中断了,抛出异常 不允许阻塞
    if (Thread.interrupted())
        throw new InterruptedException();
    //该方法的作用是将当前线程加入到阻塞的Condition队列中
    Node node = addConditionWaiter();
    //这这里因为线程调用了await方法,所以会释放锁资源
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //挂起当前线程
        LockSupport.park(this);
        //线程恢复后,修改线程一结点状态 -2 —> 0 ,并调用enq方法,放到aqs队列中
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //尝试获取锁资源
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    //移除Condition队列中的失效队列
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        //若当前线程没被挂起,中断当前线程
        reportInterruptAfterWait(interruptMode);
}

基于上述线程一和线程二的例子举例。线程一获取锁资源后线程二暂时无法获得锁资源,所以会进入aqs等待队列中等待锁资源(AQS原理分析过)。

然后线程一调用了await方法,先将自己放入到Condition等待队列尾部,并释放线程一锁资源,此时线程一会被挂起。直到被线程二调用signal()方法唤醒。

唤醒后操作:线程一结点状态由-2变为0,并通过enq方法加入到aqs锁等待队列中。并且尝试获取锁资源。此时的线程一状态已经变为0,其他线程进入Condition队列后线程一会被移除Condition队列中。

加入Condition队列中的方法,尾插法:

private Node addConditionWaiter() {
    //获取最后一个结点
    Node t = lastWaiter;
    //这里有个清楚结点操作,如果队尾结点不是处于Condition-等待状态
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        //获取真正的最后一个状态为-2的结点
        t = lastWaiter;
    }
    //将当前线程构造为结点,同时指定状态为-2
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //当前队列中无结点,那么该线程为头结点
    if (t == null)
        firstWaiter = node;
    else
        //否则插入到尾部 也是尾插法
        t.nextWaiter = node;
    //指定当前结点为尾结点
    lastWaiter = node;
    return node;
}

释放线程一锁资源方法:

//释放当前结点锁资源
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //获取当前线程锁资源state计数器
        int savedState = getState();
        //释放锁资源,唤醒下一个线程
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

挂起线程后的方法:

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
}
final boolean transferAfterCancelledWait(Node node) {
    //修改状态为0,加入到aqs阻塞队列中
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

线程一调用await()方法后,线程一先会被插入到Condition队列,挂起等待。

被唤醒后会尝试获取锁资源,因此会被放入aqs锁等待队列。

· Condition类的signal()

//唤醒锁资源方法
public final void signal() {
    //这个判断是因为 线程调用signal是建立在线程有锁资源的情况
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //等待队列中的第一个结点
    Node first = firstWaiter;
    if (first != null)
        //唤醒方法
        doSignal(first);
}

上图为线程二调用signal方法后唤醒线程一的方法。此时队列中firstWaiter为线程一。

doSignal方法,将Condition中的线程一从队列中移除:

private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //将头结点从等待队列中移除
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}

唤醒线程一结点,核心方法:

//头结点处理 核心方法
final boolean transferForSignal(Node node) {
    //cas更新结点状态为-2更新为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //线程一结点被添加到aqs队列中
    Node p = enq(node);
    int ws = p.waitStatus;
    //修改原尾结点的结点状态为-1 SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //唤醒线程一
        LockSupport.unpark(node.thread);
    return true;
}

signal方法比较好理解,先移除Condition队列中的首节点,更新首节点中线程的waitStatus,唤醒线程。线程唤醒后又会进入await方法原先被中断的方法,继续向下执行,获取线程一的锁资源,执行线程一方法。

四、Condition注意点

总体来说,有了上一篇AQS的基础,理解Condition并不难。在Condition中新认识了-2Condition,和-1Signal两种状态,-2Condition即进入Condition队列的等待状态-1Signal即被唤醒状态。

Condition类结构中可以看出,他是基于AQS实现的,因此需要与ReentrantLock相绑定,调用await方法时会释放锁资源并将阻塞线程加入到Condition队列中等待直到被其他线程唤醒。其他线程可通过Signal方法唤醒调用await方法而阻塞中断的线程,之前被中断的线程会从Condition队列中被移除,然后加入到aqs等待队列中尝试获取锁资源,因此其他线程需要完成释放锁资源后aqs等待队列中的线程才可以重新获得锁资源(并不是一旦其他线程调用signal方法就可以重新获得锁资源)。

Condition队列也与AQS队列不同,Condition队列只会按照FIFO顺序,并不像AQS一样具有公平锁和非公平锁来获取锁资源。