Spark Opeartor的指标体系

时间:2022-07-22
本文章向大家介绍Spark Opeartor的指标体系,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1 Overview

spark-on-k8s-operator,下文简称 Spark Operator, 背景知识就不介绍太多了,本文主要分享一下 Spark Operator 的指标系统是如何构建的,之后可以按照 Spark Operator 的方法,给自己创建的 Operator 配上指标系统。

2 Metrics

2.1 Spark Metrics

Spark Operator 目前提供几种自定义的指标,这里自定义的意思是以 Spark Application 这个自定义资源对象为监控对象,围绕 Spark Application 创建的一些监控指标,来让 Spark Operator 的维护者,更好的监控 Operator 中 CRD 对象的情况。

下面是目前 Spark Operator 的指标。自定义指标基本都在 sparkapp_metrics.go 里定义。

type sparkAppMetrics struct {
    ...
	sparkAppSubmitCount           *prometheus.CounterVec
	sparkAppSuccessCount          *prometheus.CounterVec
	sparkAppFailureCount          *prometheus.CounterVec
	sparkAppFailedSubmissionCount *prometheus.CounterVec
	sparkAppRunningCount          *util.PositiveGauge
	sparkAppSuccessExecutionTime  *prometheus.SummaryVec
	sparkAppFailureExecutionTime  *prometheus.SummaryVec
	sparkAppExecutorRunningCount  *util.PositiveGauge
	sparkAppExecutorFailureCount  *prometheus.CounterVec
	sparkAppExecutorSuccessCount  *prometheus.CounterVec
}

可以看到 Spark Operator 记录了,Spark App 的提交数、成功数、失败数、提交 失败数、当前运行数、运行成功的时间统计、运行失败的时间统计、运行的 Executor 数、失败的 Executor 数以及成功的 Executor 数。

newSparkAppMetrics new 实际就是去注册的意思。按照指标的类型,CounterVec 或者 GaugeVec 等,配置好 TYPEHELP,或者 LABEL 等。

熟悉 Prometheus 的同学应该知道,Counter, Gauge, Summary, Histogram 几种类型,但是我们从上面的 sparkAppMetrics 上还看到了 PositiveGauge

有两个比较特殊的指标。一个是 sparkAppRunningCountsparkAppExecutorRunningCount,因为他们都是 Gauge 类型的,但是是不会降低到0以下的,所以这里注册的类型,是自定义的 PostiveGauge

sparkAppRunningCount := util.NewPositiveGauge(util.CreateValidMetricNameLabel(prefix, "spark_app_running_count"), "Spark App Running Count via the Operator", validLabels)
sparkAppExecutorRunningCount := util.NewPositiveGauge(util.CreateValidMetricNameLabel(prefix, "spark_app_executor_running_count"), "Spark App Running Executor Count via the Operator", validLabels)

这个是 Spark Operator 自实现的一个只用于正数的 Gauge,因为大家都知道 Gauge 其实是可以亦正亦负的。这个指标收集一些会增长会减少,但是不会跌破0的类型。

type PositiveGauge struct {
	mux         sync.RWMutex
	name        string
	gaugeMetric *prometheus.GaugeVec
}

实现的原理很简单,就是一个读写锁 mux 和一个指标名 name,以及一个正常的可以亦正亦负的 gaugeMetrics。下面是其构造方法 NewPositiveGauge

func NewPositiveGauge(name string, description string, labels []string) *PositiveGauge {
	validLabels := make([]string, len(labels))
	for i, label := range labels {
		validLabels[i] = CreateValidMetricNameLabel("", label)
	}

	gauge := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: name,
			Help: description,
		},
		validLabels,
	)

	return &PositiveGauge{
		gaugeMetric: gauge,
		name:        name,
	}
}

2.2 Workqueue Metrics

这里的 Workqueue Metrics 是指 client-go 库里的 workqueue 包里的 metrics.go。因为 Spark Operator 实现的 Controller 里,用到了 rate limiting workqueue 这个工作队列。

https://github.com/kubernetes/client-go/blob/master/util/workqueue/metrics.go

Spark Operator 里自定义的 WorkQueueMetrics 主要是用于暴露这个工作队列的指标,其实就是给原来的 workqueque 的指标加上一个 prefix,这样后面收集指标和使用指标时候会更方便。

WorkQueueMetrics 的指标的结构体。

type WorkQueueMetrics struct {
	prefix string
}

下面是 Workqueque Metrics 的几种类型。

// Depth Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewDepthMetric(name string) workqueue.GaugeMetric {
    ...
}

// Adds Count Metrics for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewAddsMetric(name string) workqueue.CounterMetric {
    ...
}

// Latency Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewLatencyMetric(name string) workqueue.SummaryMetric {
    ...
}

// WorkDuration Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewWorkDurationMetric(name string) workqueue.SummaryMetric {
    ...
}

