源码分析-分布式链路追踪:Skywalking存储插件能力-elasticsearch

时间:2022-07-23
本文章向大家介绍源码分析-分布式链路追踪:Skywalking存储插件能力-elasticsearch,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

如上为Skywalking的整体领域概念设计,基于领域模型设计,我们可以获取很多信息:

  • 存储插件化
  • 存储模块化
  • 存储能力多样性

整体源码结构如下:

存储能力主要包括:

  • elasticsearch
  • influxdb
  • jaeger
  • jdbc-hikaricp
  • zipkin

这里只是简单分析elasticsearch7存储的源码,也是非常概要的分析,为什么呢主要是想带着大家分析,让大家也具备源码分析的能力,并热爱分析各种框架的源码。

首先看storage-elasticsearch7-plugin目录下的resources/META-INF.services目录下的org.apache.skywalking.oap.server.library.module.ModuleProvider文件,这个就是模块化设计

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#

org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Provider

其次看StorageModuleElasticsearch7Provider

  • prepare()方法
@Override
public void prepare() throws ServiceNotProvidedException {
    if (!StringUtil.isEmpty(config.getNameSpace())) {
        //获取配置中心关于Elasticsearch7的配置-es的命名空间
        config.setNameSpace(config.getNameSpace().toLowerCase());
    }
    if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
        MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
                10, readableContents -> {
            final byte[] secretsFileContent = readableContents.get(0);
            if (secretsFileContent == null) {
                return;
            }
            Properties secrets = new Properties();
            secrets.load(new ByteArrayInputStream(secretsFileContent));
            config.setUser(secrets.getProperty("user", null));
            config.setPassword(secrets.getProperty("password", null));
            config.setTrustStorePass(secrets.getProperty("trustStorePass", null));

            if (elasticSearch7Client == null) {
                //In the startup process, we just need to change the username/password
            } else {
                // The client has connected, updates the config and connects again.
                elasticSearch7Client.setUser(config.getUser());
                elasticSearch7Client.setPassword(config.getPassword());
                elasticSearch7Client.setTrustStorePass(config.getTrustStorePass());
                elasticSearch7Client.connect();
            }
        }, config.getSecretsManagementFile(), config.getTrustStorePass());
        /**
         * By leveraging the sync update check feature when startup.
         */
        monitor.start();
    }

    //初始化客户端,包括es集群节点、es协议以及信任的存储路径
    elasticSearch7Client = new ElasticSearch7Client(
            config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
            .getTrustStorePass(), config.getUser(), config.getPassword(),
            indexNameConverters(config.getNameSpace())
    );

    //注册各种DAO客户端,完成基于DAO插件模块的设计的初始化
    this.registerServiceImplementation(
            IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(),
                    config.getFlushInterval(), config.getConcurrentRequests()
            ));
    this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
    this.registerServiceImplementation(
            IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearch7Client));
    this.registerServiceImplementation(
            INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO(
                    elasticSearch7Client,
                    config.getResultWindowMaxSize()
            ));
    this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client));
    this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client));
    this.registerServiceImplementation(
            ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
    this.registerServiceImplementation(
            IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
    this.registerServiceImplementation(
            IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
    this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client));
    this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client));
    this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));

    this.registerServiceImplementation(
            IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(
                    elasticSearch7Client,
                    config.getProfileTaskQueryMaxSize()
            ));
    this.registerServiceImplementation(
            IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(
                    elasticSearch7Client,
                    config.getProfileTaskQueryMaxSize()
            ));
    this.registerServiceImplementation(
            IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO(
                    elasticSearch7Client,
                    config.getProfileTaskQueryMaxSize()
            ));
    this.registerServiceImplementation(
            UITemplateManagementDAO.class, new UITemplateManagementEsDAO(elasticSearch7Client));
}
  • start()方法
@Override
public void start() throws ModuleStartException {
    MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
    HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
    //开启健康检查
    elasticSearch7Client.registerChecker(healthChecker);
    try {
        //开启连接
        elasticSearch7Client.connect();
        
        //完成es在OAP端的安装(因为要区分es6和es7),所以就做了这么一个模块
        StorageEs7Installer installer = new StorageEs7Installer(elasticSearch7Client, getManager(), config);
        getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
    } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
        throw new ModuleStartException(e.getMessage(), e);
    }
}