Zookeeper源码分析
zookeeper简介
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
使用案例
dubbo、disconf、elastic-job、分布式锁、leader选举
客户端代码解析
接下来的代码可以从的github项目找到https://github.com/yaojf/zookeeper-learn。
这里我从Client的main方法开始解析(前提启动了Server,即zk服务端),调用点为ZooKeeperMain.main(args),以下代码都是从这个调用到开始。
-
构造ZooKeeperMain
默认的连接地址是localhost:2181,会话超时为30000毫秒,核心代码
zk = new ZooKeeper(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
参数依次为zk服务器地址,会话超时时间,默认的观察者类,还有是否只读。
ZooKeeper构造方法里,初始化watchManager(处理各种观察,比如dubbo服务提供者监听),解析服务器地址,默认的根地址,构造ClientCnxncnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
第六个参数默认为ClientCnxnSocketNIO(jdk nio连接),核心为构造2个线程
sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread();
SendThread线程处理异步连接,获取初始化sessionId,发送心跳,处理读写IO,断开重连等功能。EventThread线程处理事件监听,它通过不停的获取waitingEvents同步队列的数据,做Watcher的处理(比如连接成功的事件)。
最后就是调用cnxn.start();
启动SendThread和EventThread线程。
-
SendThread线程功能
-
发起异步连接
if (!clientCnxnSocket.isConnected()) { // don't re-establish connection if we are closing if (closing) { break; } startConnect(); clientCnxnSocket.updateLastSendAndHeard(); }
startConnect方法随机选取一个zk服务器地址发起异步连接, 然后更新最后的发送和接受的时间戳(用于判断是否到心跳的发送时间)。
-
计算selector的select方法的等待时间
if (state.isConnected()) { // determine whether we need to send an AuthFailed event. to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); }
如果还未连接则用connectTimeout计算,否则用readTimeout计算,这2个参数是在构造ClientCnxn对象时设置的,和sessionTimeout相关
connectTimeout = sessionTimeout / hostProvider.size(); readTimeout = sessionTimeout * 2 / 3;
-
处理事件选择
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
pendingQueue为等待结果的队列,outgoingQueue为发送请求的队列。
内部逻辑为调用selector.select(waitTimeOut),获取激活的事件,并处理,或 者超时跳出。void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); updateSocketAddresses(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, outgoingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { synchronized(outgoingQueue) { if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { enableWrite(); } } } selected.clear(); }
如果是连接事件,则完成连接,并往outgoingQueue放入ConnectRequest,然后设置与服务端连接的sockKey感兴趣事件为读写。
如果是读写事件,则走doIO方法。该方法判断如果是读事件激活则读取数据,这里读数据分2个步骤,第一步读4字节的数据长度,然后新建对应长度的ByteBuffer分配给incomingBuffer(读取数据用),首次读取数据会初始化连接。else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; }
读取服务端返回结果,获取服务端分配的sessionId,并且ClientCnxn的连接状态为States.CONNECTED,然后发送连接成功事件(EventThread处理默认的监听器回调)。
不是首次连接则是普通的读数据,反序列化数据,从pendingQueue获取数据(处理完会remove掉),然后notify阻塞在Packet的数据请求。
如果是写事件激活,则从outgoingQueue获取请求Packet,然后发送数据到服务端,如果不是ping请求,则把Packet重新放到pendingQueue.
sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount++; outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } }
-
-
EventThread线程功能
事件处理线程功能比较简单,通过waitingEvents这个LinkedBlockingQueue不停的抓取数据,处理事件监听(zk默认的事件监听都是一次性的,使用一次后会从map里面去除,但是一些框架比如zkClient等他自己封装了非一次性监听的逻辑)。
-
ZooKeeperMain的run方法
这个run方法是从控制台不停的读取请求,然后解释器解释请求,合成对应的请求到zk服务端,我们可以从MyCommandOptions对象里看到所有支持的请求命令,比如LsCommand。具体不同的命令我们可以自行debug,做了解。
总结
ZooKeeper会随机连接一个服务端,然后通过主线程操做ZooKeeper的命令请求方法,配合sendThread和eventThread完成我们对服务端的各种请求。
原文地址:https://www.cnblogs.com/itpy/p/11872283.html
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Android实现View拖拽跟随手指移动效果
- Android中button的onClick事件几种方法
- Android利用Glide获取图片真正的宽高的实例
- Android 自定义AlertDialog对话框样式
- Android自定义一个图形单点移动缩小的效果
- 详解Android实现购物车页面及购物车效果(点击动画)
- Android利用LitePal操作数据库存取图片
- Android 改变图标原有颜色和搜索框的实例代码
- Android自定义滑动验证条的示例代码
- Android实现图片转高斯模糊以及高斯模糊布局
- android多媒体类VideoView使用方法详解
- Android编程实现短信收发及语音播报提示功能示例
- Android viewpager无限轮播获取网络图片功能
- Android 使用ContentObserver监听数据库内容是否更改
- Android UI中TextView的使用方法