A Look at claudb's MasterReplication

Date: 2022-07-23

This article takes a look at claudb's MasterReplication.

MasterReplication

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/MasterReplication.java

public class MasterReplication implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(MasterReplication.class);

  private static final String SELECT_COMMAND = "SELECT";
  private static final String PING_COMMAND = "PING";
  private static final int TASK_DELAY = 2;

  private final DBServerContext server;
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

  public MasterReplication(DBServerContext server) {
    this.server = server;
  }

  public void start() {
    executor.scheduleWithFixedDelay(this, TASK_DELAY, TASK_DELAY, TimeUnit.SECONDS);
  }

  public void stop() {
    executor.shutdown();
  }

  public void addSlave(String id) {
    getServerState().addSlave(id);
    LOGGER.info("new slave: {}", id);
  }

  public void removeSlave(String id) {
    getServerState().removeSlave(id);
    LOGGER.info("slave revomed: {}", id);
  }

  @Override
  public void run() {
    List<RedisToken> commands = createCommands();

    for (SafeString slave : getServerState().getSlaves()) {
      for (RedisToken command : commands) {
        server.publish(slave.toString(), command);
      }
    }
  }

  private List<RedisToken> createCommands() {
    List<RedisToken> commands = new LinkedList<>();
    commands.add(pingCommand());
    commands.addAll(commandsToReplicate());
    return commands;
  }

  private List<RedisToken> commandsToReplicate() {
    List<RedisToken> commands = new LinkedList<>();

    for (RedisToken command : server.getCommandsToReplicate()) {
      command.accept(new AbstractRedisTokenVisitor<Void>() {
        @Override
        public Void array(ArrayRedisToken token) {
          commands.add(selectCommand(token));
          commands.add(command(token));
          return null;
        }
      });
    }
    return commands;
  }

  private RedisToken selectCommand(ArrayRedisToken token) {
    return array(string(SELECT_COMMAND),
        token.getValue().stream().findFirst().orElse(string("0")));
  }

  private RedisToken pingCommand() {
    return array(string(PING_COMMAND));
  }

  private RedisToken command(ArrayRedisToken token) {
    return array(token.getValue().stream().skip(1).collect(toList()));
  }

  private DBServerState getServerState() {
    return serverState().getOrElseThrow(() -> new IllegalStateException("missing server state"));
  }

  private Option<DBServerState> serverState() {
    return server.getValue("state");
  }
}
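  • MasterReplication implements Runnable. Its start method schedules the instance itself with scheduleWithFixedDelay, using an initial delay of 2 seconds and a 2-second delay after each run completes. The run method first calls createCommands, then iterates over the slaves and, for each slave, over the commands, calling server.publish(slave.toString(), command). createCommands adds a PING command first and then everything returned by commandsToReplicate. commandsToReplicate iterates over server.getCommandsToReplicate() and, for each array token it visits, adds a SELECT command built from the token's first element (falling back to "0"), followed by the command built from the remaining elements, and finally returns the list.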
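To make the splitting step concrete, here is a minimal sketch that mimics createCommands, selectCommand and command with plain string lists. It is not claudb code: the class ReplicationSplitSketch and the use of List<String> in place of RedisToken are assumptions made for illustration, and the first element of each queued entry is presumed to be the database index because that is what gets passed to SELECT.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the token handling in MasterReplication;
// plain strings replace claudb's RedisToken types.
class ReplicationSplitSketch {

  // Mirrors selectCommand(): SELECT plus the first element, or "0" if the entry is empty.
  public static List<String> selectCommand(List<String> queued) {
    String db = queued.isEmpty() ? "0" : queued.get(0);
    return List.of("SELECT", db);
  }

  // Mirrors command(): everything after the first element.
  public static List<String> command(List<String> queued) {
    int from = Math.min(1, queued.size());
    return new ArrayList<>(queued.subList(from, queued.size()));
  }

  // Mirrors createCommands(): one PING, then a SELECT/command pair per queued entry.
  public static List<List<String>> createCommands(List<List<String>> queue) {
    List<List<String>> commands = new ArrayList<>();
    commands.add(List.of("PING"));
    for (List<String> queued : queue) {
      commands.add(selectCommand(queued));
      commands.add(command(queued));
    }
    return commands;
  }

  public static void main(String[] args) {
    // One queued entry: presumed database index "3" followed by the actual command.
    List<List<String>> queue = List.of(List.of("3", "SET", "a", "1"));
    System.out.println(createCommands(queue));
    // prints [[PING], [SELECT, 3], [SET, a, 1]]
  }
}
```

Note that a PING is sent on every run even when nothing is queued, so each slave receives at least one message per cycle.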

getCommandsToReplicate

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java

public class ClauDB extends RespServerContext implements DBServerContext {

  //......

  @Override
  public ImmutableList<RedisToken> getCommandsToReplicate() {
    return executeOn(Observable.<ImmutableList<RedisToken>>create(observable -> {
      observable.onNext(getState().getCommandsToReplicate());
      observable.onComplete();
    })).blockingFirst();
  }

  //......

}
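  • ClauDB.getCommandsToReplicate delegates to getState().getCommandsToReplicate(); the call is wrapped in an Observable that is passed to executeOn, and blockingFirst() waits for the result.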

getCommandsToReplicate

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java

public class DBServerState {

  private static final int RDB_VERSION = 6;

  private static final SafeString SLAVES = safeString("slaves");
  private static final DatabaseKey SLAVES_KEY = safeKey("slaves");
  private static final DatabaseKey SCRIPTS_KEY = safeKey("scripts");

  private boolean master = true;

  private final List<Database> databases = new ArrayList<>();
  private final Database admin;
  private final DatabaseFactory factory;

  private final Queue<RedisToken> queue = new LinkedList<>();

  public void append(RedisToken command) {
    queue.offer(command);
  }

  //......

  public ImmutableList<RedisToken> getCommandsToReplicate() {
    ImmutableList<RedisToken> list = ImmutableList.from(queue);
    queue.clear();
    return list;
  }

  //......

}
  • getCommandsToReplicate builds an ImmutableList from queue and then clears queue, so each queued command is returned only once; append adds a command to queue.
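The copy-then-clear pattern described above can be sketched in isolation. This is an illustration only: DrainQueueSketch is a hypothetical class, strings stand in for RedisToken, and the JDK's List.copyOf replaces claudb's ImmutableList.from. The sketch is not thread-safe on its own.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of the queue handling in DBServerState.
class DrainQueueSketch {

  private final Queue<String> queue = new LinkedList<>();

  // Mirrors append(): enqueue a command for later replication.
  public void append(String command) {
    queue.offer(command);
  }

  // Mirrors getCommandsToReplicate(): take an immutable snapshot, then clear the queue,
  // so the next call only sees commands appended after this one.
  public List<String> getCommandsToReplicate() {
    List<String> snapshot = List.copyOf(queue);
    queue.clear();
    return snapshot;
  }

  public static void main(String[] args) {
    DrainQueueSketch state = new DrainQueueSketch();
    state.append("SET a 1");
    state.append("DEL a");
    System.out.println(state.getCommandsToReplicate()); // prints [SET a 1, DEL a]
    System.out.println(state.getCommandsToReplicate()); // prints []
  }
}
```

Because the queue is emptied on every call, a command that is drained but then fails to reach a slave is not kept for a later retry in this model.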

executeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java

public class ClauDB extends RespServerContext implements DBServerContext {

  //......

  protected RedisToken executeCommand(RespCommand command, Request request) {
    if (!isReadOnly(request.getCommand())) {
      try {
        RedisToken response = command.execute(request);
        replication(request);
        notification(request);
        return response;
      } catch (RuntimeException e) {
        LOGGER.error("error executing command: " + request, e);
        return error("error executing command: " + request);
      }
    } else {
      return error("READONLY You can't write against a read only slave");
    }
  }

  private void replication(Request request) {
    if (!isReadOnlyCommand(request.getCommand())) {
      RedisToken array = requestToArray(request);
      if (hasSlaves()) {
        getState().append(array);
      }
      persistence.ifPresent(manager -> manager.append(array));
    }
  }

  @Override
  public void publish(String sourceKey, RedisToken message) {
    Session session = getSession(sourceKey);
    if (session != null) {
      session.publish(message);
    }
  }

  //......

}
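  • When isReadOnly returns false for the command, executeCommand runs command.execute and then calls replication and notification; if a RuntimeException is thrown it logs the error and returns an error token, and when isReadOnly returns true it returns a READONLY error. For commands that are not read-only, replication converts the request to an array token, appends it to the state only if there are slaves, and also appends it to the persistence manager if one is present. publish looks up the session for the given key and, if it exists, calls session.publish(message) to send the message to the slave.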
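The conditions in replication can be modelled with a small sketch. Everything here is hypothetical: WritePathSketch, its fields and the boolean flags are stand-ins for ClauDB's hasSlaves(), isReadOnlyCommand() and the optional persistence manager, and strings replace RedisToken.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the branching in ClauDB.replication().
class WritePathSketch {

  public final List<String> replicationQueue = new ArrayList<>();
  public final List<String> persistenceLog = new ArrayList<>();
  public boolean hasSlaves;          // stand-in for hasSlaves()
  public boolean persistenceEnabled; // stand-in for persistence.isPresent()

  // Read-only commands are ignored; other commands go to the replication queue
  // only when slaves exist, and to the persistence log only when persistence is enabled.
  public void replication(String command, boolean readOnlyCommand) {
    if (readOnlyCommand) {
      return;
    }
    if (hasSlaves) {
      replicationQueue.add(command);
    }
    if (persistenceEnabled) {
      persistenceLog.add(command);
    }
  }

  public static void main(String[] args) {
    WritePathSketch server = new WritePathSketch();
    server.persistenceEnabled = true;
    server.replication("SET a 1", false); // no slaves yet: persisted, not queued
    server.hasSlaves = true;
    server.replication("GET a", true);    // read-only: ignored
    server.replication("SET a 2", false); // queued and persisted
    System.out.println(server.replicationQueue); // prints [SET a 2]
    System.out.println(server.persistenceLog);   // prints [SET a 1, SET a 2]
  }
}
```

In this model a command executed before any slave has registered is never queued, so a slave added later would have to obtain the earlier data by some other means.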

Summary

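MasterReplication implements Runnable and, once started, runs with a fixed delay of 2 seconds. Each run builds a command list that starts with PING and continues with a SELECT/command pair for every command queued since the previous run, then publishes every command to every registered slave through server.publish(slave.toString(), command). The queue is filled by ClauDB.executeCommand through replication, which appends non-read-only commands to DBServerState when slaves exist, and it is drained by getCommandsToReplicate.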