// Retry Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewRetriesMetric(name string) workqueue.CounterMetric {
    ...
}

func (p *WorkQueueMetrics) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
    ...
}

func (p *WorkQueueMetrics) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
    ...
}

2.3 指标初始化

Spark Metrics 和 Workqueue Metrics 两部分指标的初始化都在 InitializeMetrics 方法里。Spark Metrics 初始化,是普通的 Prometheus 指标收集初始化的方式,Workqueue Metrics 则是通过 Workqueue 的 Provider 来填充。

func InitializeMetrics(metricsConfig *MetricConfig) {
	// Start the metrics endpoint for Prometheus to scrape
	http.Handle(metricsConfig.MetricsEndpoint, promhttp.Handler())
	go http.ListenAndServe(fmt.Sprintf(":%s", metricsConfig.MetricsPort), nil)
	glog.Infof("Started Metrics server at localhost:%s%s", metricsConfig.MetricsPort, metricsConfig.MetricsEndpoint)

	workQueueMetrics := WorkQueueMetrics{prefix: metricsConfig.MetricsPrefix}
	workqueue.SetProvider(&workQueueMetrics)
}

2.4 其他

func CreateValidMetricNameLabel(prefix, name string) string {
	// "-" is not a valid character for prometheus metric names or labels.
	return strings.Replace(prefix+name, "-", "_", -1)
}

根据 Prometheus 的指引,Metric Name 除了可以用大小写字母和数字以为,还可以用下划线_,但是不能是中划线。CreateValidMetricNameLabel 方法是用来矫正指标名的,以防制造不符合规范的指标名,导致指标无法被 Prometheus 拉取。

The metric name specifies the general feature of a system that is measured (e.g. http_requests_total - the total number of HTTP requests received). It may contain ASCII letters and digits, as well as underscores and colons. It must match the regex [a-zA-Z_:][a-zA-Z0-9_:]*

func RegisterMetric(metric prometheus.Collector) {
	if err := prometheus.Register(metric); err != nil {
		// Ignore AlreadyRegisteredError.
		if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
			return
		}
		glog.Errorf("failed to register metric: %v", err)
	}
}

构建指标体系,需要有一个注册 registry 的过程。传入的就是一个 metrics 类型,然后通过注册接口注册即可。

type MetricConfig struct {
	MetricsEndpoint string
	MetricsPort     string
	MetricsPrefix   string
	MetricsLabels   []string
}

MeticsConfig 是 Spark Operator 的指标配置信息类型,包括 Operator 应用暴露的指标 Endpoint,指标端口 Port,指标的前缀(可以用于快速过滤)以及指标的 Labels(注意是一个数组,意思是指标会被打上这个数组的里的名字作为 Label)。

func fetchGaugeValue(m *prometheus.GaugeVec, labels map[string]string) float64 {
	// Hack to get the current value of the metric to support PositiveGauge
	pb := &prometheusmodel.Metric{}

	m.With(labels).Write(pb)
	return pb.GetGauge().GetValue()
}

2.5 工作时的指标

很重要的,就是根据 Spark App 的 CRD 对象的状态来输出指标了,这是指标体系最重要的部分。这里看不懂,前面都白看。

func (sm *sparkAppMetrics) exportMetrics(oldApp, newApp *v1beta2.SparkApplication) {
	oldState := oldApp.Status.AppState.State
	newState := newApp.Status.AppState.State
	if newState != oldState {
		switch newState {
		case v1beta2.SubmittedState:
		    // sparkAppSubmitCount + 1
		case v1beta2.RunningState:
		    // sparkAppRunningCount + 1
		case v1beta2.SucceedingState:
		    // sparkAppSuccessCount + 1
   		    // sparkAppRunningCount - 1
		case v1beta2.FailingState:
		    // sparkAppFailureCount + 1
		    // sparkAppRunningCount - 1
		case v1beta2.FailedSubmissionState:
		    // sparkAppFailedSubmissionCount + 1
		}
	}

	// Potential Executor status updates
	// 不赘述了
	for executor, newExecState := range newApp.Status.ExecutorState {
		switch newExecState {
		case v1beta2.ExecutorRunningState:
		case v1beta2.ExecutorCompletedState:
		case v1beta2.ExecutorFailedState:
		}
	}
}

指标构建和初始化完成了,也配置好什么时候该输出指标的方法了,然后看看 exportMetrics 这个方法是在什么时候被调用的。

// updateStatusAndExportMetrics updates the status of the SparkApplication and export the metrics.
func (c *Controller) updateStatusAndExportMetrics(oldApp, newApp *v1beta2.SparkApplication) error {
	// Skip update if nothing changed.
	if equality.Semantic.DeepEqual(oldApp, newApp) {
		return nil
	}

	updatedApp, err := c.updateApplicationStatusWithRetries(oldApp, func(status *v1beta2.SparkApplicationStatus) {
		*status = newApp.Status
	}, c.k8sMinorVersion)

	// Export metrics if the update was successful.
	if err == nil && c.metrics != nil {
	    // 调用
		c.metrics.exportMetrics(oldApp, updatedApp)
	}

	return err
}

