理解AbstractQueuedSynchronizer提供的独占锁和共享锁语义

时间:2022-06-11
本文章向大家介绍理解AbstractQueuedSynchronizer提供的独占锁和共享锁语义,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

前言

Doug Lea前辈在JDK5中编写的AbstractQueuedSynchronizer抽象同步框架非常精辟,整个代码里没有使用像synchronized这样调用底层硬件系统层面的锁指令来实现同步状态管理,完全是使用Java语言层面功能配合上轻量级的CAS自旋锁来构建的抽象同步器,总的来说AQS里面包含了二套api语义一种是独占锁,另一种是共享锁。这两套语义都是独立的,并不是说任何时候我们都需要同时使用这两种功能的。关于AQS的学习不建议一上去就关注AQS类源码本身,因为单看源码看不出来有任何精妙,反而容易让人迷惑,但是我们从其构建的工具类反看其如何使用AQS功能,结合具体案例则更容易理解。

AQS独占锁的申请和释放流程

这里以重入锁ReentrantLock独占加锁过程:

(1)reentrantLock.lock()

(2)sync.lock()

(3)acquire(1)

(4)!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

如果tryAcquire加锁成功,把state从0变成1,如果是重入则还会把state累加表示重入的次数,最终就会返回true,如果失败,先调用

addWaiter(Node.EXCLUSIVE)方法,这个方法的作用是:

将返回false的线程加入到AQS的阻塞队列里面,这里面先初始化一个Node节点,把当前线线程和锁模式(这里是独占)初始化进去,然后会判断tail末尾节点是不是null,如果不等于null,则直接将当前节点加入队列,并把其pred引用指向上一个末尾节点,同时把上一个末尾节点的next引用指向当前最后的节点,这是是双向链表结构。如果是null则进入初始化方法enq,这个方法的作用:

采用无限循环+CAS的模式,先初始化一个空的head节点,并把tail节点置为和head节点相等,这里使用的是原子字段更新方法AtomicReferenceFieldUpdater来赋值的,成功之后把node节点的prev指向刚才初始化的tail节点,然后把当前的node节点也通过原子引用更新器的方法赋值到tail节点,直到成功为止,返回当前的node节点,至此循环结束。

接着进入acquireQueued方法,这个方法的主要作用是用来挂起线程通过LockSupport的park方法,首先判断当前的节点是不是第一个锁的节点,里面会再次调用tryAcquire方法确认,如果是则覆盖原来的空的head节点,让自己变成真正意义上的head节点,然后返回,这样这个节点就可以继续执行其任务了,但如果这里判断失败则意味着自己并不是当前的队列的第一个线程,那么就需要判断前继节点的waitStats状态,如果是0(初始化状态)或者时-3(传播),那么则意味着当线的线程需要阻塞等待,所以需要将前继的waitStats状态更新为-1(信号通知),然后当前节点继续循环第二次,发现前继节点的状态是-1,那么就会调用LockSupport类的park方法,将当前线程挂起等待,直到第一个节点的任务处理完毕后,唤醒自己。

(5)至此申请锁完毕,如果得到锁则执行,失败则放入同步队列里面挂起,至于公平和非公平在于允不允许直接抢占锁(修改state)字段,如果允许就是不公平,注意不公平只有一次抢占机会,如果失败还得走排队流程,如果不允许就是公平,来了之后就走流程直接排队就行了。

独占解锁过程:

(1)调用unlock方法

(2)代理类调用sync.release(1);方法

(3)实现类调用tryRelease(1)方法,将state值减去1,如果成功则调用 unparkSuccessor方法,这个方法的作用是设置当前节点的waitStatus状态为0,接着获取next节点,如果next的节点的waitStatus状态被修该成了1,就意味着这个任务等不急已经取消了,那么则设置引用为null,方便gc,然后接着从尾节点向前遍历,找到这个取消节点之后的第一个需要通知唤醒的后继,源码如下:

Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;//注意这里没有break,也就是说它找的一定是取消节点之后的第一个节点
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒线程

至此释放锁成功。

AQS共享锁的申请和释放流程

这里以CountDownLatch的await分析:首先在构造函数里面我们需要传入一个阻塞的线程个数这里假设为3,在构造函数里面会设置AQS的state字段值为3。

(1)申请共享锁sync.acquireSharedInterruptibly(1)

(2)调用tryAcquireShared(arg) < 0判断是否有资格申请:

这个方法需要子类实现

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

很明显这里state=3,所以返回-1,故而获取成功

(3)接着调用doAcquireSharedInterruptibly方法

这个方法里面会先调用addWaiter(Node.SHARED)方法,该方法里面会先判断是不是有末尾节点,如果有直接添加,如果没有则需要初始化链表head,并将head.next指向自己。自己的prev指向head,同时将自己变成tail节点。接着调用 if (shouldParkAfterFailedAcquire(p, node) 方法,将head节点的waitSatus设置为-1,然后自旋一次进入阻塞。同时假如有多个调用await方法的线程,那么这些线程会依次排队等待。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

接着我们看下,如何释放锁?

在CountDownLatch里面释放锁是由线程执行完任务,调用countDown方法实现的,在该方法里面调用了代理类的解锁方法:

public void countDown() {
        sync.releaseShared(1);
    }

接着看这个方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

我们看到这里面调用了tryReleaseShared方法:

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

这个方法只有当state的等于0时,才会返回true也就是说只有最后一个线程调用了countDown方法,那么doReleaseShared方法才会被激活,这也是CountDownLatch的功能,接着在doReleaseShared方法里面,会将head节点的status状态设置为0,然后调用unparkSuccessor方法唤醒第一个在休眠的线程。

接着我们回到上面的第三步的方法:这个时候由于state=0,那么该方法就会进入

int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }

这个方法体,其中setHeadAndPropagate方法会将当前的节点替换为head节点,然后如果它的后继节点不为null,就继续调用doReleaseShared()

private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

这个方法里面又会唤醒,该线程的下一个节点,直到h==head代表都已经释放完毕,从而退出循环。

简单的来说共享锁的释放类似,排队的人,第一个告诉第二个你可以执行了,然后第二个完事,告诉第三个依次类推直到所有的共享锁得到释放。

总结

借用Java并发编程的艺术里面术语来说,锁是面向使用者的,而AQS则是面向实现者也或开发者,AQS抽象了锁的状态管理,同步队列的,等待与唤醒等功能,简化了锁的实现方式,从而很好的隔离了使用者和实现者所关注的重点。

参考文章:

https://yq.aliyun.com/articles/601071?spm=a2c4e.11153940.bloghomeflow.642.513b291a06OQbE

http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer

http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer