ReentrantLock加锁与释放过程

时间:2022-07-24
本文章向大家介绍ReentrantLock加锁与释放过程,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

背景

通过lockunlock研究ReentrantLock相关实现的原理 :

  • 通过state变量标志当前资源占用
  • ExclusiveThread标志当前占用CPU的线程
  • LockSupport.park/unpark来申请和释放CPU资源
  • Unsafe.compareAndSwapObject来完成CAS操作
  • Unsafe.putObject来完成原子操作

重要变量

  • state变量
    • 为0 : 代表该节点为初创节点 , 什么都没干
    • 大于0 : 即代表CANCELLED , 也就是所有大于0的节点都是被取消的节点
    • 小于0 : SIGNAL = -1 : 代表该节点等待unpark CONDITION = -2 : 代表该节点等待condition的触发 PROPAGATE = -3 : 主要用于共享锁 , 代表该节点需要无条件传递上一个节点

lock流程简述

  1. 通过CAS设置state(0-->1) , 如果设置成功 , 则独占该锁
  2. 设置失败 , 通过addWaiter创建本线程的Node , 插入队列中
  3. 在添加完节点后 , 如果当前线程未抢占成功 , 则会遍历删除Cancel节点
  4. 最后会找到可以执行的Node , 开始通过park来让出当前线程的CPU
  5. 公平锁会在申请锁的时候判断队列后继节点是否存在 , 如果当前节点不是该后继节点的话 , 就不会申请锁.

unlock流程简述

  1. 尝试释放相应的资源
  2. 释放成功的话 , 则会把独占线程让出
  3. 会遍历Node队列 , 从头节点开始寻找可以unpark的线程

非公平锁的加锁与释放

lock的实现

  1. 当调用到ReentrantLock.lock()
    // 以非公平锁为例
    static final class NonfairSync extends Sync {
    
        // 获取锁
        final void lock() {
            // 通过CAS操作设置state的状态
            if (compareAndSetState(0, 1))
                 // 如果设置成功的话 , 则代表当前独占锁的线程为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 尝试获取锁
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            // 重写子类的Acquire , 调用nonfairTryAcquire
            return nonfairTryAcquire(acquires);
        }
    }
  1. acquire中 , 会获取锁 , 或者插入等待队列中
