An Analysis of ONOS Cluster Leader Election

Date: 2022-05-06

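First, a brief introduction. I used to work on Floodlight controller development. With ODL and ONOS developing so rapidly, I felt I would fall behind if I knew nothing about them, so I found some spare time to read part of the ONOS source. I learned a great deal and would rather share it than keep it to myself, in the hope that it prompts better contributions from others.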

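The only part of ONOS I have read carefully is clustering, though I have skimmed the source of the other modules as well. Some of the well-crafted code in ONOS can be reused in other projects. Take shortest-path computation: Floodlight embeds its implementation in a specific module and does not support multipath, whereas ONOS provides three shortest-path algorithms, supports multipath natively, and is cleanly modularized. I have borrowed parts of the ONOS design for projects at my company. ONOS also shows how much more expressive Java 8 is than Java 7: in modules with similar functionality, the Floodlight implementation is far more verbose than the ONOS one.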

In short, the overall code quality of ONOS is far higher than that of Floodlight.

I plan to write this up as a series. A rough outline:

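  1. Cluster election
  2. An overview of the Raft implementation in ONOS
  3. The gossip protocol implementation in ONOS
  4. Basic cluster primitives: ONOS supports distributed versions of ConcurrentHashMap, atomic counters, Set, and so on
  5. Designs and code that can be reused in other projects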

This article traces the code path of cluster election in ONOS.

Overview of cluster protocols

For cluster election, ONOS uses the Raft protocol. I do not know why it does not use Paxos, but there is a growing trend toward using Raft in place of Paxos, because Raft is simpler.

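I call this a trend based on the number of existing Raft implementations compared with Paxos implementations. I have also noticed that Harvard, Stanford, and CMU have replaced Paxos with Raft in their distributed systems courses (the reasons are given in the discussion linked from the original post). Raft also has an official website with a wealth of resources, whereas Paxos has only papers. So overall, Raft is gradually becoming the mainstream choice.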

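As for Paxos-based implementations, the ones we know of are ZooKeeper and Ceph. ZooKeeper is not a strict Paxos implementation either; it uses ZAB, a modified protocol. Tencent only recently open-sourced its own Paxos implementation, phxpaxos. With so few implementations to choose from, it is no surprise that many distributed projects depend on ZooKeeper.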

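As for Raft, I went through the implementations listed on the official Raft website and found that the only complete Java implementations are copycat, jraft, jgroups-raft, RaftKVDatabase/JSimpleDB, and C5 replicator. (C++ and Go have five each as well.)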

  • jraft: lacks documentation
  • JSimpleDB: implements more than just Raft
  • C5 replicator: implements the Raft protocol
  • jgroups-raft: implements the Raft protocol

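Note that all of these projects have few stars and are probably not mature enough for production use. Unlike the others, copycat also tests its Raft implementation with Jepsen. All of this shows that building a complete, usable Raft implementation is still very challenging.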

ONOS chose copycat as its Raft implementation, and based on the analysis above that choice is sound.

ONOS cluster election

Note: this article is based on the onos 1.6 branch.

ONOS exposes a set of interfaces for cluster election, shown below.

    public interface LeaderElector extends DistributedPrimitive {
        Leadership run(String topic, NodeId nodeId);
        void withdraw(String topic);
        boolean anoint(String topic, NodeId nodeId);
        boolean promote(String topic, NodeId nodeId);
        void evict(NodeId nodeId);
        Leadership getLeadership(String topic);
        Map<String, Leadership> getLeaderships();
        void addChangeListener(Consumer<Change<Leadership>> consumer);
        void removeChangeListener(Consumer<Change<Leadership>> consumer);
    }

That is, run, withdraw, anoint, promote, and evict. The documentation explains their usage very clearly, so I quote it here directly.

    Distributed mutual exclusion primitive.
 
    AsyncLeaderElector facilitates mutually exclusive access to a shared
    resource by various cluster members.
 
    Each resource is identified by a unique topic name and members register
    their desire to access the resource by calling the AsyncLeaderElector's
    run method. Access is granted on a FIFO basis.
 
    An instance can unregister itself from the leadership election by calling
    AsyncLeaderElector's withdraw method.
 
    If an instance currently holding the resource dies then the next instance
    waiting to be leader (in FIFO order) will be automatically granted access
    to the resource.
 
    One can register listeners to be notified when a leadership change occurs.
    The Listeners are notified via a Leadership Change change subject.
 
    Additionally, AsyncLeaderElector provides methods to query the current
    state of leadership for topics.
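
To make the FIFO behaviour described in that documentation concrete, here is a minimal single-process sketch. It is not ONOS code: the class ToyElector and all of its methods are hypothetical, node ids are plain strings, nothing is replicated, and withdraw takes an explicit node id because the toy has no notion of a local node.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy, single-JVM model of per-topic FIFO leadership. Hypothetical names; not the ONOS API.
class ToyElector {
    // For each topic, candidates in the order they called run(); the head is the leader.
    private final Map<String, Deque<String>> candidates = new HashMap<>();

    // Register nodeId as a candidate for topic and return the current leader.
    String run(String topic, String nodeId) {
        Deque<String> queue = candidates.computeIfAbsent(topic, t -> new ArrayDeque<>());
        if (!queue.contains(nodeId)) {
            queue.addLast(nodeId);
        }
        return queue.peekFirst();
    }

    // Remove nodeId from the election for a single topic.
    void withdraw(String topic, String nodeId) {
        Deque<String> queue = candidates.get(topic);
        if (queue != null) {
            queue.remove(nodeId);
        }
    }

    // Remove nodeId from every topic, for example when the node dies.
    void evict(String nodeId) {
        candidates.values().forEach(queue -> queue.remove(nodeId));
    }

    // Current leader of topic, or null if nobody is running for it.
    String leader(String topic) {
        Deque<String> queue = candidates.get(topic);
        return queue == null ? null : queue.peekFirst();
    }

    public static void main(String[] args) {
        ToyElector elector = new ToyElector();
        elector.run("device:1", "node-a");
        elector.run("device:1", "node-b");
        System.out.println(elector.leader("device:1")); // node-a
        elector.evict("node-a");
        System.out.println(elector.leader("device:1")); // node-b
    }
}
```

The first node to call run holds the topic, and the next candidate in line takes over once the holder is evicted.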

How a leader elector is created

NewDistributedLeadershipStore.java contains the following code:

    LeaderElector leaderElector;
    StorageService storageService;
 
    leaderElector = storageService.leaderElectorBuilder()
              .withName("onos-leadership-elections")
              .build()
              .asLeaderElector();

This creates a leaderElector; the rest of this article is an annotation of this snippet. StorageManager implements the StorageService interface.

    StorageManager.leaderElectorBuilder()
              .withName("onos-leadership-elections")
              .build()
              .asLeaderElector();

StorageManager.leaderElectorBuilder() calls new DefaultLeaderElectorBuilder(federatedPrimitiveCreator):

    public class StorageManager implements StorageService, StorageAdminService {
 
        protected PartitionService partitionService;
 
        private DistributedPrimitiveCreator federatedPrimitiveCreator;
 
        @Activate
        public void activate() {
            Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
            partitionService.getAllPartitionIds().stream()
                .filter(id -> !id.equals(PartitionId.from(0)))
                .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
            federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap);
            transactions = this.<TransactionId, Transaction.State>consistentMapBuilder()
                        .withName("onos-transactions")
                        .withSerializer(Serializer.using(KryoNamespaces.API,
                                Transaction.class,
                                Transaction.State.class))
                        .buildAsyncMap();
            transactionCoordinator = new TransactionCoordinator(transactions);
            log.info("Started");
        }
 
        @Override
        public LeaderElectorBuilder leaderElectorBuilder() {
            checkPermission(STORAGE_WRITE);
            return new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
        }
        ....
    }

In addition, DefaultLeaderElectorBuilder extends LeaderElectorBuilder:

    LeaderElectorBuilder leaderElectorBuilder = new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
 
    leaderElectorBuilder
              .withName("onos-leadership-elections")
              .build()
              .asLeaderElector();

leaderElectorBuilder.withName("onos-leadership-elections").build() returns an object of type AsyncLeaderElector, call it asyncLeaderElector:

    asyncLeaderElector.asLeaderElector()

AsyncLeaderElector's asLeaderElector() calls its asLeaderElector(long timeoutMillis) overload as asyncLeaderElector.asLeaderElector(Long.MAX_VALUE), so the operation timeout is Long.MAX_VALUE. That overload in turn calls the DefaultLeaderElector constructor DefaultLeaderElector(AsyncLeaderElector asyncElector, long operationTimeoutMillis):

new DefaultLeaderElector(this, timeoutMillis), where this is the AsyncLeaderElector instance asyncLeaderElector.

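Note: DefaultLeaderElector implements every LeaderElector method by calling the corresponding AsyncLeaderElector method and blocking on the returned CompletableFuture until it completes or the timeout expires.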
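
The wrapping pattern in that note can be sketched roughly as follows. This is a minimal illustration, not the ONOS source: AsyncGreeter and BlockingGreeter are hypothetical stand-ins for AsyncLeaderElector and DefaultLeaderElector, and mapping every failure to IllegalStateException is an assumption made for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical async API standing in for AsyncLeaderElector.
interface AsyncGreeter {
    CompletableFuture<String> greet(String name);
}

// Blocking facade: every call waits on the future for at most timeoutMillis.
class BlockingGreeter {
    private final AsyncGreeter async;
    private final long timeoutMillis;

    BlockingGreeter(AsyncGreeter async, long timeoutMillis) {
        this.async = async;
        this.timeoutMillis = timeoutMillis;
    }

    String greet(String name) {
        return complete(async.greet(name));
    }

    // Wait for the result and translate checked exceptions into unchecked ones.
    private <T> T complete(CompletableFuture<T> future) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (TimeoutException e) {
            throw new IllegalStateException("timed out after " + timeoutMillis + " ms", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        AsyncGreeter async = name -> CompletableFuture.supplyAsync(() -> "hello " + name);
        BlockingGreeter blocking = new BlockingGreeter(async, 1000);
        System.out.println(blocking.greet("onos")); // hello onos
    }
}
```

With a timeout of Long.MAX_VALUE, as in asLeaderElector(), such a wrapper effectively waits indefinitely.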

DefaultLeaderElector implements LeaderElector, and every one of its methods relies on the AsyncLeaderElector passed to its constructor. So the question comes back to which object leaderElectorBuilder.withName("onos-leadership-elections").build() actually instantiates, that is, what new DefaultLeaderElectorBuilder(federatedPrimitiveCreator).withName("onos-leadership-elections").build() really does. Here is the builder:

    public class DefaultLeaderElectorBuilder extends LeaderElectorBuilder {
        public DefaultLeaderElectorBuilder(DistributedPrimitiveCreator primitiveCreator) {
            this.primitiveCreator = primitiveCreator;
        }
 
        public AsyncLeaderElector build() {
            return primitiveCreator.newAsyncLeaderElector(name());
        }
    }

So the result is determined by new FederatedDistributedPrimitiveCreator(partitionMap).newAsyncLeaderElector("onos-leadership-elections"):

    public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiveCreator {
        private final TreeMap<PartitionId, DistributedPrimitiveCreator> members;
        private final List<PartitionId> sortedMemberPartitionIds;
 
        public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members) {
            this.members = Maps.newTreeMap();
            this.members.putAll(checkNotNull(members));
            this.sortedMemberPartitionIds = Lists.newArrayList(members.keySet());
        }
 
        @Override
        public AsyncLeaderElector newAsyncLeaderElector(String name) {
            checkNotNull(name);
            Map<PartitionId, AsyncLeaderElector> leaderElectors =
                    Maps.transformValues(members,
                                         partition -> partition.newAsyncLeaderElector(name));
            Hasher<String> hasher = topic -> {
                int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt();
                return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
            };
            return new PartitionedAsyncLeaderElector(name, leaderElectors, hasher);
        }
    }
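
The hasher in newAsyncLeaderElector maps each topic to one partition. A self-contained sketch of the same idea, using only the JDK, is below. The class TopicPartitioner is hypothetical and partition ids are plain integers. It reads the first four digest bytes big-endian and uses Math.floorMod, so the index it picks will not necessarily match what the Guava-based code above produces. floorMod is used because Math.abs(Integer.MIN_VALUE) is still negative, which would make a modulo-based index negative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: maps a topic name to one of a fixed, sorted list of partition ids.
class TopicPartitioner {
    private final List<Integer> sortedPartitionIds;

    TopicPartitioner(List<Integer> sortedPartitionIds) {
        this.sortedPartitionIds = sortedPartitionIds;
    }

    // The same topic always lands on the same partition as long as the list is unchanged.
    int partitionFor(String topic) {
        int hash = ByteBuffer.wrap(sha256(topic)).getInt(); // first 4 bytes, big-endian
        // floorMod keeps the index in [0, size) even when hash is Integer.MIN_VALUE.
        int index = Math.floorMod(hash, sortedPartitionIds.size());
        return sortedPartitionIds.get(index);
    }

    private static byte[] sha256(String text) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(text.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    public static void main(String[] args) {
        TopicPartitioner partitioner = new TopicPartitioner(Arrays.asList(1, 2, 3));
        System.out.println(partitioner.partitionFor("device:of:0000000000000001"));
    }
}
```

Because the mapping depends only on the topic string and the partition list, every node computes the same partition for a topic without any coordination.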

PartitionedAsyncLeaderElector is implemented as follows:

    public class PartitionedAsyncLeaderElector implements AsyncLeaderElector {
 
        private final String name;
        private final TreeMap<PartitionId, AsyncLeaderElector> partitions = Maps.newTreeMap();
        private final Hasher<String> topicHasher;
 
        public PartitionedAsyncLeaderElector(String name,
                Map<PartitionId, AsyncLeaderElector> partitions,
                Hasher<String> topicHasher) {
            this.name = name;
            this.partitions.putAll(checkNotNull(partitions));
            this.topicHasher = checkNotNull(topicHasher);
        }
 
        @Override
        public String name() {
            return name;
        }
 
        @Override
        public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
             return getLeaderElector(topic).run(topic, nodeId);
        }
 
        @Override
        public CompletableFuture<Void> withdraw(String topic) {
            return getLeaderElector(topic).withdraw(topic);
        }
 
        @Override
        public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
            return getLeaderElector(topic).anoint(topic, nodeId);
        }
 
        @Override
        public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
            return getLeaderElector(topic).promote(topic, nodeId);
        }
 
        @Override
        public CompletableFuture<Void> evict(NodeId nodeId) {
            return CompletableFuture.allOf(getLeaderElectors().stream()
                                                            .map(le -> le.evict(nodeId))
                                                            .toArray(CompletableFuture[]::new));
        }
 
        @Override
        public CompletableFuture<Leadership> getLeadership(String topic) {
            return getLeaderElector(topic).getLeadership(topic);
        }
 
        @Override
        public CompletableFuture<Map<String, Leadership>> getLeaderships() {
            Map<String, Leadership> leaderships = Maps.newConcurrentMap();
            return CompletableFuture.allOf(getLeaderElectors().stream()
                                        .map(le -> le.getLeaderships().thenAccept(m -> leaderships.putAll(m)))
                                        .toArray(CompletableFuture[]::new))
                                .thenApply(v -> leaderships);
        }
 
        @Override
        public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> listener) {
            return CompletableFuture.allOf(getLeaderElectors().stream()
                                        .map(map -> map.addChangeListener(listener))
                                        .toArray(CompletableFuture[]::new));
        }
 
        @Override
        public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> listener) {
            return CompletableFuture.allOf(getLeaderElectors().stream()
                                                          .map(map -> map.removeChangeListener(listener))
                                                          .toArray(CompletableFuture[]::new));
        }
 
        /**
         * Returns the leaderElector (partition) to which the specified topic maps.
         * @param topic topic name
         * @return AsyncLeaderElector to which topic maps
         */
        private AsyncLeaderElector getLeaderElector(String topic) {
            return partitions.get(topicHasher.hash(topic));
        }
    }
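
Methods such as evict and getLeaderships fan out to every partition and combine the results with CompletableFuture.allOf. The sketch below isolates that pattern. It is only an illustration: FanOutSketch and mergeAll are hypothetical names, and leaders are represented as plain strings rather than Leadership objects.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the fan-out used by getLeaderships().
class FanOutSketch {
    // Merge the per-partition maps; the result completes once every partition has answered.
    static CompletableFuture<Map<String, String>> mergeAll(
            List<CompletableFuture<Map<String, String>>> perPartition) {
        Map<String, String> merged = new ConcurrentHashMap<>();
        CompletableFuture<?>[] pending = perPartition.stream()
                .map(future -> future.thenAccept(merged::putAll))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending).thenApply(v -> merged);
    }

    public static void main(String[] args) {
        CompletableFuture<Map<String, String>> p1 =
                CompletableFuture.completedFuture(Collections.singletonMap("topic-1", "node-a"));
        CompletableFuture<Map<String, String>> p2 =
                CompletableFuture.completedFuture(Collections.singletonMap("topic-2", "node-b"));
        Map<String, String> all = mergeAll(Arrays.asList(p1, p2)).join();
        System.out.println(all.size()); // 2
    }
}
```

A concurrent map is used for the merged result because the per-partition callbacks may run on different threads.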

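So a LeaderElector actually calls into PartitionedAsyncLeaderElector, which implements the AsyncLeaderElector interface. At this point the elector seems to be complete. But if you set out to study how ONOS implements the election itself and look at the implementations of withdraw, anoint, and promote, you will probably want to curse: each one merely delegates to another elector.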

So let us keep going and see how the election methods are really implemented and where the details are hidden. Every per-topic method defined by AsyncLeaderElector goes through getLeaderElector(String topic).

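So what does partitions actually contain? From the StorageManager analysis above, the map passed in as partitions is built from partitionMap, and partitionMap in turn is supplied by PartitionService partitionService. PartitionManager implements the PartitionService interface.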

    public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
        implements PartitionService, PartitionAdminService {
 
        protected ClusterMetadataService metadataService;
 
        private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
        private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
 
        @Activate
        public void activate() {
            currentClusterMetadata.set(metadataService.getClusterMetadata());
            currentClusterMetadata.get()
                           .getPartitions()
                           .stream()
                           .filter(partition -> !partition.getId().equals(PartitionId.from(0))) // exclude p0
                           .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
                                   messagingService,
                                   clusterService,
                                   CatalystSerializers.getSerializer(),
                                   new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
        }
 
        @Override
        public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
            checkPermission(PARTITION_READ);
            return partitions.get(partitionId).client();
        }
 
        @Override
        public Set<PartitionId> getAllPartitionIds() {
            checkPermission(PARTITION_READ);
            return partitions.keySet();
        }
    }

So the object behind partitions.get(topicHasher.hash(topic)) traces back to a StoragePartition.

    public class StoragePartition implements Managed<StoragePartition> {
 
        private final Serializer serializer;
        private final MessagingService messagingService;
        private final ClusterService clusterService;
        private final File logFolder;
        private Partition partition;
        private NodeId localNodeId;
        private StoragePartitionServer server;
        private StoragePartitionClient client;
 
        public StoragePartition(Partition partition,
                MessagingService messagingService,
                ClusterService clusterService,
                Serializer serializer,
                File logFolder) {
            this.partition = partition;
            this.messagingService = messagingService;
            this.clusterService = clusterService;
            this.localNodeId = clusterService.getLocalNode().id();
            this.serializer = serializer;
            this.logFolder = logFolder;
        }
 
        /**
         * Returns the partition client instance.
         * @return client
         */
        public StoragePartitionClient client() {
            return client;
        }
 
        @Override
        public CompletableFuture<Void> open() {
            if (partition.getMembers().contains(localNodeId)) {
                openServer();
            }
            return openClient().thenAccept(v -> isOpened.set(true))
                   .thenApply(v -> null);
        }
 
        private CompletableFuture<StoragePartitionClient> openClient() {
            client = new StoragePartitionClient(this,
                    serializer,
                    new CopycatTransport(CopycatTransport.Mode.CLIENT,
                                         partition.getId(),
                                         messagingService));
            return client.open().thenApply(v -> client);
        }
    }

From StorageManager's activate() method, each value in partitionMap is the return value of StoragePartition's client() method. From client() and openClient() above, client() actually returns a StoragePartitionClient. So what is StoragePartitionClient?

    public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
 
        private final StoragePartition partition;
        private final Transport transport;
        private final io.atomix.catalyst.serializer.Serializer serializer;
 
        public StoragePartitionClient(StoragePartition partition,
                io.atomix.catalyst.serializer.Serializer serializer,
                Transport transport) {
            this.partition = partition;
            this.serializer = serializer;
            this.transport = transport;
        }
 
        @Override
        public CompletableFuture<Void> open() {
            if (client != null && client.isOpen()) {
                return CompletableFuture.completedFuture(null);
            }
            synchronized (StoragePartitionClient.this) {
                copycatClient = new CopycatClient(partition.getMemberAddresses(),
                                                 transport,
                                                 serializer.clone(),
                                                 StoragePartition.RESOURCE_TYPES);
                copycatClient.onStateChange(state -> log.debug("Partition {} client state"
                        + " changed to {}", partition.getId(), state));
                client = new AtomixClient(new ResourceClient(copycatClient));
            }
            return client.open().whenComplete((r, e) -> {
                if (e == null) {
                    log.info("Successfully started client for partition {}", partition.getId());
                } else {
                    log.info("Failed to start client for partition {}", partition.getId(), e);
                }
            }).thenApply(v -> null);
        }
 
        @Override
        public AsyncLeaderElector newAsyncLeaderElector(String name) {
            return client.getResource(name, AtomixLeaderElector.class).join();
        }
 
    }

Now it is finally clear that the values of partitionMap are StoragePartitionClient instances.

From FederatedDistributedPrimitiveCreator's newAsyncLeaderElector, leaderElectors holds the results of calling newAsyncLeaderElector(name) on each value of partitionMap, that is, StoragePartitionClient's newAsyncLeaderElector(name) method.

From newAsyncLeaderElector() and open(), the call client.getResource(name, AtomixLeaderElector.class).join() shows that the concrete AsyncLeaderElector is ultimately an AtomixLeaderElector.

    public abstract class Atomix implements ResourceManager<Atomix> {
 
        protected Atomix(ResourceClient client) {
          this.client = Assert.notNull(client, "client");
        }
 
        @Override
        public <T extends Resource> CompletableFuture<T> getResource(String key, Class<? super T> type) {
          Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length");
          return client.getResource(key, type, new Resource.Config(), new Resource.Options());
        }
    }
 
 
    public class AtomixClient extends Atomix {
    }

So client.getResource(name, AtomixLeaderElector.class).join() is really ResourceClient(copycatClient).getResource(name, AtomixLeaderElector.class, new Resource.Config(), new Resource.Options()).join().

    public class ResourceClient implements ResourceManager<ResourceClient> {
        /**
         * @throws NullPointerException if {@code client} or {@code registry} are null
         */
        public ResourceClient(CopycatClient client) {
            this.client = Assert.notNull(client, "client");
        }
 
        @Override
        @SuppressWarnings("unchecked")
        public <T extends Resource> CompletableFuture<T> getResource(String key, Class<? super T> type, Resource.Config config, Resource.Options options) {
            return this.<T>getResource(key, type((Class<? extends Resource<?>>) type), config, options);
        }
 
        @Override
        @SuppressWarnings("unchecked")
        public synchronized <T extends Resource> CompletableFuture<T> getResource(String key, ResourceType type, Resource.Config config, Resource.Options options) {
            Assert.notNull(key, "key");
            Assert.notNull(type, "type");
            Assert.notNull(config, "config");
            Assert.notNull(options, "options");
            T resource;
 
            // Determine whether a singleton instance of the given resource key already exists.
            Resource<?> check = instances.get(key);
            if (check == null) {
                ResourceInstance instance = new ResourceInstance(key, type, config, this::close);
                InstanceClient client = new InstanceClient(instance, this.client);
                try {
                    check = type.factory().newInstance().createInstance(client, options);
                    instances.put(key, check);
                } catch (InstantiationException | IllegalAccessException e) {
                    return Futures.exceptionalFuture(e);
                }
            }
 
            // Ensure the existing singleton instance type matches the requested instance type. If the instance
            // was created new, this condition will always pass. If there was another instance created of a
            // different type, an exception will be returned without having to make a request to the cluster.
            if (check.type().id() != type.id()) {
                return Futures.exceptionalFuture(new IllegalArgumentException("inconsistent resource type: " + type));
            }
 
            resource = (T) check;
 
            // Ensure if a singleton instance is already being created, the existing open future is returned.
            CompletableFuture<T> future = futures.get(key);
            if (future == null) {
                future = resource.open();
                futures.put(key, future);
            }
            return future;
        }
    }
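
The core of getResource is a per-key singleton with a type check and a cached open future. A stripped-down sketch of that flow follows. It is loosely modelled on the code above and is not the Atomix implementation: ResourceCacheSketch is a hypothetical class, the caller supplies the factory, and the "open" step is replaced by an already completed future.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical per-key singleton cache, loosely following the flow of getResource above.
class ResourceCacheSketch {
    private final Map<String, Object> instances = new HashMap<>();
    private final Map<String, CompletableFuture<Object>> futures = new HashMap<>();

    synchronized <T> CompletableFuture<T> getResource(String key, Class<T> type, Supplier<T> factory) {
        // Create the instance only the first time this key is requested.
        Object existing = instances.get(key);
        if (existing == null) {
            existing = factory.get();
            instances.put(key, existing);
        }
        // Reject a request whose type differs from the instance already cached under the key.
        if (!type.isInstance(existing)) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalArgumentException("inconsistent resource type: " + type));
            return failed;
        }
        // Reuse the cached "open" future; here it is simply an already completed one.
        final Object resource = existing;
        return futures.computeIfAbsent(key, k -> CompletableFuture.completedFuture(resource))
                .thenApply(type::cast);
    }

    public static void main(String[] args) {
        ResourceCacheSketch cache = new ResourceCacheSketch();
        StringBuilder first = cache.getResource("elector", StringBuilder.class, StringBuilder::new).join();
        StringBuilder second = cache.getResource("elector", StringBuilder.class, StringBuilder::new).join();
        System.out.println(first == second); // true
    }
}
```

Requesting the same key twice yields the same instance, and requesting it with a different type fails locally without any call to the cluster.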

I will not go through ResourceClient in more detail. In short, it boils down to:

    AtomixLeaderElector atomix = AtomixLeaderElectorFactory.createInstance().open().join()
 
    public class AtomixLeaderElectorFactory implements ResourceFactory<AtomixLeaderElector> {
        @Override
        public AtomixLeaderElector createInstance(CopycatClient client, Properties options) {
            return new AtomixLeaderElector(client, options);
        }
    }
 
    public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
        implements AsyncLeaderElector {
 
        public AtomixLeaderElector(CopycatClient client, Properties properties) {
            super(client, properties);
        }
 
        @Override
        public CompletableFuture<AtomixLeaderElector> open() {
            return super.open().thenApply(result -> {
                client.onEvent(CHANGE_SUBJECT, this::handleEvent);
                return result;
            });
        }
    }

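At last the implementation of withdraw, anoint, and promote surfaces: they are implemented through AtomixLeaderElectorCommands, whose command classes implement io.atomix.copycat.Command. But this is far from the end, because Command is only an interface.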

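At this point we can be fairly sure that ONOS does not implement Raft itself; it gets Raft from copycat, a third-party library from the atomix project. The implementation details will be covered in the next article.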