跟着实例学习ZooKeeper的用法: Barrier
时间:2022-04-25
本文章向大家介绍跟着实例学习ZooKeeper的用法: Barrier,主要内容包括栅栏Barrier、双栅栏Double Barrier、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。
比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。
栅栏Barrier
DistributedBarrier
类实现了栅栏的功能。 它的构造函数如下:
public DistributedBarrier(CuratorFramework client, String barrierPath)Parameters:
client - client
barrierPath - path to use as the barrier
首先你需要设置栅栏,它将阻塞在它上面等待的线程:
setBarrier();
然后需要阻塞的线程调用“方法等待放行条件:
public void waitOnBarrier()
当条件满足时,移除栅栏,所有等待的线程将继续执行:
removeBarrier();
异常处理 DistributedBarrier 会监控连接状态,当连接断掉时waitOnBarrier()
方法会抛出异常。
看一个例子:
package com.colobu.zkrecipe.barrier;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.barriers.DistributedBarrier;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;public class DistributedBarrierExample { private static final int QTY = 5; private static final String PATH = "/examples/barrier"; public static void main(String[] args) throws Exception { try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
controlBarrier.setBarrier(); for (int i = 0; i < QTY; ++i) { final DistributedBarrier barrier = new DistributedBarrier(client, PATH); final int index = i;
Callable<Void> task = new Callable<Void>() { @Override
public Void call() throws Exception {
Thread.sleep((long) (3 * Math.random()));
System.out.println("Client #" + index + " waits on Barrier");
barrier.waitOnBarrier();
System.out.println("Client #" + index + " begins"); return null;
}
};
service.submit(task);
}
Thread.sleep(10000);
System.out.println("all Barrier instances should wait the condition");
controlBarrier.removeBarrier();
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
}
}
}
这个例子创建了controlBarrier
来设置栅栏和移除栅栏。 我们创建了5个线程,在此Barrier上等待。 最后移除栅栏后所有的线程才继续执行。
如果你开始不设置栅栏,所有的线程就不会阻塞住。
双栅栏Double Barrier
双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 双栅栏类是DistributedDoubleBarrier
。 构造函数为:
public DistributedDoubleBarrier(CuratorFramework client,
String barrierPath, int memberQty)Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.
Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier
memberQty
是成员数量,当enter
方法被调用时,成员被阻塞,直到所有的成员都调用了enter
。 当leave
方法被调用时,它也阻塞调用线程, 知道所有的成员都调用了leave
。 就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。
DistributedBarrier 会监控连接状态,当连接断掉时enter()
和leave
方法会抛出异常。
例子代码:
package com.colobu.zkrecipe.barrier;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.barriers.DistributedBarrier;import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;public class DistributedBarrierExample { private static final int QTY = 5; private static final String PATH = "/examples/barrier"; public static void main(String[] args) throws Exception { try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY); for (int i = 0; i < QTY; ++i) { final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY); final int index = i;
Callable<Void> task = new Callable<Void>() { @Override
public Void call() throws Exception {
Thread.sleep((long) (3 * Math.random()));
System.out.println("Client #" + index + " enters");
barrier.enter();
System.out.println("Client #" + index + " begins");
Thread.sleep((long) (3000 * Math.random()));
barrier.leave();
System.out.println("Client #" + index + " left"); return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
}
}
}
- 环境配置:React Native 开发环境配置 For Android
- 美团多渠道打包方案详解,速度快到白驹过隙
- 下一代Android渠道打包工具
- 01 整合IDEA+Maven+SSM框架的高并发的商品秒杀项目之业务分析与DAO层
- 通俗易懂的分析如何用Python实现一只小爬虫,爬取拉勾网的职位信息
- 我的第一个小程序(Discuz! + 微信小程序)
- 微信小程序 wx.request 的封装
- 如何用Python爬虫实现百度图片自动下载?
- 以太坊智能合约开发入门
- lodash源码分析之baseFindIndex中的运算符优先级
- 分子对接简明教程 (一)
- 分子对接简明教程 (二)
- 分子对接简明教程 (三)
- 分子对接简明教程 (4)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Flink教程-使用sql将流式数据写入文件系统
- flink教程-flink 1.11 集成zeppelin实现简易实时计算平台
- flink教程-详解flink 1.11 中的CDC (Change Data Capture)
- flink教程-基于flink 1.11 使 sql客户端支持执行sql文件
- flink教程-详解flink 1.11 中的JDBC Catalog
- flink教程-flink modules详解之使用hive函数
- 面试iOS 机会在自己手中
- Flink教程-将流式数据写入redis
- Flink教程-keyby 窗口数据倾斜的优化
- Flink源码分析之深度解读流式数据写入hive
- 浙大版《C语言程序设计(第3版)》题目集 习题10-1 判断满足条件的三位数
- 差分标记-HDU1556 Color the ball
- flink cep 案例之机架温度监控报警
- 详解flink 1.11中的新部署模式-Application模式
- 浙大版《C语言程序设计(第3版)》题目集 习题10-2 递归求阶乘和