基于Consul的分布式锁实现
我们在构建分布式系统的时候,经常需要控制对共享资源的互斥访问。这个时候我们就涉及到分布式锁(也称为全局锁)的实现,基于目前的各种工具,我们已经有了大量的实现方式,比如:基于Redis的实现、基于Zookeeper的实现。本文将介绍一种基于Consul 的Key/Value存储来实现分布式锁以及信号量的方法。
分布式锁实现
基于Consul的分布式锁主要利用Key/Value存储API中的acquire和release操作来实现。acquire和release操作是类似Check-And-Set的操作:
- acquire操作只有当锁不存在持有者时才会返回true,并且set设置的Value值,同时执行操作的session会持有对该Key的锁,否则就返回false
- release操作则是使用指定的session来释放某个Key的锁,如果指定的session无效,那么会返回false,否则就会set设置Value值,并返回true
具体实现中主要使用了这几个Key/Value的API:
- create session:https://www.consul.io/api/session.html#session_create
- delete session:https://www.consul.io/api/session.html#delete-session
- KV acquire/release:https://www.consul.io/api/kv.html#create-update-key
基本流程
具体实现
public class Lock {
private static final String prefix = "lock/"; // 同步锁参数前缀
private ConsulClient consulClient;
private String sessionName;
private String sessionId = null;
private String lockKey;
/**
*
* @param consulClient
* @param sessionName 同步锁的session名称
* @param lockKey 同步锁在consul的KV存储中的Key路径,会自动增加prefix前缀,方便归类查询
*/
public Lock(ConsulClient consulClient, String sessionName, String lockKey) {
this.consulClient = consulClient;
this.sessionName = sessionName;
this.lockKey = prefix + lockKey;
}
/**
* 获取同步锁
*
* @param block 是否阻塞,直到获取到锁为止
* @return
*/
public Boolean lock(boolean block) {
if (sessionId != null) {
throw new RuntimeException(sessionId + " - Already locked!");
}
sessionId = createSession(sessionName);
while(true) {
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {
return true;
} else if(block) {
continue;
} else {
return false;
}
}
}
/**
* 释放同步锁
*
* @return
*/
public Boolean unlock() {
PutParams putParams = new PutParams();
putParams.setReleaseSession(sessionId);
boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();
consulClient.sessionDestroy(sessionId, null);
return result;
}
/**
* 创建session
* @param sessionName
* @return
*/
private String createSession(String sessionName) {
NewSession newSession = new NewSession();
newSession.setName(sessionName);
return consulClient.sessionCreate(newSession, null).getValue();
}
}
单元测试
下面单元测试的逻辑:通过线程的方式来模拟不同的分布式服务来竞争锁。多个处理线程同时以阻塞方式来申请分布式锁,当处理线程获得锁之后,Sleep一段随机事件,以模拟处理业务逻辑,处理完毕之后释放锁。
public class TestLock {
private Logger logger = Logger.getLogger(getClass());
@Test
public void testLock() throws Exception {
new Thread(new LockRunner(1)).start();
new Thread(new LockRunner(2)).start();
new Thread(new LockRunner(3)).start();
new Thread(new LockRunner(4)).start();
new Thread(new LockRunner(5)).start();
Thread.sleep(200000L);
}
class LockRunner implements Runnable {
private Logger logger = Logger.getLogger(getClass());
private int flag;
public LockRunner(int flag) {
this.flag = flag;
}
@Override
public void run() {
Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");
try {
if (lock.lock(true)) {
logger.info("Thread " + flag + " start!");
Thread.sleep(new Random().nextInt(3000L));
logger.info("Thread " + flag + " end!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
单元测试执行结果如下:
2017-04-12 21:28:09,698 INFO [Thread-0] LockRunner - Thread 1 start!
2017-04-12 21:28:12,717 INFO [Thread-0] LockRunner - Thread 1 end!
2017-04-12 21:28:13,219 INFO [Thread-2] LockRunner - Thread 3 start!
2017-04-12 21:28:15,672 INFO [Thread-2] LockRunner - Thread 3 end!
2017-04-12 21:28:15,735 INFO [Thread-1] LockRunner - Thread 2 start!
2017-04-12 21:28:17,788 INFO [Thread-1] LockRunner - Thread 2 end!
2017-04-12 21:28:18,249 INFO [Thread-4] LockRunner - Thread 5 start!
2017-04-12 21:28:19,573 INFO [Thread-4] LockRunner - Thread 5 end!
2017-04-12 21:28:19,757 INFO [Thread-3] LockRunner - Thread 4 start!
2017-04-12 21:28:21,353 INFO [Thread-3] LockRunner - Thread 4 end!
从测试结果我们可以看到,通过分布式锁的形式来控制并发时,多个同步操作只会有一个操作能够被执行,其他操作只有在等锁释放之后才有机会去执行,所以通过这样的分布式锁,我们可以控制共享资源同时只能被一个操作进行执行,以保障数据处理时的分布式并发问题。
优化建议
本文我们实现了基于Consul的简单分布式锁,但是在实际运行时,可能会因为各种各样的意外情况导致unlock操作没有得到正确地执行,从而使得分布式锁无法释放。所以为了更完善的使用分布式锁,我们还必须实现对锁的超时清理等控制,保证即使出现了未正常解锁的情况下也能自动修复,以提升系统的健壮性。那么如何实现呢?请持续关注我的后续分解!
参考文档
Key/Value的API:https://www.consul.io/api/kv.html
- 域名“宝贝”baby.cn以71万元价格结拍
- 温故而知新:设计模式之组合模式(Composite)
- ruby学习笔记(7)-闭包
- ruby学习笔记(6)-Array的使用
- centos7下部署iptables环境纪录(关闭默认的firewalle)
- ruby学习笔记(5)-模块module的运用
- linux系统root密码遗忘的情况下的解决办法
- ruby学习笔记(4)-动态修改类的属性
- 如果技术是一种生命
- ruby学习笔记(2)--类的基本使用
- 域名资讯:四声域名BHHS.com被BHHS公司收购
- ruby学习笔记(1)--初识语法
- 无法启用数据库中的 Service Broker,因为已存在启用的具有相同 ID 的 Service Broker。
- Centos7.2下针对LDAP的完整部署记录
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- caffe详解之损失函数
- caffe详解之优化算法
- caffe详解之solver
- caffe详解之工具篇
- caffe详解之Python接口
- caffe详解之mnist手写体识别
- Python数据分析之基础篇(一)
- Python数据分析之基础篇(二)
- linux操作系统df、du命令
- Python数据分析之基础篇(三)
- Python数据分析之matplotlib(基础篇)
- Python数据分析之matplotlib(提高篇)
- Python数据分析之matplotlib(应用篇)
- Intel OpenCL 之 Pipeline(一):基本概念
- Intel OpenCL 之 Pipeline(二)For循环的执行机制