源码分析-分布式链路追踪:Skywalking存储插件能力-elasticsearch
时间:2022-07-23
本文章向大家介绍源码分析-分布式链路追踪:Skywalking存储插件能力-elasticsearch,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
如上为Skywalking的整体领域概念设计,基于领域模型设计,我们可以获取很多信息:
- 存储插件化
- 存储模块化
- 存储能力多样性
整体源码结构如下:
存储能力主要包括:
- elasticsearch
- influxdb
- jaeger
- jdbc-hikaricp
- zipkin
这里只是简单分析elasticsearch7存储的源码,也是非常概要的分析,为什么呢主要是想带着大家分析,让大家也具备源码分析的能力,并热爱分析各种框架的源码。
首先看storage-elasticsearch7-plugin目录下的resources/META-INF.services目录下的org.apache.skywalking.oap.server.library.module.ModuleProvider文件,这个就是模块化设计
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Provider
其次看StorageModuleElasticsearch7Provider
- prepare()方法
@Override
public void prepare() throws ServiceNotProvidedException {
if (!StringUtil.isEmpty(config.getNameSpace())) {
//获取配置中心关于Elasticsearch7的配置-es的命名空间
config.setNameSpace(config.getNameSpace().toLowerCase());
}
if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
10, readableContents -> {
final byte[] secretsFileContent = readableContents.get(0);
if (secretsFileContent == null) {
return;
}
Properties secrets = new Properties();
secrets.load(new ByteArrayInputStream(secretsFileContent));
config.setUser(secrets.getProperty("user", null));
config.setPassword(secrets.getProperty("password", null));
config.setTrustStorePass(secrets.getProperty("trustStorePass", null));
if (elasticSearch7Client == null) {
//In the startup process, we just need to change the username/password
} else {
// The client has connected, updates the config and connects again.
elasticSearch7Client.setUser(config.getUser());
elasticSearch7Client.setPassword(config.getPassword());
elasticSearch7Client.setTrustStorePass(config.getTrustStorePass());
elasticSearch7Client.connect();
}
}, config.getSecretsManagementFile(), config.getTrustStorePass());
/**
* By leveraging the sync update check feature when startup.
*/
monitor.start();
}
//初始化客户端,包括es集群节点、es协议以及信任的存储路径
elasticSearch7Client = new ElasticSearch7Client(
config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getUser(), config.getPassword(),
indexNameConverters(config.getNameSpace())
);
//注册各种DAO客户端,完成基于DAO插件模块的设计的初始化
this.registerServiceImplementation(
IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(),
config.getFlushInterval(), config.getConcurrentRequests()
));
this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearch7Client));
this.registerServiceImplementation(
INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO(
elasticSearch7Client,
config.getResultWindowMaxSize()
));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(
ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(
IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(
IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(
elasticSearch7Client,
config.getProfileTaskQueryMaxSize()
));
this.registerServiceImplementation(
IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(
elasticSearch7Client,
config.getProfileTaskQueryMaxSize()
));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO(
elasticSearch7Client,
config.getProfileTaskQueryMaxSize()
));
this.registerServiceImplementation(
UITemplateManagementDAO.class, new UITemplateManagementEsDAO(elasticSearch7Client));
}
- start()方法
@Override
public void start() throws ModuleStartException {
MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
//开启健康检查
elasticSearch7Client.registerChecker(healthChecker);
try {
//开启连接
elasticSearch7Client.connect();
//完成es在OAP端的安装(因为要区分es6和es7),所以就做了这么一个模块
StorageEs7Installer installer = new StorageEs7Installer(elasticSearch7Client, getManager(), config);
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
- 类属性的延迟计算
- 一步一步学lucene——(第三步:索引篇)
- 在Python应用中使用MongoDB
- Python检查xpath和csspath表达式是否合法
- 一步一步学lucene——(第四步:搜索篇)
- Python爬虫代理IP池
- SSDB图形界面管理工具:phpssdbadmin安装部署
- [Go 语言社区] 初始化内存数据--游戏列表数据
- SSDB安装配置记录
- Python标准库笔记(3) — datetime模块
- Django 1.10中文文档-第一个应用Part4-表单和通用视图
- Python标准库笔记(2) — re模块
- Python爬虫—破解JS加密的Cookie
- Go语言中json转成map结构
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- VBA解压缩ZIP文件11——存在问题
- 算法集锦(14)|图像识别| 图像识别算法的罗夏测试
- CenterNet骨干网络之hourglass
- 语音识别中的声学特征提取:梅尔频率倒谱系数MFCC | 老炮儿改名PPLOVELL | 5th
- 基于Apriori的数据关联分析 | 工业数据分析 | 冰水数据智能专题 | 4th
- 基于FP树的频繁项挖掘 | 工业数据分析 | 冰水数据智能 | 5th
- ICCV2019 高通Data-Free Quantization论文解读
- VBA解压缩ZIP文件10——解压-动态Huffman
- 海思NNIE之PFPLD训练与量化
- [译] 用 Truffle 插件自动在Etherscan上验证合约代码
- 二层网络上的以太坊智能合约: Optimistic Rollup
- 基于决策树的工业数据分类——数据智能
- Kestrel的ListenAnyIP和ListenLocalhost的区别
- 【为宏正名】什么?我忘了去上“数学必修课”!
- 第6章 Jenkins系统权限划分与授权管理