kafka客户端指标上报Prometheus方案(已开源)

时间:2022-07-25
本文章向大家介绍kafka客户端指标上报Prometheus方案(已开源),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

已开源 https://github.com/pierre94/kafka-client-prometheus-collector ,这里简单介绍下我的实现思路

一、背景

在我们场景下,需要将海量数据透传到我们的Kafka集群,这时候我们常常会遇到这些问题:

  • 我们数据压缩率是多少?后端需要多少资源来支持?
  • 我们kafka客户端的负载是怎么样的?能不能承受更高的压力?
  • 如何对kafka客户端做生产端调优?怎么提升我们的性能?

在过去我们会根据Kafka官方文档以及他人的经验总结,结合基础监控指标(如CPU、内存、网络等)可以大致总结出一个基础的评估方法。

在实际业务场景下,我们发现这种不具备可观测性的评估方法不能很好的满足我们的需求,所以我们尝试寻找一种完善的Kafka客户端内部指标的采集上报方案。

大数据领域可观测性建设可以系统学习下Apache Calcite Committer forward的文章

相比Kafka的服务端领域,我们发现开源社区普遍不太重视客户端的可观测性建设,没有一个特别完善好用的轮子。所以我们借鉴一些开源组件的思路,实现了这个小巧简单的lib来帮助开发者将kafka客户端的指标上报到Prometheus。

二、功能设计与效果展示

1、功能模块

已具备如下基础功能:

  • 支持HTTP Export
  • 支持PushGateway(含pushService)
  • 支持Producer(含Dashboard)
  • 支持Consumer(仅支持collector)

2、设计实现

image.png

实现逻辑也比较清晰,使用Kafka客户端Metric相关API获取到metric值,再按照我们对指标的理解翻译成Prometheus的指标值,构建Collector。

受限于kafka和Prometheus的限制,翻译模块的实现还不是很优雅。

3、效果展示

image.png

(提供Producer的Grafana DashBoard)

4、项目总结

与业界方案相比,我们具备如下优势:

  • 开箱即用,周边完善。与Prometheus完美结合,
  • 覆盖官方客户端的全部指标,后续结合运营经验提供黑白名单参考
  • 同时支持HTTP ExportPushGateway 的推拉模型
  • 支持ProducerConsumer2种客户端类型,同时支持多个client实例
  • 方便地集成到业务代码里,不需要额外部署采集组件
  • 开箱即用,周边完善。已有相对完善的Grafana DashBoard

三、接入使用

目前还没有传到公共仓库,仅供参考

1、接入方法

见: https://github.com/pierre94/kafka-client-prometheus-collector

2、使用方法

目前支持HTTP ExportPushGateway2种方法将指标接入到Prometheus

2.1、HTTP Export

Producer<Integer,String> producer = new KafkaProducer<Integer, String>(props);
new KafkaClientMetricCollector(Collections.singletonList(producer)).register();
HTTPServer server = new HTTPServer(8033);
curl http://{ip}:8033/metrics

2.2、 PushGateway

2.2.1 使用本项目的pushService

参照TestKafkaConsumerMetricCollector实现

KafkaConsumer<Integer, String> consumer1 = new KafkaConsumer<>(props);
KafkaConsumer<Integer, String> consumer2 = new KafkaConsumer<>(props);
consumer1.subscribe(Collections.singletonList("kafka_test"));
consumer2.subscribe(Collections.singletonList("kafka_test"));

List<Object> clientList = new ArrayList<>();
clientList.add(consumer1);
clientList.add(consumer2);

new KafkaClientMetricPushService(clientList,System.getenv("pushGateWayServer"), "my_kafka_client");

pushService还支持指定push间隔

// 15s 后启动上报线程,上报周期是60s
new KafkaClientMetricPushService(clientList, System.getenv("pushGateWayServer"), "my_kafka_client",15,60);

2.2.2 业务自定义上报

需要业务自己启动一个单独的push线程。Push类似HTTP Export,参考方法如下:

Collector register = new KafkaClientMetricCollector(clientList).register();
PushGateway pg = new PushGateway("127.0.0.1:9091");
pg.push(registry, "my_kafka_client_job");

四、ROADMAP

  • 考虑到兼容0.9.0.1的client,使用了deprecated的api org.apache.kafka.common.Metric.value,后续考虑替换

五、参考资料