public final void acquire(int arg) {
        //  tryAcquire尝试获取锁 , 如果获取失败的话 , 
        // 则通过acquireQueued将当前线程添加到队列中
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果成功的话 , 则中断当前线程 , 仅仅只是中断
            selfInterrupt();
    }
  1. 在Sync重写的tryAcquire中 , 会尝试获取锁
  • 如果当前锁没有被占用(即state=0)时 , 则独占该锁
  • 如果当前独占线程是本线程的话 , 则将State递增
  • 否则 , 则返回false , 将本线程插入队列中
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 获取当前State状态
            int c = getState();
            if (c == 0) {
                // 如果当前State为0的话 , 则独占该锁 ,并且返回true,不需要插入队列
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                //  如果当前线程就是独占的线程的话 , 则将State加一 , 并且返回true
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
  1. 通过addWaiter(Node.EXCLUSIVE)创建Node , 由于是通过Unsafe.putObject , 所以也是保证线程间的可见性的.
    private Node addWaiter(Node mode) {
        // 通过构造函数 , 将Node.EXCLUSIVE作为nextWaiter放入该节点
        Node node = new Node(mode);
        for (;;) {
            // 找到尾节点 , head/tail都是volatile的
            Node oldTail = tail;
            // 尾节点不为空 , 双线链表增加尾节点操作
            if (oldTail != null) {
                // 将tail节点放到node.prev中
                U.putObject(node, Node.PREV, oldTail);
                // 通过cas操作 , 设置tail
                if (compareAndSetTail(oldTail, node)) {
                    // 如果设置成功的话 , 则将tail.next设置成该节点
                    oldTail.next = node;
                    // 返回node
                    return node;
                }
            } else {
                // 如果没有头尾节点 , 则创建head/tail节点
                initializeSyncQueue();
            }
        }
    }
  1. 在添加完节点后 , 开始在队列中找寻要唤醒的线程节点.
  • 如果shouldParkAfterFailedAcquire返回true , 则会Park当前线程
  • 如果shouldParkAfterFailedAcquire返回false , 就会一直自旋遍历前驱节点 , 直到前驱接节点为空 , 则将当前节点设置为head节点 , 可以获取当前锁
final boolean acquireQueued(final Node node, int arg) {
        //  其实node就是尾节点
        try {
            boolean interrupted = false;
            for (;;) {
                // 从尾节点往前遍历 , 获取其前驱
                final Node p = node.predecessor();
                // 如果遍历到了head节点 , 并且获取成功了, 则返回false
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                // shouldParkAfterFailedAcquire:判断在acquire后,是否应该park让出CPU
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
  1. shouldParkAfterFailedAcquire主要根据前驱节点状态判断当前节点是否需要Park让出CPU
  • 如果前驱节点的ws为Node.SIGNAL的话 , 则返回true , park当前线程
  • 如果前驱节点的ws为canceld的话 , 则从队列中去除掉canceld节点
  • 如果前驱节点的ws为0或者PROPAGATE的话 , 则将前驱节点的ws设置成SIGNAL
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //  如果前驱节点的waitStatus为SIGNAL,就代表前驱节点可以被唤醒
            //  返回true , park当前线程
            return true;
        if (ws > 0) {
            //  如果ws>0, 则代表前驱节点cancelled了 ,  则过滤掉canceld节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //  删除掉cancedld节点
            pred.next = node;
        } else {
            // 如果ws为0或者PROPAGATE的话 , 则将前驱节点的waitStatus设置成SIGNAL , 但是先不park
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }
  1. 如果前驱节点要唤醒的话 , 则会Park当前线程
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

unlock的实现

当某个线程在使用完锁后 , 使用unlock来释放锁资源.

  1. 调用unlock也就是释放1的资源
public void unlock() {
        sync.release(1);
    }
  1. 在AQS中的release会先尝试释放锁 , 如果成功 ,则unpark前驱线程
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  1. Sync中的tryRelease中 , 会释放相关资源 , 并且返回锁是否释放成功
protected final boolean tryRelease(int releases) {
            // 将当前申请资源数 - 要释放的数量
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                // 如果当前线程不是独占线程的话 , 则抛出异常
                throw new IllegalMonitorStateException();
            // 是否释放锁
            boolean free = false;
            if (c == 0) {
                // 如果当前没有资源占用的话 , 则认为可以释放锁
                free = true;
                // 将独占线程设为空
                setExclusiveOwnerThread(null);
            }
            // 设置当前锁所对应的资源数量
            setState(c);  
            //  返回当前锁是否会被释放
            return free;
  1. 当锁成功释放后 , 会判断前驱节点的waitStatus是否不为0 , 由于lock阶段会清除所有的Cancel节点 , 所以此处判断不为0 , 则代表前驱节点可以获取资源占有锁. 于是 , 调用unparkSuccessor让前驱节点获取节点
private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 如果waitStatus小于0 , 
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);

        // 开始寻找可执行的前驱节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            // unpark对应的线程
            LockSupport.unpark(s.thread);
    }

公平锁的加锁与释放

对于公平锁(FairSync)而言在加锁的过程中会有所不同 , 仅仅只是在申请锁的时候 , 加入了队列的判断 , 如果头节点有后继节点的话 , 则让后继节点获取CPU

  1. 在申请锁的时候 , 会判断队列是否有后继节点 , 如果有后继节点的话 , 则让后继节点优先获得CPU
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 判断队列中是否有后继节点
                if (!hasQueuedPredecessors() &&
                    // 如果没有前驱的话 , 则会通过cas来设置state
                    compareAndSetState(0, acquires)) {
                    // 设置当前线程独占该锁
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
  1. 判断head/tail节点是否相同 , 如果不同的话 , 代表head节点仍有后继节点
public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
         // 返回头尾节点不相等 , 并且头节点的next不为空 , 头节点的线程不是当前线程
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }