浅谈AQS

时间:2022-07-23
本文章向大家介绍浅谈AQS,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

AQS(AbstractQueuedSynchronizer)抽象的队列同步器,其是一个用于构建锁和同步器的框架,由AQS构造出来的有ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock等。

AQS的框架图如下:

对于共享变量state,其的作用由AQS的子类来决定,例如ReentrantLock中,就用其表示当前线程重入该锁的次数。对于state的访问有以下两种方式:

独占式访问:顾名思义只有单独的线程执行访问,如我们下面要介绍的ReentrantLock

共享式子访问:多个线程可以同时访问

对于双端队列的结点其源码如下:

static final class Node {
        // SHARED 标识共享 EXCLUSIVE 标识独占
        static final Node SHARED = new Node(); 
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1; // 该线程获取锁的请求已经取消
        static final int SIGNAL    = -1; // 当前线程已经准备就绪了,就等锁的释放
        static final int CONDITION = -2; // 表示结点在等待队列中,等待被唤醒
        static final int PROPAGATE = -3;
        // waitStatus可以取 CANCELLED, SIGNAL CONDITION PROPAGATE
        volatile int waitStatus;

        volatile Node prev; 
        volatile Node next;
        volatile Thread thread;
        // 
        Node nextWaiter;
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

此外AQS类中存储head结点和tail结点。

由于ReentrantLock底层使用的是AQS,本文将以ReentrantLock的加锁解锁为切入点学习AQS。

ReentrantLock中加锁解锁都用的是其内部的Sync对象,其继承类图如下:

该类图中并未列举全部方法,只列举用到的主要方法。

首先我们需要注意的是AQS中存在的tryAcquire方法和tryRelease方法体中并没有具体实现,只抛出了两段异常

      protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

模板设计模式

这一块使用的是模板设计模式,模板方法定义了一系列的操作步骤(总体架构),允许子类实现部分步骤。模板设计模式在子类不改变核心算法的基础上,实现了某些步骤。 例如网上经常用来举例的泡茶,泡咖啡的例子:

泡咖啡:将水煮沸,把咖啡扔进去,把糖扔进去,把咖啡倒入杯子
泡茶 :将水煮沸,把茶扔进去,把柠檬进去,把泡好的茶倒入杯子

对于模板方法而言我只需定义好框架 将水煮沸, 将( ) 扔进入, 将 ( )扔进入, 将 ( )倒入杯子。

其子类可根据自己的需要实现部分步骤即可。

该设计模式需要注意的是,应保证整体骨架不被破坏,模板方法一般都是final 修饰的。

对于Syn继承了AQS,NonfairSync和FairSync又继承了Syn,因此我们在NonfairSync和FairSync中可以发现他们对tryAcquire()及tryRelease()的覆写。

下面以ReentrantLock为例分析AQS的加锁解锁过程

ReentrantLock加锁过程

AQS中的acquire()代码如下:大体逻辑为首先其会尝试获取锁,若获取到了退出该函数,若获取不到则添加入队列,在队列中排队获取锁。

     public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
// in AQS

由上面泳道我们可以发现tryAcquire(1)执行的是Sync的模板方法 tryAcquire(1),其内部又执行的是NofairSync中的nonfairTryAcquire(1).以一种非公平的方式获取锁,其代码如下:

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // c = 0 表示当前锁无人持有
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
// in NonfairSync

其大体逻辑如下:首先判断当前锁的状态,若无人持有(c == 0),则以CAS的方式将state置为1,并持有这把锁。若发现锁已经被别人持有可,则判断持有该锁 的线程是不是他自己,若是则将该state+1,重入这把锁。否则表示无法获得返回false。该部分已经体现出了其非公平锁的特征,对于当队头线程刚释放锁的,而排在其后面的线程还未拿到锁的情况,当前线程将会获得这把锁,体现着不公平锁的特性。若是公平锁的话其会先判断队列中有没有等待的,若没等待的其才能获取锁

该部分已经体现出了其非公平锁的特征,对于当队头线程刚释放锁的,而排在其后面的线程还未拿到锁的情况,当前线程将会获得这把锁,体现着不公平锁的特性。若是公平锁的话其会先判断队列中有没有等待的,若没等待的其才能获取锁,若存在等待者其会插到队列后面排队获取锁。

对于不能获取锁的情况才会执行后面的代码,将当前结点插入链表的尾部,再尝试以队列的方式获取锁

addWaiter()就是完成插入到尾部的任务

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(mode);

        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }
// in AQS

先为当前线程创建结点(mode Node.EXCLUSIVE指的是独占模式)。

插入过程中首先获取当前链表的尾巴结点,若其不为null(即链表已经经过初始化了),首先将当前结点的前驱指向尾巴结点,然后采用CAS的方式将尾巴结点置为当前结点,并将之前尾巴结点的后继指向当前结点。

这一块的死循环和CAS操作特别精髓,首先使用CAS操作只对尾巴结点“加锁”(自旋),并没有对整个链表加锁,如此极大的降低并发竞争,此外还解决多个线程并发插入的问题,假设此时多个线程都想插入到队列中,不妨设线程A和线程B吧,线程B在线程A进行CAS之前已经将自家插入到队尾了,线程A进行CAS操作时发现当前的尾巴结点并不是其之前获得的尾巴结点,compareAndSetTail(oldTail, node)操作返回false,重新进入循环体,重新获取尾巴结点,此时获取的尾巴结点时B线程,然后通过了CAS将其插入到队尾,最后返回当前结点。

之后使用得到的node以队列的方式获取锁,其代码如下:

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
// in AQS

首先判断当前位置是不是队列中排队首的线程(由于头结点不存元素,头结点的下一个元素即为当前排在队头的元素),只有其是队头线程才进行尝试获取锁的操作,若能获取到,则将队头指针后移,并把当前队头元素置为null(由于队头不存元素)。若获得不了则把自己阻塞住,等其前一个线程(执行完释放锁后)叫醒他。叫醒之后继续重复上述步骤直到获取到锁。

ReentrantLock解锁过程

其解锁泳道图如下:

AQS中的release(),其中解锁流程大体为首先调用sync中的tryRelease(1)尝试解锁,若解锁成功(即可以放弃这把锁),则判断当前队列中是否存在其他线程,若存在则唤醒其之后的那个线程。

判断条件为中的h.waitStatus != 0是为了判断后面的线程是否需要被唤醒,h.waitStatus == 0,只有在结点刚创建时该值才为0,此时后继结点正在运行中,不需要被唤醒。只有h != null && waitStatus < 0时才表明后继被阻塞需要被唤醒。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
// in AQS

Sync中尝试释放锁的代码如下:其首先获得该锁的state - 1,然后判断该值是否为0,若为0则证明当前线程对该锁是拿了一次,此时将锁能否释放标志置为yrue,并将锁的拥有者置为null;若该值不为0则证明当前线程对该锁重入多次,还不能释放锁,只能修改其state。

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
// in NonfairSync