Zabbix监控之从Kafka中获取消费进度和lag
在0.9及之后的版本,kafka自身提供了存放消费进度的功能。本文讲解的是如何从kafka自身获取消费进度。从zookeeper中获取消费进度请阅读我的另一片文章传送门
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka 这是官网上的教程,提供了scala版本的获取消费状态和提交消费状态的代码。仅供参考。
http://pykafka.readthedocs.io/en/latest/api/broker.html 这是pykafka官网提供获取消费状态的API,试过不知道怎么用,网上也找不到相关代码。
http://kafka-python.readthedocs.io/en/latest/usage.html 这是python-kafka官网,找不到想要的API,没试过。
获取消费进度之前,一定要先弄明白kafka的存储结构以及消费进度是存放在zookeeper中还是kafka中,否则可能会发现到头来,自己都不知道自己在干什么。以上几种方式我都试过,但是都没成功,最后选择命令行的方式获取到消费状态,将消费状态写入文件中,再解析文件。 Kafka管理工具 https://www.iteblog.com/archives/1605.html http://orchome.com/454
使用指令可以获取该组下每个consumer的消费进度
/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.12.11.131:9092 --group kafkaTestGroup --describe
然后再将其中的数据取出来,echo到文件中,可以使用crontab来执行指令,定时更新文件。
/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.1.8.74:9092 --group datasync.server.10.1.2.118 --describe |grep datasync.server.10.1.2.118 | awk '{print $1,$2,$3,$4,$5,$6,$7}'
将消费状态存放在kafka.log文件中,再解析文件,我这里监控阀值设置为1000,将lag值大于1000的数据取出来并输出。下面是解析文件的python脚本。
#!/usr/bin/env python
#coding=utf-8
import os.path
import time
import pdb
from fileStatus import File
if __name__=="__main__":
filePath='/data/python-scripts/inspector/AccountInspector/otherInspector/kafka.log'
f=open(filePath,"r")
columnNameList=['GROUP','TOPIC','PARTITION','CURRENT-OFFSET','LOG-END-OFFSET','LAG','OWNER']
result='no topicPartition lag is over allowedRange'
resultDic={}
overAllowedLagDic={}
for line in f:
#使用命令行处理时有时会得到Consumer group is not exists 或者Consumer group is rebalancing等不正常的结果,这种数据忽略不处理
if 'Consumer group' not in line:
line=line.strip('n')
lineSplit=line.split(' ')
dicKey=lineSplit[0]+'_'+lineSplit[1]+'_'+lineSplit[2]
dicValue={}
for i in range(0,len(lineSplit),1):
dicValue[columnNameList[i]]=lineSplit[i]
#由于我设置的阀值时lag值为1000时就告警,此处LOG-END-OFFSET就是logsize,当logsize小于1000时可以忽略(因为lag总是小于logsize的)
if dicValue['LOG-END-OFFSET']<='1000':
dicValue['CURRENT-OFFSET']='0'
dicValue['LAG']='0'
resultDic[dicKey]=dicValue
#使用命令行有缺陷,经常会出现取出来的值为unknown的情况,出现这种情况也当作告警处理
if dicValue['LAG'] == 'unknown':
overAllowedLagDic[dicKey]=dicValue
else:
if int(dicValue['LAG'])>1000:
overAllowedLagDic[dicKey]=dicValue
if len(overAllowedLagDic)>0:
result=''
for key in overAllowedLagDic:
dicValue=overAllowedLagDic[key]
lag=dicValue['LAG']
result=key+':'+lag+'; '+result
print result
方式很low,而且还有漏洞,后面有时间研究下使用API的方式获取消费进度。
- windows 2003 32位系统能支持的最大内存数
- .Net Core内存回收模式及性能测试对比
- silverlight中顺序/倒序异步加载多张图片
- MySQL数据库性能优化之三
- 谁说 Java 要过时?2017年Java 大事件一览及未来前瞻
- mongodb的用法
- silverlight中如何将string(字符串)写入Resource(资源)?
- Python练习环境搭建-引入预定义数据
- 振幅和成交量的关系
- silverlight中的几个冷门标记 {x:Null},d:DesignWidth,d:DesignHeight
- 用scikit-learn和pandas学习线性回归
- 自动驾驶玩出新花招,以后老司机们就要失业了
- silverlight.net官方网站图片切换源码
- 制作iis自动安装包
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Axios安装封装api接口
- 排障集锦:九九八十一难之第七难!mysql数据库登录密码忘记了
- Css实现上下无限跳动
- 排障集锦:九九八十一难之第八难!ERROR 2002 (HY000): Can‘t connect to local MySQL server
- Vue Router配置参数、404页面
- 机器学习之决策树三-CART原理与代码实现
- Vue Router路径切换过渡动画
- 听说Mysql你很豪横?-------------深入解析mysql数据库中的索引!
- 听说Mysql你很豪横?-------------深入解析mysql数据库中的事务!
- Struts2笔记
- Vue Router实现路由嵌套单页面展示
- 排障集锦:九九八十一难之第九难!mysql备份恢复路上的小插曲
- jQuery限制复选框checkbox的选中次数
- jQuery点击切换增加和删除class类
- Vue使用props和emit父子组件通信