A Look at RespServer
Preface
This article takes a look at RespServer, the Netty-based server class in resp-server 0.16.0.
Resp
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/Resp.java
interface Resp {
  void channel(SocketChannel channel);
  void connected(ChannelHandlerContext ctx);
  void disconnected(ChannelHandlerContext ctx);
  void receive(ChannelHandlerContext ctx, RedisToken message);
}
- The Resp interface defines four methods: channel, connected, disconnected, and receive.
RespServer
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespServer.java
public class RespServer implements Resp {

  private static final Logger LOGGER = LoggerFactory.getLogger(RespServer.class);

  private static final int BUFFER_SIZE = 1024 * 1024;
  private static final int MAX_FRAME_SIZE = BUFFER_SIZE * 100;

  private static final String DEFAULT_HOST = "localhost";
  private static final int DEFAULT_PORT = 12345;

  private EventLoopGroup bossGroup;
  private EventLoopGroup workerGroup;
  private ChannelFuture future;

  private final RespServerContext serverContext;

  public RespServer(RespServerContext serverContext) {
    this.serverContext = requireNonNull(serverContext);
  }

  public static Builder builder() {
    return new Builder();
  }

  public void start() {
    bossGroup = new NioEventLoopGroup();
    workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new RespInitializerHandler(this))
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .option(ChannelOption.SO_RCVBUF, BUFFER_SIZE)
        .option(ChannelOption.SO_SNDBUF, BUFFER_SIZE)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    future = bootstrap.bind(serverContext.getHost(), serverContext.getPort());
    // Bind and start to accept incoming connections.
    future.syncUninterruptibly();

    serverContext.start();

    LOGGER.info("server started: {}:{}", serverContext.getHost(), serverContext.getPort());
  }

  public void stop() {
    try {
      if (future != null) {
        closeFuture(future.channel().close());
      }
      future = null;
    } finally {
      workerGroup = closeWorker(workerGroup);
      bossGroup = closeWorker(bossGroup);
    }

    serverContext.stop();

    LOGGER.info("server stopped");
  }

  @Override
  public void channel(SocketChannel channel) {
    LOGGER.debug("new channel: {}", sourceKey(channel));

    channel.pipeline().addLast("redisEncoder", new RedisEncoder());
    channel.pipeline().addLast("linDelimiter", new RedisDecoder(MAX_FRAME_SIZE));
    channel.pipeline().addLast(new RespConnectionHandler(this));
  }

  @Override
  public void connected(ChannelHandlerContext ctx) {
    String sourceKey = sourceKey(ctx.channel());
    LOGGER.debug("client connected: {}", sourceKey);
    getSession(ctx, sourceKey);
  }

  @Override
  public void disconnected(ChannelHandlerContext ctx) {
    String sourceKey = sourceKey(ctx.channel());
    LOGGER.debug("client disconnected: {}", sourceKey);
    serverContext.removeSession(sourceKey);
  }

  @Override
  public void receive(ChannelHandlerContext ctx, RedisToken message) {
    String sourceKey = sourceKey(ctx.channel());
    LOGGER.debug("message received: {}", sourceKey);
    parseMessage(message, getSession(ctx, sourceKey))
        .ifPresent(serverContext::processCommand);
  }

  //......
}
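- RespServer implements the Resp interface. Its start method creates bossGroup and workerGroup, registers RespInitializerHandler as the childHandler, calls bootstrap.bind(serverContext.getHost(), serverContext.getPort()), waits for the bind to complete, and then calls serverContext.start(). The channel method adds three handlers to the pipeline: redisEncoder, linDelimiter (which, despite its name, is a RedisDecoder), and RespConnectionHandler. connected creates the client's session through getSession, disconnected removes it through serverContext.removeSession, and receive runs parseMessage(message, getSession(ctx, sourceKey)).ifPresent(serverContext::processCommand).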
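The connected, receive, and disconnected callbacks all revolve around a session looked up by sourceKey. The following is a minimal, stdlib-only sketch of such a registry, assuming a create-on-demand map with lifecycle callbacks. SessionRegistrySketch, its nested Session class, the events list, and the example key are all hypothetical names for illustration, not the library's types.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

// Hypothetical sketch, not library code: a per-connection session registry.
final class SessionRegistrySketch {

    // Stand-in for the library's Session type (assumption for illustration).
    static final class Session {
        final String id;

        Session(String id) {
            this.id = id;
        }
    }

    private final ConcurrentHashMap<String, Session> clients = new ConcurrentHashMap<>();

    // Records lifecycle callbacks, playing the role of a session listener.
    final List<String> events = new CopyOnWriteArrayList<>();

    // Returns the existing session for sourceKey, or creates and registers one.
    Session getSession(String sourceKey, Function<String, Session> factory) {
        return clients.computeIfAbsent(sourceKey, key -> {
            Session session = factory.apply(key);
            events.add("created:" + key);
            return session;
        });
    }

    // Removes the session; the callback fires only if one was actually registered.
    void removeSession(String sourceKey) {
        Session session = clients.remove(sourceKey);
        if (session != null) {
            events.add("deleted:" + sourceKey);
        }
    }

    int size() {
        return clients.size();
    }

    public static void main(String[] args) {
        SessionRegistrySketch registry = new SessionRegistrySketch();
        String key = "127.0.0.1:50000"; // made-up key; the real sourceKey format is not shown
        Session first = registry.getSession(key, Session::new);  // on connect
        Session second = registry.getSession(key, Session::new); // on a later message
        System.out.println("same session reused: " + (first == second));
        registry.removeSession(key);                             // on disconnect
        System.out.println(registry.events);
    }
}
```

Because computeIfAbsent is atomic per key, a lookup from connected and a lookup from receive for the same client resolve to one session object in this sketch.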
RespInitializerHandler
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespInitializerHandler.java
class RespInitializerHandler extends ChannelInitializer<SocketChannel> {

  private final Resp impl;

  RespInitializerHandler(Resp impl) {
    this.impl = impl;
  }

  @Override
  protected void initChannel(SocketChannel channel) throws Exception {
    impl.channel(channel);
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    impl.disconnected(ctx);
  }
}
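- RespInitializerHandler extends ChannelInitializer; its initChannel and channelInactive methods both delegate to the Resp implementation (impl.channel and impl.disconnected, respectively).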
RedisEncoder
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisEncoder.java
public class RedisEncoder extends MessageToByteEncoder<RedisToken> {
  @Override
  protected void encode(ChannelHandlerContext ctx, RedisToken msg, ByteBuf out) throws Exception {
    out.writeBytes(new RedisSerializer().encodeToken(msg));
  }
}
- RedisEncoder extends MessageToByteEncoder; its encode method serializes the RedisToken with RedisSerializer.encodeToken and writes the result to the outgoing ByteBuf.
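To make the wire format concrete, here is a minimal sketch of how RESP values are laid out as text, following the public RESP specification. It is not RedisSerializer's code, which is not shown in this article; RespEncodingSketch and its method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical sketch: NOT the library's RedisSerializer, only the RESP wire layout.
final class RespEncodingSketch {
    private static final String CRLF = "\r\n";

    // Simple string (status): "+OK\r\n"
    static String status(String text) {
        return "+" + text + CRLF;
    }

    // Error: "-ERR message\r\n"
    static String error(String text) {
        return "-" + text + CRLF;
    }

    // Integer: ":42\r\n"
    static String integer(long value) {
        return ":" + value + CRLF;
    }

    // Bulk string: "$<byte length>\r\n<payload>\r\n"; null is encoded as "$-1\r\n"
    static String bulk(String text) {
        if (text == null) {
            return "$-1" + CRLF;
        }
        int byteLength = text.getBytes(StandardCharsets.UTF_8).length;
        return "$" + byteLength + CRLF + text + CRLF;
    }

    // Array: "*<element count>\r\n" followed by the already-encoded elements
    static String array(List<String> encodedElements) {
        StringBuilder out = new StringBuilder("*").append(encodedElements.size()).append(CRLF);
        for (String element : encodedElements) {
            out.append(element);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // A client request such as "ECHO hello" travels as an array of bulk strings.
        String request = array(List.of(bulk("ECHO"), bulk("hello")));
        // Print with visible line terminators instead of real CRLFs.
        System.out.println(request.replace("\r\n", "\\r\\n"));
    }
}
```

The length prefix of a bulk string counts bytes, not characters, which is why the sketch measures the UTF-8 encoding rather than calling String.length().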
RedisDecoder
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisDecoder.java
public class RedisDecoder extends ReplayingDecoder<Void> {

  private final int maxLength;

  public RedisDecoder(int maxLength) {
    this.maxLength = maxLength;
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
    out.add(parseResponse(buffer));
  }

  private RedisToken parseResponse(ByteBuf buffer) {
    RedisToken token = createParser(buffer).next();
    checkpoint();
    return token;
  }

  private RedisParser createParser(ByteBuf buffer) {
    return new RedisParser(maxLength, new NettyRedisSource(this, buffer));
  }

  //......
}
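- RedisDecoder extends ReplayingDecoder; its decode method builds a RedisParser over the incoming buffer (through NettyRedisSource), reads the next RedisToken, and calls checkpoint() once a token has been parsed completely.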
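The idea behind a replaying decoder is that a frame may arrive in pieces, so parsing has to be retried once more bytes are available. Below is a rough stdlib-only sketch of that idea for a single client command. It is not RedisParser or Netty code: RespDecodingSketch, tryParseCommand, and the Incomplete signal are hypothetical, and the sketch assumes ASCII payloads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not the library's RedisParser. It parses one client command
// (a RESP array of bulk strings) and assumes ASCII payloads so that character
// counts and byte counts coincide.
final class RespDecodingSketch {

    // Thrown when the buffer ends before the frame does.
    private static final class Incomplete extends RuntimeException {
    }

    private final String buffer;
    private int position = 0;

    private RespDecodingSketch(String buffer) {
        this.buffer = buffer;
    }

    // Returns the parsed command, or empty when more input is needed.
    static Optional<List<String>> tryParseCommand(String buffer) {
        try {
            return Optional.of(new RespDecodingSketch(buffer).parseArray());
        } catch (Incomplete e) {
            return Optional.empty(); // the caller should retry after more data arrives
        }
    }

    private List<String> parseArray() {
        String header = readLine();
        if (!header.startsWith("*")) {
            throw new IllegalArgumentException("expected array header, got: " + header);
        }
        int count = Integer.parseInt(header.substring(1));
        List<String> items = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            items.add(parseBulk());
        }
        return items;
    }

    private String parseBulk() {
        String header = readLine();
        if (!header.startsWith("$")) {
            throw new IllegalArgumentException("expected bulk header, got: " + header);
        }
        int length = Integer.parseInt(header.substring(1));
        if (length < 0) {
            throw new IllegalArgumentException("null bulk strings are not handled in this sketch");
        }
        if (position + length + 2 > buffer.length()) {
            throw new Incomplete(); // payload or trailing CRLF not received yet
        }
        String payload = buffer.substring(position, position + length);
        position += length + 2; // skip the payload and its trailing CRLF
        return payload;
    }

    private String readLine() {
        int end = buffer.indexOf("\r\n", position);
        if (end < 0) {
            throw new Incomplete(); // header line not terminated yet
        }
        String line = buffer.substring(position, end);
        position = end + 2;
        return line;
    }

    public static void main(String[] args) {
        String full = "*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n";
        System.out.println(tryParseCommand(full.substring(0, 12))); // truncated frame
        System.out.println(tryParseCommand(full));                  // complete frame
    }
}
```

This sketch restarts from the beginning of the buffer on every attempt. In the decoder shown above, checkpoint() is called after a token has been parsed in full, which marks where a later replay starts.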
RespServerContext
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespServerContext.java
public class RespServerContext implements ServerContext {

  private static final Logger LOGGER = LoggerFactory.getLogger(RespServerContext.class);

  private final StateHolder state = new StateHolder();
  private final ConcurrentHashMap<String, Session> clients = new ConcurrentHashMap<>();
  private final Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());

  private final String host;
  private final int port;
  private final CommandSuite commands;
  private SessionListener sessionListener;

  public RespServerContext(String host, int port, CommandSuite commands) {
    this(host, port, commands, nullListener());
  }

  public RespServerContext(String host, int port, CommandSuite commands,
                           SessionListener sessionListener) {
    this.host = requireNonNull(host);
    this.port = requireRange(port, 1024, 65535);
    this.commands = requireNonNull(commands);
    this.sessionListener = sessionListener;
  }

  public void start() {
  }

  public void stop() {
    clear();
    scheduler.shutdown();
  }

  @Override
  public int getClients() {
    return clients.size();
  }

  @Override
  public RespCommand getCommand(String name) {
    return commands.getCommand(name);
  }

  @Override
  public <T> Option<T> getValue(String key) {
    return state.getValue(key);
  }

  @Override
  public <T> Option<T> removeValue(String key) {
    return state.removeValue(key);
  }

  @Override
  public void putValue(String key, Object value) {
    state.putValue(key, value);
  }

  @Override
  public String getHost() {
    return host;
  }

  @Override
  public int getPort() {
    return port;
  }

  Session getSession(String sourceKey, Function<String, Session> factory) {
    return clients.computeIfAbsent(sourceKey, key -> {
      Session session = factory.apply(key);
      sessionListener.sessionCreated(session);
      return session;
    });
  }

  void processCommand(Request request) {
    LOGGER.debug("received command: {}", request);

    RespCommand command = getCommand(request.getCommand());
    try {
      executeOn(execute(command, request))
          .subscribe(response -> processResponse(request, response),
                     ex -> LOGGER.error("error executing command: " + request, ex));
    } catch (RuntimeException ex) {
      LOGGER.error("error executing command: " + request, ex);
    }
  }

  protected CommandSuite getCommands() {
    return commands;
  }

  protected void removeSession(String sourceKey) {
    Session session = clients.remove(sourceKey);
    if (session != null) {
      sessionListener.sessionDeleted(session);
    }
  }

  protected Session getSession(String key) {
    return clients.get(key);
  }

  protected RedisToken executeCommand(RespCommand command, Request request) {
    return command.execute(request);
  }

  protected <T> Observable<T> executeOn(Observable<T> observable) {
    return observable.observeOn(scheduler);
  }

  private void processResponse(Request request, RedisToken token) {
    request.getSession().publish(token);
    if (request.isExit()) {
      request.getSession().close();
    }
  }

  private Observable<RedisToken> execute(RespCommand command, Request request) {
    return Observable.create(observer -> {
      observer.onNext(executeCommand(command, request));
      observer.onComplete();
    });
  }

  private int requireRange(int value, int min, int max) {
    if (value <= min || value > max) {
      throw new IllegalArgumentException(min + " <= " + value + " < " + max);
    }
    return value;
  }

  private void clear() {
    clients.clear();
    state.clear();
  }
}
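- RespServerContext implements the ServerContext interface. Its constructor requires host, port, and commands (host and commands must be non-null, port is checked by requireRange) and falls back to nullListener() when no SessionListener is given. Its processCommand method first looks up the RespCommand with getCommand(request.getCommand()), runs it through executeCommand inside an Observable, and hands the resulting RedisToken to processResponse, which publishes it to the request's session and closes the session when request.isExit() is true. processCommand itself returns void, and failures are logged rather than propagated. Since executeOn applies observeOn(scheduler), processResponse runs on the context's single-thread scheduler. Note that requireRange rejects value <= min, so port 1024 is refused even though the exception message reads min <= value < max.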
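The lookup, execute, and publish steps of processCommand can be sketched with the standard library alone. This is an approximation under stated assumptions, not the library's implementation: a plain single-thread ExecutorService stands in for the scheduler, commands are modeled as Function<String, String>, and CommandDispatchSketch, process, and the unknown-command fallback are hypothetical choices of this sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of a lookup -> execute -> publish pipeline; not library code.
final class CommandDispatchSketch {
    private final Map<String, Function<String, String>> commands;
    // Replies are delivered on one dedicated thread, so they are published in order.
    private final ExecutorService responder = Executors.newSingleThreadExecutor();

    CommandDispatchSketch(Map<String, Function<String, String>> commands) {
        this.commands = commands;
    }

    // Looks up the command by name, runs it on the calling thread, then hands the
    // reply to the publisher on the responder thread. Failures are only logged.
    CompletableFuture<Void> process(String name, String argument, Consumer<String> publisher) {
        // Fallback for unknown names is this sketch's own choice (assumption).
        Function<String, String> command =
            commands.getOrDefault(name, ignored -> "-ERR unknown command '" + name + "'\r\n");
        try {
            String reply = command.apply(argument);
            return CompletableFuture.runAsync(() -> publisher.accept(reply), responder);
        } catch (RuntimeException ex) {
            System.err.println("error executing command: " + name + " (" + ex + ")");
            return CompletableFuture.completedFuture(null);
        }
    }

    void stop() {
        responder.shutdown();
    }

    public static void main(String[] args) {
        Map<String, Function<String, String>> commands =
            Map.of("PING", argument -> "+PONG\r\n");
        CommandDispatchSketch dispatcher = new CommandDispatchSketch(commands);
        List<String> published = new CopyOnWriteArrayList<>();

        dispatcher.process("PING", "", published::add).join();
        dispatcher.process("NOPE", "", published::add).join();
        dispatcher.stop(); // without this the non-daemon responder thread keeps the JVM alive

        published.forEach(reply -> System.out.print(reply));
    }
}
```

Keeping reply delivery on a single thread is one simple way to serialize writes to sessions, which appears to be the role of the single-thread scheduler in the class above.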
RedisToken
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisToken.java
public interface RedisToken {
  RedisToken NULL_STRING = string((SafeString) null);
  RedisToken RESPONSE_OK = status("OK");

  RedisTokenType getType();

  <T> T accept(RedisTokenVisitor<T> visitor);

  static RedisToken nullString() {
    return NULL_STRING;
  }

  static RedisToken responseOk() {
    return RESPONSE_OK;
  }

  static RedisToken string(SafeString str) {
    return new StringRedisToken(str);
  }

  static RedisToken string(String str) {
    return new StringRedisToken(safeString(str));
  }

  static RedisToken status(String str) {
    return new StatusRedisToken(str);
  }

  static RedisToken integer(boolean b) {
    return new IntegerRedisToken(b ? 1 : 0);
  }

  static RedisToken integer(int i) {
    return new IntegerRedisToken(i);
  }

  static RedisToken error(String str) {
    return new ErrorRedisToken(str);
  }

  static RedisToken array(RedisToken... redisTokens) {
    return new ArrayRedisToken(ImmutableList.of(redisTokens));
  }

  static RedisToken array(Collection<RedisToken> redisTokens) {
    return new ArrayRedisToken(ImmutableList.from(redisTokens));
  }

  static RedisToken array(Sequence<RedisToken> redisTokens) {
    return new ArrayRedisToken(redisTokens.asArray());
  }

  static <T> Stream<T> visit(Stream<RedisToken> tokens, RedisTokenVisitor<T> visitor) {
    return tokens.map(token -> token.accept(visitor));
  }
}
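- The RedisToken interface declares two instance methods, getType and accept (the entry point for a RedisTokenVisitor). It also provides static factory methods (string, status, integer, error, array), the shared constants NULL_STRING and RESPONSE_OK, and a visit helper that applies a visitor to a stream of tokens.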
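The accept method is the usual visitor-pattern hook: each token type calls back into the matching visitor method, so callers can process tokens without instanceof checks. A minimal sketch of the pattern follows. The Visitor interface and its method names are hypothetical, since RedisTokenVisitor itself is not shown in this article; only the shape of accept and visit mirrors the listing above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of the visitor pattern; names do not come from the library.
final class TokenVisitorSketch {

    // One callback per token kind (assumed set of kinds, for illustration only).
    interface Visitor<T> {
        T string(String value);
        T integer(int value);
        T error(String message);
    }

    interface Token {
        <T> T accept(Visitor<T> visitor);

        static Token string(String value) {
            return new Token() {
                @Override
                public <T> T accept(Visitor<T> visitor) {
                    return visitor.string(value);
                }
            };
        }

        static Token integer(int value) {
            return new Token() {
                @Override
                public <T> T accept(Visitor<T> visitor) {
                    return visitor.integer(value);
                }
            };
        }

        static Token error(String message) {
            return new Token() {
                @Override
                public <T> T accept(Visitor<T> visitor) {
                    return visitor.error(message);
                }
            };
        }
    }

    // Same shape as the visit helper in the listing: map every token through a visitor.
    static <T> Stream<T> visit(Stream<Token> tokens, Visitor<T> visitor) {
        return tokens.map(token -> token.accept(visitor));
    }

    // Example visitor that renders a human-readable description of each token.
    static final Visitor<String> DESCRIBE = new Visitor<String>() {
        @Override
        public String string(String value) {
            return "string(" + value + ")";
        }

        @Override
        public String integer(int value) {
            return "integer(" + value + ")";
        }

        @Override
        public String error(String message) {
            return "error(" + message + ")";
        }
    };

    public static void main(String[] args) {
        List<String> described = visit(
            Stream.of(Token.string("hi"), Token.integer(7), Token.error("ERR boom")),
            DESCRIBE).collect(Collectors.toList());
        System.out.println(described);
    }
}
```

Anonymous classes are used instead of lambdas because accept is a generic method, and a lambda cannot implement a generic method.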
Summary
RespServer implements the Resp interface. Its start method creates bossGroup and workerGroup, registers RespInitializerHandler as the childHandler, calls bootstrap.bind(serverContext.getHost(), serverContext.getPort()), and then serverContext.start(). Its channel method adds redisEncoder, linDelimiter (a RedisDecoder), and RespConnectionHandler to the pipeline, and its receive method runs parseMessage(message, getSession(ctx, sourceKey)).ifPresent(serverContext::processCommand).