hadoop源码解析之RPC分析
@
- 定义接口
- 实现接口
- 启动一个server
- 构建一个client的代理
- 执行相应的方法。
- 内部类介绍
- Call
- Connection。
- Handler
- Listener
- Reader
- Responder
- Server的启动
- 接收请求
- Reader线程读取数据
- Handler线程处理请求
- 获取代理
- 发送请求。
前言
因为hadoop底层各种通讯都用的是rpc,如client和namenode、client和datanode、namanode和datanode等。所以首先学习了一下hadoop rpc的内部实现,拜读了一下hadoop的源码
准备工作
首先下载hadoop的最新稳定版源码(目前是2.7.3),编译hadoop源码,因为hadoop的底层序列号用的是google的 protobuf,所以需要把这些proto文件编译成java文件,方便debug调试。如果比较懒的话,其实用maven把相关jar和源码包下载下来也行。
Hadoop的rpc并没有采用现成的rpc框架,如thrift等,而是采用jdk自带的库完全自己写了一套,更加轻量级,更加可控。
用到的主要的技术是java NIO、网络编程、反射和动态代理,如果对这几块不太熟悉的话,建议先找些资料看看相关的东西
#Hadoop rpc实现流程 Hadoop rpc框架位于hadoop源码的hadoop-commn项目里,就像我们学习任何语言先学习hello world一样,我们先来一个最简单的程序,这个程序是从hadoop源码test目录里找到的,testRPC.java,我们运行其中的main方法。(我这在main方法简单改动,new了个Configuration()对象,当参数传进来)
这里写图片描述
定义接口
首先要定义一个接口协议,所有的接口都要继承VersionedProtocol
public interface TestProtocol extends VersionedProtocol {
public static final long versionID = 1L;
String echo(String value) throws IOException;
}
实现接口
要实现这个接口
public static class TestImpl implements TestProtocol {
@Override
public long getProtocolVersion(String protocol, long clientVersion) {
return TestProtocol.versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int hashcode) {
return new ProtocolSignature(TestProtocol.versionID, null);
}
@Override
public String echo(String value) throws IOException {
return value;
}
}
启动一个server
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.build();
server.start();
构建一个client的代理
TestProtocol proxy = RPC.getProxy(TestProtocol.class,TestProtocol.versionID,addr, conf);
执行相应的方法。
String stringResult = proxy.echo("hello hadoop rpc");
System.out.println(stringResult);
重点内容
Server底层实现
内部类介绍
server类是org.apache.hadoop.ipc.Server,里面包含几个重要的内部类
内部类介绍
Call
将一个rpc请求需要的东西封装到Call对象里
private final int callId; // the client's call id 客户端id
private final int retryCount; // the retry count of the call 重试次数
private final Writable rpcRequest; // Serialized Rpc request from client 序列号的请求
private final Connection connection; // connection to client
private long timestamp; // time received when response is null
// time served when response is not null
private ByteBuffer rpcResponse; // the response for this call
private final RPC.RpcKind rpcKind;
private final byte[] clientId;
private final Span traceSpan; // the tracing span on the server side
Connection。
客户端与服务器通信的一些信息在这个里面
Handler
用于处理接受到rpc请求
Listener
用于监听rpc请求。
Reader
用于读取Listener接受到的请求
Responder
用于将rpc请求返回客户端
Server的启动
服务器的构造是通过静态方法RPC.Builder(conf).build()创建的,通过跟踪代码我们发现他最后调用了Server的构造方法
protected Server(String bindAddress, int port,Class<? extends Writable> rpcRequestClass, int handlerCount,int numReaders, int queueSizePerHandler, Configuration conf,String serverName, SecretManager<? extends TokenIdentifier> secretManager,String portRangeConfig) throws IOException {
…………………………………..
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),maxQueueSize, prefix, conf);
…………………………………………
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
………………………………………………..
// Create the responder here
responder = new Responder();
…………………………………………….
}
我们看到我们上面提到的两个内部类listener和responder都是在这里创建的,之后调用start方法启动服务。
public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
接收请求
从Listener的构造方法中我们看到服务器监听了SelectionKey.OP_ACCEPT,他只是监听是否有请求过来,而不做处理,这样为了提高并发。同时启动了一些Reader线程,这些线程是用来从channel读取数据的。
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}
//监听OP_ACCEPT事件
**acceptChannel.register(selector, SelectionKey.OP_ACCEPT);**
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
Reader线程读取数据
通过Listener的run方法我们看到如果一旦接受到请求,然后就让reader去处理
Connection c = connectionManager.register(channel);
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
IOUtils.cleanup(null, channel);
}
continue;
}
key.attach(c); // so closeCurrentConnection can get the object
**reader.addConnection(c);**
跟踪Reader的run方法,我们看到最后将读取的信息封装成了一个Call对象put到callQueue中
Call call = new Call(header.getCallId(), header.getRetryCount(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceSpan);
callQueue.put(call); // queue the call; maybe blocked here
Handler线程处理请求
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
}
if (!call.connection.channel.isOpen()) {
LOG.info(Thread.currentThread().getName() + ": skipped " + call);
continue;
}
最后调用了RpcInvoker的call方法最终通过反射来执行相应的方法
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
if (server.verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value);
客户端实现
获取代理
通过RPC的静态方法getProxy获取代理
TestProtocol proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
addr, conf);
在这里是通过java的动态代理来获取代理。通过跟踪代码我们找到了这里
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (connectionRetryPolicy != null) {
throw new UnsupportedOperationException(
"Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
}
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout, fallbackToSimpleAuth));
return new ProtocolProxy<T>(protocol, proxy, true);
}
发送请求。
在Invoker的构造方法里,我们看到在这里新建了一个org.apache.hadoop.ipc.Client对象,在invoke方法里调用了client里面的call方法,最终调用connection.sendRpcRequest(call); 来发送rpc请求
final Call call = createCall(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
}
在sendRpcRequest方法里,可以看到使用了基于tcp的socket通讯,将数据发送了服务器端。
synchronized (Connection.this.out) {
if (shouldCloseConnection.get()) {
return;
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
byte[] data = d.getData();
int totalLength = d.getLength();
out.writeInt(totalLength); // Total Length
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
out.flush();
}
总结
比较仓促,写的比较简陋,后续有时间还会继续跟进补充。
- 澳大利亚域名管理机构多年敲竹杠?
- ASP.NET中使用HttpWebRequest调用WCF
- scala 学习笔记(06) OOP(下)多重继承 及 AOP
- Angular企业级开发(4)-ngResource和REST介绍
- CSS魔法堂:"那不是bug,是你不懂我!" by inline-block
- scala 学习笔记(03) 参数缺省值、不定个数参数、类的属性(Property)、泛型初步
- Cmd Markdown编辑器简明语法手册
- 如何让spring mvc web应用启动时就执行特定处理
- CSS魔法堂:小结一下Box Model与Positioning Scheme
- jboss EAP 6.2+ 通过代码控制JNDI数据源
- jboss CLI 命令行接口学习(适用JBOSS EAP 6.2+)
- WebComponent魔法堂:深究Custom Element 之 面向痛点编程
- 修复bootstrap daterangepicker中的3个问题
- 搭建AngualarJS开发环境
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 在PyTorch中使用深度自编码器实现图像重建
- Django+Vue开发生鲜电商平台之9.个人中心功能开发
- Serverless 实战:通过 Component 实现多地域部署容灾
- SQL 行转列+窗口函数的实例
- 回答一下这 10 个最常见的 Javascript 问题
- 千万级数据表选错索引导致的线上慢查询事故
- 递归优化
- Webshell 高级样本收集
- 处理Sprint Boot与Storm1.2.2日志实现的冲突,使用logback记录日志
- Docker 命令总结
- python主题LDA建模和t-SNE可视化
- cannot import name ‘imresize‘ from ‘scipy.misc‘
- 一分钟基础:计算机为什么使用二进制?
- 使用hibernate validate做参数校验
- Leetcode No.4 寻找两个正序数组的中位数