理解Java并发工具类Phaser

时间:2022-06-11
本文章向大家介绍理解Java并发工具类Phaser,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Phaser(移相器,一种电子元件)是JDK7中引入的新的并发工具辅助类,oralce官网文档描述Phaser是一个可重复使用的同步栅栏,功能上与 CountDownLatch 和 CyclicBarrier类似但支持的场景更加灵活,这个类可能是目前并发包里面实现最复杂的一个了。

Phaser的灵活性主要体现在在构造函数时不需要强制指定目前有多少参与协作的线程,可以在运行时动态改变。

下面看一下关于Phaser常见的方法;

Phaser() //默认的构造方法,初始化注册的线程数量为0
Phaser(int parties)//一个指定线程数量的构造方法

此外Phaser还支持Tiering类型具有父子关系的构造方法,主要是为了减少在注册者数量庞大的时候,通过分组的形式复用Phaser从而减少竞争,提高吞吐,这种形式一般不常见,所以这里不再提及,有兴趣的可以参考官网文档。

其他几个常见方法:

register()//添加一个新的注册者
bulkRegister(int parties)//添加指定数量的多个注册者
arrive()// 到达栅栏点直接执行,无须等待其他的线程
arriveAndAwaitAdvance()//到达栅栏点,必须等待其他所有注册者到达
arriveAndDeregister()//到达栅栏点,注销自己无须等待其他的注册者到达
onAdvance(int phase, int registeredParties)//多个线程达到注册点之后,会调用该方法。

(1)下面我们先看一个简单的替代CountDownLatch实现一次性的共享锁例子

void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         phaser.arriveAndAwaitAdvance(); // await all creation
         task.run();
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 }

这个方法中,首先调用者线程注册了自己,然后接着分别注册并启动了多个线程,在每个线程中又调用了

phaser.arriveAndAwaitAdvance()

方法,这个方法会阻塞直到所有的线程都启动,然后继续执行,最后在方法的最后一行调用了到达时注销自己的方法,执行所有线程到达栅栏点,然后开始执行后续的task.run方法。

(2)接着我们在看一个模拟CyclicBarrier的例子。

这个例子我们以实际场景作为说明,假设小张,小李,小王,三个人约好共同去旅游,旅游路线是北京,上海,杭州,规则是他们都可以采用自己的路线去到达目的地,但是必须是所有人都到达某一个城市集合后,他们才能再次出发下一个城市。

这其实就是一个典型的循环栅栏的例子,我们直接来看如何使用Phaser来完成:

package concurrent.tools.phaser;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * Created by Administrator on 2018/8/27.
 */
public class PhaserDemo5 {

    public static void main(String[] args) throws InterruptedException {

        Phaser phaser=new Phaser(){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("=================step-"+phase+"==================="+registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };

        Bus bus1=new Bus(phaser,"小张");
        Bus bus2=new Bus(phaser,"小李");
        Bus bus3=new Bus(phaser,"小王");

        bus1.start();
        bus2.start();
        bus3.start();


        System.out.println(phaser.getRegisteredParties());



    }


    static public class Bus extends Thread{

        private Phaser phaser;
        private Random random;

        public Bus(Phaser phaser,String name){
            this.phaser=phaser;
            setName(name);
            random=new Random();
            phaser.register();
        }


        private void trip(int sleepRange,String cityName){
            System.out.println(this.getName()+" 准备去"+cityName+"....");
            int sleep=random.nextInt(sleepRange);
            try {
                TimeUnit.SECONDS.sleep(sleep);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.getName()+" 达到"+cityName+"...... ");
            if(this.getName().equals("小王1")){ //  测试掉队的情况
                try {
                    TimeUnit.SECONDS.sleep(7);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                phaser.arriveAndDeregister();
            }else {
                phaser.arriveAndAwaitAdvance();
            }
        }





        @Override
        public void run() {

            try {
                int s=random.nextInt(3);
                TimeUnit.SECONDS.sleep(s);
                System.out.println(this.getName()+"  准备好了,旅行路线=北京=>上海=>杭州 ");
                phaser.arriveAndAwaitAdvance();// 等待所有的汽车准备好
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


            trip(5,"北京");
            trip(5,"上海");
            trip(3,"杭州");

        }
    }

}

输出结果如下:

小王  准备好了,旅行路线=北京=>上海=>杭州 
小李  准备好了,旅行路线=北京=>上海=>杭州 
小张  准备好了,旅行路线=北京=>上海=>杭州 
=================step-0===================3
小王 准备去北京....
小张 准备去北京....
小李 准备去北京....
小张 达到北京...... 
小李 达到北京...... 
小王 达到北京...... 
=================step-1===================3
小王 准备去上海....
小张 准备去上海....
小李 准备去上海....
小李 达到上海...... 
小王 达到上海...... 
小张 达到上海...... 
=================step-2===================3
小张 准备去杭州....
小张 达到杭州...... 
小李 准备去杭州....
小王 准备去杭州....
小王 达到杭州...... 
小李 达到杭州...... 
=================step-3===================3

结果符合预期,在这例中Phaser的构造函数我们并没有指定数量,而是在运行时动态注册就去的,然后里面又使用了onAdvance方法,可以在每次到达栅栏点时输出当前的step阶段序号,这个值最大是Integer.MAX_VALUE,超过之后会重新从0开始。

此外Phaser还提供了一个arriveAndDeregister方法,如果中间小张在到达某个城市之后下起了暴雨行程无期,默认情况下其他的线程都需要等待小张,如果调用了arriveAndDeregister方法,可以在任务列表注销自己,然后自己单独运行,这样不会影响其他的正常运行线程。

本文主要了介绍了JDK7引入的并发工具类Phaser,这个类的功能与CountDownLatch 和 CyclicBarrier类似但更灵活,这个类底层相对比较复杂并没有采用AQS同步框架实现,而是单独定义了相关功能api,其中state采用64位的long类型表示,然后64bit又分成4个定义分别代表没有到达栅栏的数量(0-15bit),注册的数量(16-31bit),栅栏的代数量(32-62bit),最后一位(63bit)代表当前的Phaser是否是终止状态,这也意味着我们能够注册的最大数量不能超过65535,否则会抛出不合法参数异常,这一点在使用时需要注意。