ZK实现分布式锁
时间:2022-07-24
本文章向大家介绍ZK实现分布式锁,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
上一篇说了ZK是什么以及能干什么,今儿这篇就来用ZK实现分布式锁,分别用java原生的zookeeper客户端、ZKClient实现。
一、分布式锁
分布式锁的思路是每个客户端都在某个目录下注册一个临时有序节点,每次最小的节点会获取锁,当前节点会去监听上一个较小节点,如果较小节点失效之后,就会去获取锁。
java原生zookeeper客户端
(1)引入jar包
(2)创建ZK客户端连接单例
public class ZookeeperClient {
//zk集群地址
public static final String ZOOKEEPER_CONNECT="192.168.197.100:2181,192.168.197.110:2181,192.168.197.120:2181";
//计数器,用于等待连接成功
public static CountDownLatch countDownLatch = new CountDownLatch(1);
//连接超时时间
public static final int SESSION_TIMEOUT = 5000;
//用volatile修饰单例,防止赋值时发生指令重排
private volatile static ZooKeeper instance;
//用Double check获取单例
public static ZooKeeper getInstance() throws IOException, InterruptedException {
if (instance == null ){
synchronized (ZookeeperClient.class) {
if (instance == null) {
//连接时注册一个监听,监听连接状态变化
instance = new ZooKeeper(ZOOKEEPER_CONNECT, SESSION_TIMEOUT, new Watcher() {
//监听回调方法
@Override
public void process(WatchedEvent watchedEvent) {
//当连接状态变成connected,就说明连接成功
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
});
//等待连接成功
countDownLatch.await();
}
}
}
return instance;
}
public static int getSessionTimeout() {
return SESSION_TIMEOUT;
}
}
上述代码中的CountDownLatch是因为连接时会耗时较长,所以需要添加一个计数器进行阻塞,否则会在connecting阶段就被释放了。
(3)创建分布式锁客户端
public class DistibutedLock {
//根目录,客户端都会去此目录下创建临时有序子节点
private final String ROOT_PATH = "/lock";
//客户端
private ZooKeeper zookeeper;
//session超时时间
private int SESSION_TIMEOUT;
//当前客户端创建有序节点的名称
private String lockId;
private CountDownLatch countDownLatch = new CountDownLatch(1);
public DistibutedLock() throws IOException, InterruptedException {
this.zookeeper =ZookeeperClient.getInstance();
this.SESSION_TIMEOUT = ZookeeperClient.getSessionTimeout();
}
public boolean lock(){
try {
//创建临时有序子节点
lockId = zookeeper.create(ROOT_PATH+"/","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+"创建节点"+lockId+",开始竞争锁");
//获取/lock目录下所有子节点
List<String> children = zookeeper.getChildren(ROOT_PATH, true);
//用SortedSet对子节点从小到大进行排序
SortedSet<String> sortedSet = new TreeSet<String>();
for (String child : children) {
sortedSet.add(ROOT_PATH+"/"+child);
}
//获取最小节点名称
String first = sortedSet.first();
//如果当前创建节点就是最小节点,则获取锁
if (first.equals(lockId)) {
System.out.println(Thread.currentThread().getName()+"获取锁"+lockId);
return true;
}
//获取比当前id小的节点集合
SortedSet<String> frontSet = sortedSet.headSet(lockId);
if (!frontSet.isEmpty()) {
//取集合中最后一个元素,也就是临近最小节点
String last = frontSet.last();
System.out.println(lockId+"监听"+last);
//当前节点去监听上一个节点,当上一个节点被删除的时候
//当前节点就可以获取锁
zookeeper.exists(last, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
countDownLatch.countDown();
}
}
});
countDownLatch.await(SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
}
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
//释放锁
public boolean unLock(){
try {
System.out.println(Thread.currentThread().getName() + "开始删除锁" + lockId);
//删除当前节点
zookeeper.delete(lockId, -1);
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
}
(4)测试代码
//等待器,当所有线程都执行到某个步骤才停止阻塞
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
//模拟十个线程去获取锁
for (int i = 0; i < 10; i++) {
new Thread(()-> {
DistibutedLock lock = null;
try {
lock = new DistibutedLock();
cyclicBarrier.await();
lock.lock();
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
if(lock!=null){
lock.unLock();
}
}
}).start();
}
运行结果:按照创建顺序去获取锁
ZKClient
(1)引入jar包
(2)创建ZK客户端连接单例
public class ZKClientInstance {
public static final String ZOOKEEPER_CONNECT="192.168.197.100:2181,192.168.197.110:2181,192.168.197.120:2181";
private volatile static ZkClient instance;
public static ZkClient getInstance(){
if (instance == null) {
synchronized (ZKClientInstance.class) {
if (instance == null) {
instance = new ZkClient(ZOOKEEPER_CONNECT,5000,
5000,new SerializableSerializer());
}
}
}
return instance;
}
}
(3)创建分布式锁客户端
public class ZKClientDisLock {
private static final String ROOT_PATH = "/lock";
private ZkClient zkClient;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private String lockId;
public ZKClientDisLock(ZkClient zkClient) {
this.zkClient = zkClient;
}
public boolean lock(){
lockId = zkClient.createEphemeralSequential(ROOT_PATH + "/", "123");
List<String> children = zkClient.getChildren(ROOT_PATH);
SortedSet<String> sortedSet = new TreeSet<String>();
for (String child : children) {
sortedSet.add(ROOT_PATH+"/"+child);
}
String first = sortedSet.first();
if (first.equals(lockId)) {
System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
return true;
}
SortedSet<String> frontSet = sortedSet.headSet(lockId);
if (null != frontSet && frontSet.size() > 0) {
String last = frontSet.last();
IZkDataListener iZkDataListener = null;
try {
System.out.println(lockId + "监听" + last + "节点变化");
iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
countDownLatch.countDown();
}
};
zkClient.subscribeDataChanges(last, iZkDataListener);
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
} catch (Exception e) {
}finally {
zkClient.unsubscribeDataChanges(last,iZkDataListener);
}
return true;
}
return false;
}
public void unLock(){
System.out.println(Thread.currentThread().getName()+ "释放锁"+ lockId + "-----");
zkClient.delete(lockId);
}
}
(3)测试代码
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(()-> {
ZKClientDisLock lock = null;
try {
lock = new ZKClientDisLock(ZKClientInstance.getInstance());
cyclicBarrier.await();
lock.lock();
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
if(lock!=null){
lock.unLock();
}
}
}).start();
}
运行结果:
上述就是用java原生api以及ZKClient实现的分布式锁。
还有一种是用apache-curator实现,其可以实现可重入锁、排它锁、读写锁。之后有机会介绍curator的使用方法。
- 斯坦福发布首份 AI Index 报告,AI 研究者不再茫然
- jQuery仿极客公园火箭发射“返回顶部”效果(初始篇)
- Java程序员必读,Java设计模式应该遵循哪些原则
- Visual Studio 2010快速参考指南里头的Scrum海报
- TFS2010 Team Project Collections
- 基于Ext.Panel扩展一个更容易操作的Canvas
- 好玩的效果很好的Html游戏和范例
- jQuery仿极客公园火箭发射“返回顶部”效果(优化篇)
- 北京市首个无人驾驶试运营基地落户顺义
- 后rtx集成时代
- 后短信集成时代
- jQuery仿极客公园火箭发射“返回顶部”效果(WordPress代码篇)
- Windows 2008 R2 Server Core .NET环境配置
- Request——Node世界中被依赖最多的库No.2
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 走进Java接口测试之整合ELK实现日志收集
- 【一天一大 lee】 监控二叉树 (难度:困难)-Day20200922
- 网络学习笔记2——物理层基础(信号与系统)(未完待续)
- 浅谈 React 中的 XSS 攻击
- Chrome 80+ 跨域Samesite 导致的cookie not found 解决方法
- 再不迁移到Material Design Components 就out啦
- hbase 学习
- 再谈Fragment
- java线程池(四):ForkJoinPool的使用及基本原理
- 算法书中算法
- Robo3T 与 NaviCat 的安装
- 牛客网2017年校招真题-1
- 实例分割新思路之SOLO v1&v2深度解析
- 牛客网剑指offer-3
- java8新特性总结备忘