HBase source code analysis: the Get flow and how RPC works
Version analyzed: HBase 0.94
The diagram below was drawn by the Trend team:
RPC roles:
HBase channel: client | HBase channel: server | HBase communication interface
---|---|---
HBase Client | Master Server | HMasterInterface
HBase Client | Region Server | HRegionInterface
Region Server | Master Server | HMasterRegionInterface
The client issues the request:
htable.get(Get)
public Result get(final Get get) throws IOException {
  return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
    public Result call() throws IOException {
      return server.get(location.getRegionInfo().getRegionName(), get);
    }
  }.withRetries();
}
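After get is called, withRetries invokes call(). If the RPC returns a result, that result is returned immediately. If the attempt fails, the client sleeps for pause * HConstants.RETRY_BACKOFF[ntries] and tries again, up to 10 attempts in total (DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 10), where:
pause = HBASE_CLIENT_PAUSE (1 second by default)
RETRY_BACKOFF[] = { 1, 1, 1, 2, 2, 4, 4, 8, 16, 32 };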
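The retry behaviour described above can be sketched as a small loop. This is a hypothetical model, not HBase code: RetrySketch, pauseTime and withRetries are illustrative names, and java.util.concurrent.Callable stands in for ServerCallable. It assumes only what the text states: a pause multiplied by the backoff table, and a fixed number of attempts.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical sketch (not HBase source) of the client retry loop:
// try the call; on failure sleep pause * RETRY_BACKOFF[tries], then try again.
public class RetrySketch {
    // Backoff multipliers as quoted in the text for HBase 0.94.
    static final int[] RETRY_BACKOFF = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32};

    // Sleep before the next attempt; indexes past the table reuse the last multiplier.
    static long pauseTime(long pauseMs, int tries) {
        int ntries = Math.min(tries, RETRY_BACKOFF.length - 1);
        return pauseMs * RETRY_BACKOFF[ntries];
    }

    // Makes up to numRetries attempts and returns the first successful result.
    static <T> T withRetries(Callable<T> call, long pauseMs, int numRetries)
            throws IOException, InterruptedException {
        Exception last = null;
        for (int tries = 0; tries < numRetries; tries++) {
            try {
                return call.call();              // success: return at once, no sleep
            } catch (Exception e) {
                last = e;                        // remember the failure
            }
            if (tries < numRetries - 1) {
                Thread.sleep(pauseTime(pauseMs, tries)); // back off before retrying
            }
        }
        throw new IOException("failed after " + numRetries + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice (e.g. a stale region location), then succeeds.
        // pause is 1 ms here instead of 1000 ms so the demo finishes quickly.
        String value = withRetries(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("region moved");
            return "row-value";
        }, 1, 10);
        System.out.println(value + " after " + calls[0] + " attempts");
    }
}
```

Running main prints `row-value after 3 attempts`. Because the index is clamped to the last entry, no single wait exceeds 32 × pause.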
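On the first get, the client must first make RPC calls to read the -ROOT- and .META. tables to find the location of the region that holds the row; the connection then caches that location for later requests.
ServerCallable holds `server`, an HRegionInterface instance obtained from HConnectionManager's getHRegionConnection method. It is a VersionedProtocol proxy created through HBaseRPC, in practice a dynamic proxy produced by WritableRpcEngine; every method call on it goes through the call method of its HBaseClient member, which performs the remote communication with the region server.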
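The proxy mechanism can be illustrated with the JDK's java.lang.reflect.Proxy. The sketch below is a simplified model, not WritableRpcEngine itself: RegionProtocol, Invocation and Transport are hypothetical names, and the transport dispatches in-process by reflection instead of sending bytes to a region server through HBaseClient.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical sketch (not HBase source) of a proxy-based RPC client: calls on the
// interface become (method, params) invocations handed to a transport. Here the
// "transport" dispatches in-process by reflection.
public class RpcProxySketch {
    // Stand-in for a protocol interface such as HRegionInterface.
    public interface RegionProtocol {
        String get(String regionName, String row);
    }

    // What would be serialized and sent: method name, parameter types and values.
    public static final class Invocation {
        public final String methodName;
        public final Class<?>[] paramTypes;
        public final Object[] params;
        Invocation(String methodName, Class<?>[] paramTypes, Object[] params) {
            this.methodName = methodName;
            this.paramTypes = paramTypes;
            this.params = params;
        }
    }

    // Hypothetical transport; HBaseClient.call plays this role in the real client.
    public interface Transport {
        Object call(Invocation invocation) throws Exception;
    }

    // Builds a client-side proxy: every interface call becomes one Invocation.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> protocol, Transport transport) {
        InvocationHandler invoker = (proxy, method, args) ->
            transport.call(new Invocation(method.getName(), method.getParameterTypes(), args));
        return (T) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class<?>[] {protocol}, invoker);
    }

    static String demo() throws Exception {
        // Server-side object implementing the protocol.
        RegionProtocol impl = (region, row) -> "value-of-" + row + "@" + region;
        // In-process "server": look the method up by name and types, then invoke it.
        Transport inProcess = inv -> RegionProtocol.class
            .getMethod(inv.methodName, inv.paramTypes)
            .invoke(impl, inv.params);
        RegionProtocol server = getProxy(RegionProtocol.class, inProcess);
        return server.get("t1,,1", "row1");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // value-of-row1@t1,,1
    }
}
```

The benefit of this design is that code such as `server.get(...)` inside ServerCallable reads like a local call; serialization and networking stay hidden behind the invocation handler.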
Server side:
When the region server receives a Get request from the client, it handles it in:
public Result get(byte[] regionName, Get get) {
  ...
  HRegion region = getRegion(regionName);
  return region.get(get, getLockFromId(get.getLockId()));
  ...
}
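In HRegion, the Get is first turned into a Scan: Scan scan = new Scan(get);
The Get's familyMap records the requested column families; addFamily stores the entry columnFamily -> null, where null means all columns of that family: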
public Get addFamily(byte [] family) {
  familyMap.remove(family);
  familyMap.put(family, null);
  return this;
}
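If a requested family is not defined in the table's HTableDescriptor, an error is raised (HRegion.checkFamily throws NoSuchColumnFamilyException).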
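The familyMap semantics can be modelled with a TreeMap keyed by byte[]. This is a hypothetical sketch: FamilyMapSketch, addColumn's behaviour (collecting qualifiers into a sorted set) and the use of IllegalArgumentException in place of NoSuchColumnFamilyException are assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.*;

// Hypothetical sketch (not HBase source) of the familyMap idea:
// family -> null means "all columns of the family"; family -> set means "only these".
public class FamilyMapSketch {
    // byte[] has identity equals/hashCode, so keys need a content comparator.
    // Unsigned lexicographic order is assumed here, as for HBase byte keys.
    static final Comparator<byte[]> BYTES = Arrays::compareUnsigned;

    final NavigableMap<byte[], NavigableSet<byte[]>> familyMap = new TreeMap<>(BYTES);

    static byte[] b(String s) { return s.getBytes(StandardCharsets.UTF_8); }
    static String s(byte[] b) { return new String(b, StandardCharsets.UTF_8); }

    // Same shape as addFamily above: replace any column set with null.
    FamilyMapSketch addFamily(byte[] family) {
        familyMap.remove(family);
        familyMap.put(family, null);
        return this;
    }

    // Assumed counterpart for single columns: collect qualifiers into a sorted set.
    FamilyMapSketch addColumn(byte[] family, byte[] qualifier) {
        NavigableSet<byte[]> set = familyMap.get(family);
        if (set == null) set = new TreeSet<>(BYTES);
        set.add(qualifier);
        familyMap.put(family, set);
        return this;
    }

    // Mimics the family check; the real code throws NoSuchColumnFamilyException.
    void checkFamilies(Set<String> tableFamilies) {
        for (byte[] family : familyMap.keySet()) {
            if (!tableFamilies.contains(s(family))) {
                throw new IllegalArgumentException("Column family " + s(family) + " does not exist");
            }
        }
    }

    // Human-readable view, e.g. "info:* stats:[clicks]".
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<byte[], NavigableSet<byte[]>> e : familyMap.entrySet()) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(s(e.getKey())).append(':');
            if (e.getValue() == null) {
                sb.append('*');
            } else {
                List<String> qualifiers = new ArrayList<>();
                for (byte[] q : e.getValue()) qualifiers.add(s(q));
                sb.append(qualifiers);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        FamilyMapSketch get = new FamilyMapSketch()
            .addFamily(b("info"))
            .addColumn(b("stats"), b("clicks"));
        System.out.println(get.describe());          // info:* stats:[clicks]
        get.checkFamilies(Set.of("info", "stats"));  // passes
    }
}
```

Calling addFamily after addColumn on the same family widens the request back to the whole family, because the set is replaced with null.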
scanner = getScanner(scan);
public RegionScanner getScanner(Scan scan) throws IOException {
  return getScanner(scan, null);
}
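additionalScanners is null, so the RegionScannerImpl constructor uses only StoreScanners. getScanner(scan, null) returns instantiateRegionScanner(scan, additionalScanners), which in turn creates the RegionScannerImpl:
return instantiateRegionScanner(scan, additionalScanners);
return new RegionScannerImpl(scan, additionalScanners);
RegionScannerImpl is an inner class of HRegion. Its constructor creates one scanner per requested family: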
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
    scan.getFamilyMap().entrySet()) {
  Store store = stores.get(entry.getKey());
  StoreScanner scanner = store.getScanner(scan, entry.getValue());
  scanners.add(scanner);
}
One StoreScanner is created for each familyMap entry.
When the HRegion is initialized, it creates one Store per column family and puts it into stores:
Future<Store> future = completionService.take();
Store store = future.get();
this.stores.put(store.getColumnFamilyName().getBytes(), store);
Each StoreScanner then obtains its underlying scanners from the Store:
store.getScanners(cacheBlocks, isGet, isCompaction, matcher)
In the Store class:
memStoreScanners = this.memstore.getScanners();
List<StoreFileScanner> sfScanners = StoreFileScanner
    .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
List<KeyValueScanner> scanners =
    new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
// Then the memstore scanners
scanners.addAll(memStoreScanners);
return scanners;
memStoreScanners is Collections.<KeyValueScanner>singletonList(new MemStoreScanner()).
So the Store supplies each StoreScanner with its StoreFileScanners followed by one MemStoreScanner.
To run the scan:
scanner = getScanner(scan);
scanner.next(results);
Now look at next in RegionScannerImpl; this is where data retrieval actually begins:
@Override
public synchronized boolean next(List<KeyValue> outResults)
    throws IOException {
  // apply the batching limit by default
  return next(outResults, batch);
}
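batch defaults to -1, which means no batching limit.
The inner next(outResults, limit) wraps the actual read like this:
startRegionOperation();
...
outResults.addAll(results);
startRegionOperation takes the region's read lock (lock.readLock().lock()), so many reads can run at once while operations that need the write lock, such as closing the region, are kept out.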
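The locking pattern can be sketched with java.util.concurrent.locks.ReentrantReadWriteLock. This is a hypothetical model, not HRegion: RegionLockSketch, read and activeReaders are illustrative names, and the assumption is that the close path takes the write lock and marks the region closed, so later reads are rejected.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical sketch (not HBase source) of the region lock pattern: reads share the
// read lock; closing takes the write lock and therefore waits for in-flight reads.
public class RegionLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean closed = false;

    void startRegionOperation() {
        lock.readLock().lock();                  // many gets/scans can hold this at once
        if (closed) {
            lock.readLock().unlock();            // do not leak the lock on rejection
            throw new IllegalStateException("region is closed");
        }
    }

    void closeRegionOperation() {
        lock.readLock().unlock();
    }

    // Every read is bracketed by start/close, with the release in finally.
    <T> T read(Supplier<T> operation) {
        startRegionOperation();
        try {
            return operation.get();
        } finally {
            closeRegionOperation();
        }
    }

    // Assumed close path: exclusive write lock, so it cannot overlap any read.
    void close() {
        lock.writeLock().lock();
        try {
            closed = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    int activeReaders() {
        return lock.getReadLockCount();
    }

    public static void main(String[] args) {
        RegionLockSketch region = new RegionLockSketch();
        System.out.println(region.read(() -> "row1 found"));
        region.close();
        try {
            region.read(() -> "never runs");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Releasing the read lock in a finally block matters: an exception thrown mid-read must not leave the region unclosable.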
It then advances storeHeap until all KeyValues of the current row have been collected:
do {
  this.storeHeap.next(results, limit - results.size());
} while (Bytes.equals(currentRow, nextRow = peekRow()));
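this.storeHeap is a KeyValueHeap, i.e. a priority queue of scanners ordered by each scanner's current KeyValue. It reads from the scanner whose current KeyValue is smallest, advances it and puts it back into the queue, so the data of all scanners comes out merged in key order.
The storeHeap of RegionScannerImpl holds the StoreScanners; each StoreScanner in turn merges its StoreFileScanners and its MemStoreScanner with its own KeyValueHeap. The order in which the Store added these scanners (memstore last) therefore does not decide which one is read first. When two scanners hold exactly the same key, the tie goes to the scanner with the higher sequence ID, and MemStoreScanner reports Long.MAX_VALUE, so the memstore copy comes before any store-file copy.
KeyValueHeap.next casts the current scanner to InternalScanner:
InternalScanner currentAsInternal = (InternalScanner)this.current;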
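The heap merge can be sketched with java.util.PriorityQueue. This is a simplified, hypothetical model: keys are plain Strings instead of KeyValues, ListScanner stands in for KeyValueScanner, and seeking, version filtering and the separate `current` field of the real KeyValueHeap are left out. It does show the two properties discussed above: merged key order, and ties resolved by the higher sequence ID regardless of insertion order.

```java
import java.util.*;

// Hypothetical, simplified model (not HBase source) of KeyValueHeap: a priority queue
// of sorted scanners ordered by each scanner's current key, with ties broken in
// favour of the higher sequence id.
public class KeyValueHeapSketch {
    // Stand-in for a KeyValueScanner over already-sorted keys.
    static final class ListScanner {
        final String name;
        final long sequenceId;        // newer data = higher id; memstore uses Long.MAX_VALUE
        private final Iterator<String> it;
        private String current;

        ListScanner(String name, long sequenceId, List<String> sortedKeys) {
            this.name = name;
            this.sequenceId = sequenceId;
            this.it = sortedKeys.iterator();
            this.current = it.hasNext() ? it.next() : null;
        }

        String peek() { return current; }

        void advance() { current = it.hasNext() ? it.next() : null; }
    }

    // Smallest current key first; on an exact tie, the higher sequence id first.
    static final Comparator<ListScanner> ORDER =
        Comparator.comparing(ListScanner::peek)
            .thenComparing(Comparator.comparingLong((ListScanner s) -> s.sequenceId).reversed());

    private final PriorityQueue<ListScanner> heap = new PriorityQueue<>(ORDER);

    KeyValueHeapSketch(List<ListScanner> scanners) {
        for (ListScanner s : scanners) {
            if (s.peek() != null) heap.add(s);   // empty scanners never enter the heap
        }
    }

    // Emits "key@scanner" in merged order until every scanner is exhausted.
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            ListScanner top = heap.poll();       // scanner with the smallest current key
            out.add(top.peek() + "@" + top.name);
            top.advance();
            if (top.peek() != null) heap.add(top); // re-queue under its new key
        }
        return out;
    }

    static List<ListScanner> sampleScanners(boolean memstoreFirst) {
        ListScanner file = new ListScanner("hfile", 5, List.of("r1", "r2"));
        ListScanner mem = new ListScanner("memstore", Long.MAX_VALUE, List.of("r1", "r3"));
        return memstoreFirst ? List.of(mem, file) : List.of(file, mem);
    }

    public static void main(String[] args) {
        // Same result whichever order the scanners were added in.
        System.out.println(new KeyValueHeapSketch(sampleScanners(false)).drain());
        System.out.println(new KeyValueHeapSketch(sampleScanners(true)).drain());
    }
}
```

Both lines print `[r1@memstore, r1@hfile, r2@hfile, r3@memstore]`: the duplicate key r1 is served from the memstore first in either case.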
To summarize the flow so far: get request -> regionServer -> region -> storeHeap -> scanner -> find row
However, this does not yet explain how a request reaches the region server that processes it, so let's look at that next.
On the server side, both HMaster and HRegionServer create a single global RpcServer when they start.
HMaster's RPC server:
HMaster uses org.apache.hadoop.hbase.executor.ExecutorService ("This is a generic executor service. This component abstracts a threadpool, a queue to which EventHandler.EventTypes can be submitted, and a Runnable that handles the object that is added to the queue.") to start several thread pools, alongside other services:
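MASTER_OPEN_REGION (5 threads by default)
MASTER_CLOSE_REGION (5 threads by default)
MASTER_SERVER_OPERATIONS (3 threads by default)
MASTER_META_SERVER_OPERATIONS (5 threads by default)
MASTER_TABLE_OPERATIONS (single thread)
logCleaner (single thread)
infoServer (serves status pages such as master-status)
rpcServer (the RPC service this article is concerned with)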
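RpcServer is an interface whose implementation is HBaseServer. On startup, HBaseServer starts a listener, a responder and a set of handlers; if a number of priority handlers is configured, it also starts that many priorityHandlers. The listener listens on the port and passes incoming calls to the handlers; a handler calls the RpcEngine, which uses reflection to find and invoke the requested method, and the result is written back through the responder (this.responder.doRespond).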
The number of HMaster handlers is set by hbase.master.handler.count.
The number of HRegionServer handlers is set by hbase.regionserver.handler.count.
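The listener / handler / responder split is essentially a producer-consumer queue. The sketch below is an in-process model under stated assumptions, not HBaseServer: there are no sockets, Call and EchoService are hypothetical, a LinkedBlockingQueue plays the call queue, a fixed thread pool plays the handlers (handlerCount corresponds to the handler.count settings above), and completing a CompletableFuture stands in for the responder writing the result back.

```java
import java.lang.reflect.Method;
import java.util.concurrent.*;

// Hypothetical in-process model (not HBase source) of
// listener -> call queue -> handlers (reflection) -> responder.
public class RpcServerSketch {
    // A decoded request: method name and arguments (what a listener would read off a socket).
    static final class Call {
        final String method;
        final Object[] args;
        // Completing this future stands in for the responder writing the reply.
        final CompletableFuture<Object> response = new CompletableFuture<>();
        Call(String method, Object... args) { this.method = method; this.args = args; }
    }

    // Example service object (hypothetical), standing in for an HRegionServer instance.
    public static class EchoService {
        public String get(String region, String row) { return row + "@" + region; }
    }

    private final BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>();
    private final ExecutorService handlers;
    private final Object instance;

    RpcServerSketch(Object instance, int handlerCount) {
        this.instance = instance;
        this.handlers = Executors.newFixedThreadPool(handlerCount);
        for (int i = 0; i < handlerCount; i++) handlers.submit(this::handlerLoop);
    }

    // Listener role: accept a call and queue it for the handlers.
    CompletableFuture<Object> submit(Call call) {
        callQueue.add(call);
        return call.response;
    }

    // Handler role: take a call, find the method by reflection, invoke it, reply.
    private void handlerLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Call call = callQueue.take();
                try {
                    Method m = findMethod(call.method, call.args.length);
                    call.response.complete(m.invoke(instance, call.args)); // responder role
                } catch (Exception e) {
                    call.response.completeExceptionally(e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdownNow() interrupts take()
        }
    }

    private Method findMethod(String name, int arity) throws NoSuchMethodException {
        for (Method m : instance.getClass().getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == arity) return m;
        }
        throw new NoSuchMethodException(name);
    }

    void stop() { handlers.shutdownNow(); }

    public static void main(String[] args) throws Exception {
        RpcServerSketch server = new RpcServerSketch(new EchoService(), 3);
        try {
            Object reply = server.submit(new Call("get", "t1,,1", "row1")).get(5, TimeUnit.SECONDS);
            System.out.println(reply); // row1@t1,,1
        } finally {
            server.stop();
        }
    }
}
```

Decoupling the listener from the handlers through a queue means a slow request occupies one handler thread while the listener keeps accepting new calls; the handler count therefore bounds how many requests run concurrently.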
HRegionServer starts up much like HMaster and launches the following threads:
this.service = new ExecutorService(getServerName().toString());
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
    conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
    conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
    conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
    conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
    conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
    conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
hlogRoller(daemon)
cacheFlusher(daemon)
compactionChecker(daemon)
Leases (not a thread itself, but it starts a background thread)
splitLogWorker
rpcServer
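The ExecutorService pattern above (one named pool per executor type, sized from configuration) can be sketched with java.util.concurrent. This is a hypothetical model, not org.apache.hadoop.hbase.executor.ExecutorService: NamedExecutorsSketch, the reduced ExecutorType enum, the thread-name format and the use of a plain Map as the configuration are all assumptions for illustration.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not HBase source): one named, fixed-size pool per executor type.
public class NamedExecutorsSketch {
    enum ExecutorType { RS_OPEN_REGION, RS_OPEN_META, RS_CLOSE_REGION }

    private final Map<ExecutorType, ExecutorService> pools = new EnumMap<>(ExecutorType.class);
    private final String serverName;

    NamedExecutorsSketch(String serverName) { this.serverName = serverName; }

    // Starts a pool whose threads are named after the server and executor type.
    void startExecutorService(ExecutorType type, int threads) {
        AtomicInteger n = new AtomicInteger();
        ThreadFactory named = r -> {
            Thread t = new Thread(r, serverName + "-" + type + "-" + n.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        pools.put(type, Executors.newFixedThreadPool(threads, named));
    }

    // Submit a unit of work (an "event handler") to the pool for its type.
    Future<?> submit(ExecutorType type, Runnable handler) {
        ExecutorService pool = pools.get(type);
        if (pool == null) throw new IllegalStateException("no executor started for " + type);
        return pool.submit(handler);
    }

    void shutdown() { pools.values().forEach(ExecutorService::shutdown); }

    public static void main(String[] args) throws Exception {
        // Plain map standing in for the Configuration object; missing keys use defaults.
        Map<String, Integer> conf = Map.of("hbase.regionserver.executor.openregion.threads", 3);
        NamedExecutorsSketch service = new NamedExecutorsSketch("rs1,60020,1");
        service.startExecutorService(ExecutorType.RS_OPEN_REGION,
            conf.getOrDefault("hbase.regionserver.executor.openregion.threads", 3));
        service.startExecutorService(ExecutorType.RS_OPEN_META,
            conf.getOrDefault("hbase.regionserver.executor.openmeta.threads", 1));
        service.submit(ExecutorType.RS_OPEN_REGION,
            () -> System.out.println("opening region on " + Thread.currentThread().getName()))
            .get();
        service.shutdown();
    }
}
```

Separate pools keep one kind of event (for example a burst of region opens) from starving another (such as opening .META.), which is the point of sizing each ExecutorType independently.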
The relationship between HBaseClient and HMaster is described by HMasterInterface:
Clients interact with the HMasterInterface to gain access to meta-level HBase functionality, like finding an HRegionServer and creating/destroying tables.
The relationship between HBaseClient and HRegionServer is described by HRegionInterface:
Clients interact with HRegionServers using a handle to the HRegionInterface.