CyclicBarrier 源码解析

时间:2022-07-23
本文章向大家介绍CyclicBarrier 源码解析,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

和CountDownLatch一样,CyclicBarrier也是猜不出其作用,看来和栅栏有关系,哈哈哈

,那么这个类到底是干什么的?还是一样的操作,百度

举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier 作者:一团捞面 链接:https://www.jianshu.com/p/333fd8faa56e 来源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

那么既然是工具,那么我们还是做个demo跑一下,看看效果。这里还是用使用上述作者的demo。实名感谢

public class CyclicBarries {

    static class TaskThread extends Thread {

        CyclicBarrier barrier;

        public TaskThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(getName() + " 到达栅栏 A");
                barrier.await();
                System.out.println(getName() + " 冲破栅栏 A");

                Thread.sleep(2000);
                System.out.println(getName() + " 到达栅栏 B");
                barrier.await();
                System.out.println(getName() + " 冲破栅栏 B");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {

        int threadNum = 5;

        CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " 完成最后任务");
            }

        });

        for(int i = 0; i < threadNum; i++) {
            new TaskThread(barrier).start();
        }

    }
}

看到了效果了吧,就是所有的线程都把活干完之后,才能突破await的方法,向下运行。

这里我们用到了CyclicBarrier的初始化,和await方法。那么我们详细看看CyclicBarrier具体是如何实现的。

发现CyclicBarrier类的方法也不是很多。还算比较友好。那么我们还是按照初始化方法的顺序来看看源码吧!

其中parties就是线程的数量,barrierAction就是回调函数。可以看出实例化就做了这些事情。如果回调函数为空,那么就直接将回调函数置为null。

由于CyclicBarrier的主要方法是await,那么我们主要分析一下await的实现过程。

 /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException
               //拿到重入锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
          //每次调用await方法,数量就减一
            int index = --count;
            if (index == 0) {  // tripped
            //当index为0的时候,表示已经没有什么障碍了,可以突破了
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //判断时候设置了回调函数,有得话运行一下
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //设置下一个开闸标志
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                    //图片限制,激活线程
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                    //让自己先阻塞
                        trip.await();
                    else if (nanos > 0L)
                    //让自己在超时时间内休眠
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
        //释放锁
            lock.unlock();
        }
    }

到这里CyclicBarrier的主要的实现机制大概已经明确了。就是通过对线程的数量与传入的 parties的对比,当其值为0的时候运行一下回调函数,然后激活所有等待的线程,然后调用breakBarrier突破屏障。其中激活等待线程的方法调用的是AQS的方法。之前应该或多或少提到过,逻辑比较复杂。

总结:

CyclicBarrier的await方法是使用ReentrantLock和Condition控制实现的,使用的Condition实现类是ConditionObject,它里面有一个等待队列和await方法,这个await方法会向队列中加入元素。当调用CyclicBarrier的await方法会间接调用ConditionObject的await方法,当屏障关闭后首先执行指定的barrierAction,然后依次执行等待队列中的任务,有先后顺序 原文链接:https://blog.csdn.net/u012372941/java/article/details/93785920

在网上找了这么一段,算是解答了我的疑问,因为我把所有的任务都扔给了AQS,然后发现CyclicBarrier中的Condition,ReentrantLock都好理解,condition就不好理解,但是我们在breakBarrier的时候里边确实是调用的conditiion.signalAll(),当时我们并没有往底层走。其实这块signalAll就是把AQS队列的元素进行激活(比较复杂)。在调用await的时候这个condition会向AQS中添加节点。相当于把自己跟在队列的后边等待下一次突破。但我总感觉这里了unlock没有执行。。。