聊聊claudb的MasterReplication
时间:2022-07-23
本文章向大家介绍聊聊claudb的MasterReplication,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
序
本文主要研究一下claudb的MasterReplication
MasterReplication
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/MasterReplication.java
public class MasterReplication implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(MasterReplication.class);
private static final String SELECT_COMMAND = "SELECT";
private static final String PING_COMMAND = "PING";
private static final int TASK_DELAY = 2;
private final DBServerContext server;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
public MasterReplication(DBServerContext server) {
this.server = server;
}
public void start() {
executor.scheduleWithFixedDelay(this, TASK_DELAY, TASK_DELAY, TimeUnit.SECONDS);
}
public void stop() {
executor.shutdown();
}
public void addSlave(String id) {
getServerState().addSlave(id);
LOGGER.info("new slave: {}", id);
}
public void removeSlave(String id) {
getServerState().removeSlave(id);
LOGGER.info("slave revomed: {}", id);
}
@Override
public void run() {
List<RedisToken> commands = createCommands();
for (SafeString slave : getServerState().getSlaves()) {
for (RedisToken command : commands) {
server.publish(slave.toString(), command);
}
}
}
private List<RedisToken> createCommands() {
List<RedisToken> commands = new LinkedList<>();
commands.add(pingCommand());
commands.addAll(commandsToReplicate());
return commands;
}
private List<RedisToken> commandsToReplicate() {
List<RedisToken> commands = new LinkedList<>();
for (RedisToken command : server.getCommandsToReplicate()) {
command.accept(new AbstractRedisTokenVisitor<Void>() {
@Override
public Void array(ArrayRedisToken token) {
commands.add(selectCommand(token));
commands.add(command(token));
return null;
}
});
}
return commands;
}
private RedisToken selectCommand(ArrayRedisToken token) {
return array(string(SELECT_COMMAND),
token.getValue().stream().findFirst().orElse(string("0")));
}
private RedisToken pingCommand() {
return array(string(PING_COMMAND));
}
private RedisToken command(ArrayRedisToken token) {
return array(token.getValue().stream().skip(1).collect(toList()));
}
private DBServerState getServerState() {
return serverState().getOrElseThrow(() -> new IllegalStateException("missing server state"));
}
private Option<DBServerState> serverState() {
return server.getValue("state");
}
}
- MasterReplication实现了Runnable接口,其start方法调度执行自身的runnable,每隔2秒执行一次;其run方法先执行createCommands方法,然后遍历slaves,然后遍历commands,执行server.publish(slave.toString(), command);createCommands先添加ping命令,然后再添加commandsToReplicate;commandsToReplicate方法遍历server.getCommandsToReplicate(),遇到array方法时先添加select命令,再添加command命令,最后返回commands
getCommandsToReplicate
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java
public class ClauDB extends RespServerContext implements DBServerContext {
//......
@Override
public ImmutableList<RedisToken> getCommandsToReplicate() {
return executeOn(Observable.<ImmutableList<RedisToken>>create(observable -> {
observable.onNext(getState().getCommandsToReplicate());
observable.onComplete();
})).blockingFirst();
}
//......
}
- getCommandsToReplicate方法执行的是getState().getCommandsToReplicate()
getCommandsToReplicate
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java
public class DBServerState {
private static final int RDB_VERSION = 6;
private static final SafeString SLAVES = safeString("slaves");
private static final DatabaseKey SLAVES_KEY = safeKey("slaves");
private static final DatabaseKey SCRIPTS_KEY = safeKey("scripts");
private boolean master = true;
private final List<Database> databases = new ArrayList<>();
private final Database admin;
private final DatabaseFactory factory;
private final Queue<RedisToken> queue = new LinkedList<>();
public void append(RedisToken command) {
queue.offer(command);
}
//......
public ImmutableList<RedisToken> getCommandsToReplicate() {
ImmutableList<RedisToken> list = ImmutableList.from(queue);
queue.clear();
return list;
}
//......
}
- getCommandsToReplicate方法会根据queue创建ImmutableList,然后清空queue;而append方法会添加command到queue
executeCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java
public class ClauDB extends RespServerContext implements DBServerContext {
//......
protected RedisToken executeCommand(RespCommand command, Request request) {
if (!isReadOnly(request.getCommand())) {
try {
RedisToken response = command.execute(request);
replication(request);
notification(request);
return response;
} catch (RuntimeException e) {
LOGGER.error("error executing command: " + request, e);
return error("error executing command: " + request);
}
} else {
return error("READONLY You can't write against a read only slave");
}
}
private void replication(Request request) {
if (!isReadOnlyCommand(request.getCommand())) {
RedisToken array = requestToArray(request);
if (hasSlaves()) {
getState().append(array);
}
persistence.ifPresent(manager -> manager.append(array));
}
}
@Override
public void publish(String sourceKey, RedisToken message) {
Session session = getSession(sourceKey);
if (session != null) {
session.publish(message);
}
}
//......
}
- executeCommand方法除了执行command.execute,还会执行replication方法,它会在有slaves的条件将非readOnlyCommand追加到state;publish方法执行的是session.publish(message)传输给slave
小结
MasterReplication实现了Runnable接口,其start方法调度执行自身的runnable,每隔2秒执行一次;其run方法先执行createCommands方法,然后遍历slaves,然后遍历commands,执行server.publish(slave.toString(), command);createCommands先添加ping命令,然后再添加commandsToReplicate;commandsToReplicate方法遍历server.getCommandsToReplicate(),遇到array方法时先添加select命令,再添加command命令,最后返回commands
doc
- 利用宏避免发送确认邮件时忘记添加附件
- dateDiff在Objective-C中的实现
- 禁用Firefox自带的元素查看工具
- 容易被误解的overflow:hidden
- dedecms调用全站相关文章怎么设置
- dedecms自定义表单提交成功后提示信息修改和跳转链接修改
- dede:arclist orderby=weight dedecms列表页文章按权重排序无效问题
- Golang语言社区--Go语言基础第二节变量
- 如何让帝国CMS7.2搜索模板支持动态标签调用
- 数据视觉盛宴—数据可视化实践之美
- 使用Tensorflow对象检测在安卓手机上“寻找”皮卡丘
- 群用户通过微信小程序可以更好地协作了
- RNN入门与实践
- 群分享:关于Markdown,你可能想知道的
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法