当 Spark App CRD 对象在对 status 字段进行更新的时候,除了更新 status 以外,还会调用 exportMetics 方法来输出指标。

显然 updateStatusAndExportMetrics 这个方法是 Controller 在同步 CRD 对象的时候调用的。

if appToUpdate != nil {
	glog.V(2).Infof("Trying to update SparkApplication %s/%s, from: [%v] to [%v]", app.Namespace, app.Name, app.Status, appToUpdate.Status)
	// 当 appToUpdate 不为 nil,证明需要去更新 CRD 对象的 status 了
	// 这个时候同时输出指标
	err = c.updateStatusAndExportMetrics(app, appToUpdate)
	if err != nil {
		glog.Errorf("failed to update SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
		return err
	}
}

3 Summary

给 Operator 加指标,总体来说不是非常难的事情,难的地方在于判断自己到底需要收集什么指标,做什么样的监控,如果需要自定义指标系统,可以参考 Spark Operator 的做法,自定义 CRD 层面的 Metrics,如果还需要监控工作队列,直接通过 client-go 的 workqueue 的接口去做即可。

下面是一份真实的 Spark Operator 输出的指标,供参考。

...
# HELP spark_app_executor_failure_count Spark App Failed Executor Count via the Operator
# TYPE spark_app_executor_failure_count counter
spark_app_executor_failure_count{project="Unknown"} 7
# HELP spark_app_executor_running_count Spark App Running Executor Count via the Operator
# TYPE spark_app_executor_running_count gauge
spark_app_executor_running_count{project="Unknown"} 7
spark_app_executor_running_count{project="demo"} 22
# HELP spark_app_executor_success_count Spark App Successful Executor Count via the Operator
# TYPE spark_app_executor_success_count counter
spark_app_executor_success_count{project="demo"} 65
# HELP spark_app_running_count Spark App Running Count via the Operator
# TYPE spark_app_running_count gauge
spark_app_running_count{project="demo"} 0
# HELP spark_app_submit_count Spark App Submits via the Operator
# TYPE spark_app_submit_count counter
spark_app_submit_count{project="demo"} 1
# HELP spark_app_success_count Spark App Success Count via the Operator
# TYPE spark_app_success_count counter
spark_app_success_count{project="demo"} 1
# HELP spark_app_success_execution_time_microseconds Spark App Successful Execution Runtime via the Operator
# TYPE spark_app_success_execution_time_microseconds summary
spark_app_success_execution_time_microseconds{project="demo",quantile="0.5"} NaN
spark_app_success_execution_time_microseconds{project="demo",quantile="0.9"} NaN
spark_app_success_execution_time_microseconds{project="demo",quantile="0.99"} NaN
spark_app_success_execution_time_microseconds_sum{project="demo"} 5.83e+08
spark_app_success_execution_time_microseconds_count{project="demo"} 1
# HELP spark_application_controller_adds Total number of adds handled by workqueue: spark-application-controller
# TYPE spark_application_controller_adds counter
spark_application_controller_adds 120
# HELP spark_application_controller_depth Current depth of workqueue: spark-application-controller
# TYPE spark_application_controller_depth gauge
spark_application_controller_depth 0
# HELP spark_application_controller_latency Latency for workqueue: spark-application-controller
# TYPE spark_application_controller_latency summary
spark_application_controller_latency{quantile="0.5"} NaN
spark_application_controller_latency{quantile="0.9"} NaN
spark_application_controller_latency{quantile="0.99"} NaN
spark_application_controller_latency_sum 6.150365e+06
spark_application_controller_latency_count 120
# HELP spark_application_controller_longest_running_processor_microseconds Longest running processor microseconds: spark-application-controller
# TYPE spark_application_controller_longest_running_processor_microseconds gauge
spark_application_controller_longest_running_processor_microseconds 0
# HELP spark_application_controller_retries Total number of retries handled by workqueue: spark-application-controller
# TYPE spark_application_controller_retries counter
spark_application_controller_retries 472
# HELP spark_application_controller_unfinished_work_seconds Unfinished work seconds: spark-application-controller
# TYPE spark_application_controller_unfinished_work_seconds gauge
spark_application_controller_unfinished_work_seconds 0
# HELP spark_application_controller_work_duration How long processing an item from workqueue spark-application-controller takes.
# TYPE spark_application_controller_work_duration summary
spark_application_controller_work_duration{quantile="0.5"} NaN
spark_application_controller_work_duration{quantile="0.9"} NaN
spark_application_controller_work_duration{quantile="0.99"} NaN
spark_application_controller_work_duration_sum 1.2046801e+07
spark_application_controller_work_duration_count 120