Zabbix监控之从Kafka中获取消费进度和lag

时间:2022-07-23
本文章向大家介绍Zabbix监控之从Kafka中获取消费进度和lag,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

在0.9及之后的版本,kafka自身提供了存放消费进度的功能。本文讲解的是如何从kafka自身获取消费进度。从zookeeper中获取消费进度请阅读我的另一片文章传送门

https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka 这是官网上的教程,提供了scala版本的获取消费状态和提交消费状态的代码。仅供参考。

http://pykafka.readthedocs.io/en/latest/api/broker.html 这是pykafka官网提供获取消费状态的API,试过不知道怎么用,网上也找不到相关代码。

http://kafka-python.readthedocs.io/en/latest/usage.html 这是python-kafka官网,找不到想要的API,没试过。

获取消费进度之前,一定要先弄明白kafka的存储结构以及消费进度是存放在zookeeper中还是kafka中,否则可能会发现到头来,自己都不知道自己在干什么。以上几种方式我都试过,但是都没成功,最后选择命令行的方式获取到消费状态,将消费状态写入文件中,再解析文件。 Kafka管理工具 https://www.iteblog.com/archives/1605.html http://orchome.com/454

使用指令可以获取该组下每个consumer的消费进度

/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.12.11.131:9092 --group kafkaTestGroup --describe

然后再将其中的数据取出来,echo到文件中,可以使用crontab来执行指令,定时更新文件。

/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.1.8.74:9092 --group datasync.server.10.1.2.118 --describe |grep datasync.server.10.1.2.118 | awk '{print $1,$2,$3,$4,$5,$6,$7}'

将消费状态存放在kafka.log文件中,再解析文件,我这里监控阀值设置为1000,将lag值大于1000的数据取出来并输出。下面是解析文件的python脚本。

#!/usr/bin/env python
#coding=utf-8

import os.path
import time
import pdb
from fileStatus import File


if __name__=="__main__":
    filePath='/data/python-scripts/inspector/AccountInspector/otherInspector/kafka.log'
    f=open(filePath,"r")
    columnNameList=['GROUP','TOPIC','PARTITION','CURRENT-OFFSET','LOG-END-OFFSET','LAG','OWNER']
    result='no topicPartition lag is over allowedRange'
    resultDic={}
    overAllowedLagDic={}
    for line in f:
        #使用命令行处理时有时会得到Consumer group is not exists 或者Consumer group is rebalancing等不正常的结果,这种数据忽略不处理
        if 'Consumer group' not in line: 
            line=line.strip('n')
            lineSplit=line.split(' ')
            dicKey=lineSplit[0]+'_'+lineSplit[1]+'_'+lineSplit[2]
            dicValue={}
            for i in range(0,len(lineSplit),1):
                dicValue[columnNameList[i]]=lineSplit[i]
            #由于我设置的阀值时lag值为1000时就告警,此处LOG-END-OFFSET就是logsize,当logsize小于1000时可以忽略(因为lag总是小于logsize的)
            if dicValue['LOG-END-OFFSET']<='1000':
                dicValue['CURRENT-OFFSET']='0'
                dicValue['LAG']='0'
            resultDic[dicKey]=dicValue
            #使用命令行有缺陷,经常会出现取出来的值为unknown的情况,出现这种情况也当作告警处理
            if dicValue['LAG'] == 'unknown':
                overAllowedLagDic[dicKey]=dicValue
            else:
                if int(dicValue['LAG'])>1000:
                    overAllowedLagDic[dicKey]=dicValue
    if len(overAllowedLagDic)>0:
        result=''
        for key in overAllowedLagDic:
            dicValue=overAllowedLagDic[key]
            lag=dicValue['LAG']
            result=key+':'+lag+'; '+result
    print result

方式很low,而且还有漏洞,后面有时间研究下使用API的方式获取消费进度。