我们是这样一步一步实现分布式锁的

时间:2022-07-22
本文章向大家介绍我们是这样一步一步实现分布式锁的,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

昨天我给大家分享了我们电商平台是怎么去做分布式事务的(不好意思,懂分布式事务的你真的很了不起,下篇,不好意思,懂分布式事务的你真的很了不起,上篇),忘记了的朋友希望能回去扫一下,这些都是你进阶路上必须要经历的。

今天,分享的专题是分布式锁。那锁是什么,我就不多说了,相信大家都知道;分布式锁又是什么呢,也不用多说,这些概念大家都能不假思索的脱口而出了,简单的理解不就是,在分布式环境中多个机器共有的一把锁么,对不对。

我们今天的重心是,我们怎么去开发一个分布式锁去满足我们业务。我今天会讲我们部门三个分布式锁的实现方式,帮大家去少走弯路,直接照着做就行啦。

  • 基于数据库开发
  • 基于内存redis开发
  • 基于zookeeper组件开发

01

基于数据库开发

我们组有个不大不小的项目,在前两年用的这种方式去保证分布式环境中各个应用争抢共享资源。目前,也没有出大问题,主要是并发量不是太大,然后数据库是单独一台机器,配置是8CPU 16G。是怎么做的呢?

  1. 在我们的data_common数据中新建了一张分布式锁表(ds_lock)。
  2. 当并发请求过来的时候,我们就将对应的锁新增到这个ds_lock表中(其他的请求就会等待),然后处理自己的业务逻辑,比如扣减库存。
  3. 当扣减成功了,就将这个锁在ds_lock 表内删除掉,代表释放锁。
  4. 其他请求发现数据已经被删了证明锁已经释放了,这时候其他请求执行新增表操作以证明自己拿到了锁。

优点:简单易实现,只要逻辑清晰,利用一张数据库表就实现了,很方便。

缺点:对于并发量大的话,数据库的IO可能就是瓶颈,会造成性能低,吞吐量低等。

开发建议:在初创公司没有引入像redis或者zookeeper组件的时候,对于并发不大的业务就用这个方案,是很不错的。

02

基于内存Redis开发

由于数据库在IO方面以及高并发下受限等一些缺点,后来Redis的出现,我们就将部分分布式业务中的锁放到了Redis中去管理,相关思路如下:

  1. 利用Redis中setnx(key,value)函数,这个函数的特点就是如果key不存在就设置到内存。
  2. 然后,我们将锁存进redis里面去,setnx(lock_id,currentTime+timeOut)。表示如果当前lock_id获得锁后,如果在timeOut的时间内还没释放的话,系统就会自动释放当前锁lock_id。
  3. setnx(lock_id,currentTime+timeOut) 返回 1,则证明获得了锁;
  4. setnx(lock_id,currentTime+timeOut) 返回 0,则证明获得锁失败,然后把这个请求放到等待获取队列中去,等到前面的释放了,就出来获取锁。

Redisson

大家也可以直接使用开源组件,这里推荐一个比较好用的一个基于redis实现的开源分布式锁,Redisson。使用起来还是很方便的。

基本结构差不多这个样子,更多的可以看下Redisson官网:

RLock lock = redissonClient.getLock(getLockKey(t));

try {

if (lock.tryLock()) {

//业务逻辑

return true;

} else {

// 获取锁失败的业务逻辑

return false;

}

} finally {

lock.unlock();

}

}

缺点:基于Redis内存开发的分布式锁,主要有个问题就是,超时时间我们不能确保合理设置,需要一直监控并且根据业务调整。

03

基于zookeeper开发

这里可以使用zookeeper的临时顺序节点来实现分布式锁。

实现步骤:

  1. 创建临时顺序节点/ds_locks/tlock,多个进程创建的tlock节点会按照时间顺序进行编号。
  2. 获取/ds_locks节点下面所有的子节点tlock,然后判断自己的编号是不是最小的,如果是最小的,则获取所成功,反之,则注册它前面一个编号的节点监听时间Watch
  3. 获取锁的节点,等到自己业务处理完或者发生故障了则会删除对应的节点tlock,然后,比他大的那个节点就会获取到锁。

实现代码:

public class ZkLock {

public final static Joiner j = Joiner.on("|").useForNull("");

// zk客户端

private ZooKeeper zk;

// zk是一个目录结构,root为最外层目录

private String root = "/locks";

// 锁的名称

private String lockName;

// 当前创建的序列节点

private ThreadLocal<String> nodeId = new ThreadLocal<>();

// 用来同步等待zkclient链接到了服务端

private CountDownLatch connectedSignal = new CountDownLatch(1);

private final static int sessionTimeout = 3000;

private final static byte[] data = new byte[0];

public ZookeeperDistributedLock(String config, String lockName) {

this.lockName = lockName;

try {

zk = new ZooKeeper(config, sessionTimeout, new Watcher() {

@Override

public void process(WatchedEvent event) {

// 建立连接

if (event.getState() == KeeperState.SyncConnected) {

connectedSignal.countDown();

}

}

});

connectedSignal.await();

Stat stat = zk.exists(root, false);

if (null == stat) {

// 创建根节点

zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

} catch (Exception e) {

throw new RuntimeException(e);

}

}

class LockWatcher implements Watcher {

private CountDownLatch latch = null;

public LockWatcher(CountDownLatch latch) {

this.latch = latch;

}

@Override

public void process(WatchedEvent event) {

if (event.getType() == Event.EventType.NodeDeleted)

latch.countDown();

}

}

public void lock() {

try {

// 创建临时子节点

String myNode = zk.create(root + "/" + lockName, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,

CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println(j.join(Thread.currentThread().getName() + myNode, "created"));

// 获取所有子节点

List<String> subNodes = zk.getChildren(root, false);

TreeSet<String> sortedNodes = new TreeSet<>();

for (String node : subNodes) {

sortedNodes.add(root + "/" + node);

}

String smallNode = sortedNodes.first();

String preNode = sortedNodes.lower(myNode);

if (myNode.equals(smallNode)) {

// 是否最小节点,则表示取得锁

System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock"));

this.nodeId.set(myNode);

return;

}

CountDownLatch latch = new CountDownLatch(1);

Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。

// 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听

if (stat != null) {

System.out.println(j.join(Thread.currentThread().getName(), myNode,

" waiting for " + root + "/" + preNode + " released lock"));

latch.await();// 等待,这里应该一直等待其他线程释放锁

nodeId.set(myNode);

latch = null;

}

} catch (Exception e) {

throw new RuntimeException(e);

}

}

public void unlock() {

try {

System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock "));

if (null != nodeId) {

zk.delete(nodeId.get(), -1);

}

nodeId.remove();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

}

}

}

04

方案比较

最后,我们再来将这三种方案进行个比较,方便大家选择:

如果对于zookeeper这种流程还是不清楚的话,后期我再整理一篇zookeeper实现流程讲解。

总结,今天我把我们自己实现分布式锁的三种方案实现细节都分享出来,也是从简单到复杂的,根据业务的不同去选择方案的,大家也可以根据自己的业务去分析,直接在我这里选用就行了。

关于架构师修炼

本号旨在分享一线互联网各种技术架构解决方案,分布式以及高并发等相关专题,同时会将作者的学习总结进行整理并分享。

更多技术专题,敬请期待