kafka_consumer

时间:2019-11-14
本文章向大家介绍kafka_consumer,主要包括kafka_consumer使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

使用pykafka进行消费

 1 #coding:utf8
 2 from pykafka import KafkaClient
 3 import time
 4 from pykafka.common import OffsetType
 5 import json
 6 
 7 client = KafkaClient(hosts='192.168.1.1:9092')
 8 
 9 # print client.topics
10 
11 topic = client.topics['perfin']
12 
13 consumer = topic.get_balanced_consumer(consumer_group='perftest',
14                                        auto_commit_enable=True,
15                                        zookeeper_connect='192.168.1.1:2181',
16                                        auto_commit_interval_ms = 1000,
17                                        auto_offset_reset = OffsetType.LATEST,
18                                        # auto_commit_enable=True, auto_co
19                                        )
20 
21 start = time.time()
22 c=0
23 tmp = []
24 for msg in consumer:
25     c+=1
28     tmp.append(msg.value)
29     if c%1000==0:
30         now =time.time()
31         print "[%s] avg speed: %s /second"%(time.strftime('%Y-%m-%d %H:%M:%S'),int(c/(now-start)))
32         start =now
33     if c > 10000:
34         open('./kafkamsg.txt','w').write("\n".join(tmp))
35         import sys
36         sys.exit(0)

原文地址:https://www.cnblogs.com/yeyong/p/11855035.html