Spark度量系统相关讲解
Spark的Metrics System的度量系统,有两个部分组成:source,sink,创建的时候需要制定instance。度量系统会周期的将source的指标数据被sink周期性的拉去,sink可以有很多。
Instance代表着使用度量系统的角色。在spark内部,目前master,worker,Executor,client driver,这些角色都会因为要去做监控而创建使用度量系统。目前,spark内部实现的instance有:master,worker,Executor,Driver,Applications。
Source指定定义了如何去收取度量指标。目前,已经存在以下两种source:
1.Spark内部的source,比如MasterSource,WorkerSource,ExecutorSource,
DAGSchedulerSource,BlockManagerSource,ApplicationSource。这些source会收集spark内部部件的状态。这些source都跟instance相关,在创建度量系统的时候会被加入。
2.公共的source,比如JVMSource,收集的是更加底层的状态,可以用配置文件配置并且是通过反射机制加载的。
Sink定义了度量指标数据输出的位置。同时可以共存很多sinks,指标数据会发给所有的sinks。
Source和sink的绑定
def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
registerSources()
registerSinks()
sinks.foreach(_.start)
}
指标配置的格式如下:
[instance].[sink|source].[name].[options] = xxxx
[instance]可以是master,worker,executor,driver,applications.配置了就意味着只有指定的instance由此属性。可以粗犷的用*代替instance name,这就意味着所有的instance都将由此属性。
[sink|source].代表着该属性是source还是sink。只能是二选一。
[name]指定sink或者source的名字。
[options]指定sink或者source的属性
具体例子如下:
## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
# Enable ConsoleSink for all instances by class name
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
# Polling period for ConsoleSink
#*.sink.console.period=10
#*.sink.console.unit=seconds
# Master instance overlap polling period
#master.sink.console.period=15
#master.sink.console.unit=seconds
# Enable CsvSink for all instances
#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
# Polling period for CsvSink
#*.sink.csv.period=1
#*.sink.csv.unit=minutes
# Polling directory for CsvSink
#*.sink.csv.directory=/tmp/
# Worker instance overlap polling period
#worker.sink.csv.period=10
#worker.sink.csv.unit=minutes
# Enable Slf4jSink for all instances by class name
#*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
# Polling period for Slf4JSink
#*.sink.slf4j.period=1
#*.sink.slf4j.unit=minutes
注意事项:
1,添加新的sink的时候,设置class option时需要是全名。
2,有些sink支持周期的拉去数据。最小拉去数据的周期是1秒钟。
3,有些特殊的属性支持通配符,例如:master.sink.console.period->*.sink.console.period
4,metrics.properties文件如果放在 ${SPARK_HOME}/conf目录下可以被自动加载
如果想自定义目录需要用-Dspark.metrics.conf=xxx,指定java属性配置的方式去指定。
5,MetricsServlet作为默认的sink,只支持,master,worker,client driver,可以通过发送http请求 /metrics/json,可以以json的格式获取所有已经注册的指标数据。
由于Spark生产中大部分运行于yarn上
Driver端的度量指标的请求方式
/proxy/application_1494227937369_0084/metrics/json
主要source源是:
StreamingSource,DAGSchedulerSource,BlockManagerSource,
ExecutorAllocationManagerSource
driver端的度量系统的初始化细节
在SparkContext里面
初始化度量系统
构建度量系统对象是在Sparkenv中做的
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
SparkContext只是引用了SparkEnv的对象
metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
启动度量系统并且绑定ServletHandler
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
注册source
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
Executor端的Source:
ExecutorSource
Executor端度量系统的初始化机启动
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
构建ExecutorSource并注册
private val executorSource = new ExecutorSource(threadPool, executorId)
if (!isLocal) {
env.metricsSystem.registerSource(executorSource)
env.blockManager.initialize(conf.getAppId)
}
可以看到Executor端并没有绑定ServletHandler,故而无法通过http请求到度量指标。
- linux学习第八篇:文件或目录权限chmod,更改所有者和所属组chown,umask,隐藏权限lsattr_chattr
- linux学习第九篇:特殊权限set_uid,set_gid,stick_bit以及软连接文件,硬链接文件
- C#事件(event)的一个实例
- linux学习第十篇:find命令,文件名后缀
- linux学习第十一篇:linux和Windows互传文件,用户配置文件和密码配置文件,用户组管理以及用户管理
- linux学习第十二篇:usermod命令,用户密码管理,mkpasswd命令
- 一斤代码深入理解系列(三):微信小程序和服务器通信
- C#CreateGraphics方法的三种实现方式
- 一斤代码深入理解系列(四):微信小程序和服务器通信-WebSocket
- linux学习第十四篇:查看磁盘,文件大小命令:df,du;磁盘分区
- 二叉树的性质和常用操作代码集合
- linux学习第十五篇:磁盘格式化,磁盘挂载,手动增加swap空间
- 《Java程序设计基础》 第8章手记Part 2
- 备忘录模式
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法