kafka

时间:2019-09-19
本文章向大家介绍kafka,主要包括kafka使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

https://www.cnblogs.com/huangqingshi/p/9979909.html

MQ分类

MQ主要分为两类:点对点(p2p)、发布订阅(Pub/Sub)

共同点:消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。

不同点:

p2p模型包括:消息队列(Queue)、发送者(Sender)、接收者(Receiver)一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费,消息就不在消息队列中)。比如说打电话。

Pub/Sub包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)

每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。那么在大数据领域呢,为了满足日益增长的数据量,也有一款可以满足百万级别消息的生成和消费,分布式、持久稳定的产品——Kafka。

kafka简介

Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级项目。
Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。

Kafka的特性

- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
- 高并发:支持数千个客户端同时读写。

Kafka的使用场景

日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等。

消息系统:解耦和生产者和消费者、缓存消息等。

用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘。

运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

流式处理:比如spark streaming和storm。

事件源

Kafka部分名词解释

Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。Broker负责接收数据,并持久化数据,Broker 可以有多个,每个Broker 可以包含多个 Topic,Broker 并不保存 Offset(消费者消费的位置)数据,由 Consumer 自己负责保存,默认保存在 ZooKeeper 中。

Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
Segment:partition物理上由多个segment组成,每个Segment存着message信息
Producer : 生产message发送到topic,生产数据发送到Broker 存储数据。Producer 直连 Broker,不经过任何代理,Producer 将会和 Topic 下所有 Partition Leader 保持 Socket 连接。通常 Producer 是一个包含 Kafka客户端的业务服务。
Consumer : 订阅topic消费message, consumer作为一个线程来消费。业务服务从 Broker 订阅Topic,并从 Topic 中接收数据。每个消费者都属于某个消费者组,一个组里的消费者订阅的是同一个Topic,同一个组的消费者分别订阅同一个 Topic下的不同 Partition 的数据。需要注意的是,每个 Partition 只能被一个消费者订阅,一个消费者可以订阅多个 Partition,用这种方式避免一定的重复消费。当一个消费者挂掉之后,会重新进行负载均衡。

Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即便是来自不同的consumer group的也不行。它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。

kafka主题分区

kafka的每个topic都可以分为多个partition,每个partition都有多个replica(副本),每个分区中的消息是不同的,提高了并发读写的能力,而同一分区的不同副本中保存的是相同的消息,副本之间是一主多从关系,其中leader副本处理读写请求,follower副本只与leader副本进行消息同步,当leader副本出现故障时,则从follower副本中重新选举leader副本对外提供服务。这样,通过提高分区的数量,就可以实现水平扩展,通过提高副本数量,就可以提高容灾能力。

消息的顺序

kafka保证一个partition内消息是有序的,但是并不保证多个partition之间的数据有顺序,每个topic可以划分成多个分区,同一个topic下的不同分区包含的消息是不同的,每个消息在被添加到分区时,都会被分配一个offset,它是此消息在分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的。

ACK 机制

通过 ACK 机制保证消息送达。Kafka 采用的是至少一次(At least once),消息不会丢,但是可能会重复传输。

复制机制

Kafka 保证可靠性依赖的是复制机制,因为单机容易出现故障。Kafka 以Topic 为单位进行设置复制因子,以 Partition 为单位进行复制,允许一份数据复制到集群中的多个节点上。通过复制,Kafka 在Broker 集群中的部分节点挂掉的情况下,仍然可以继续发送和接收消息。

消息删除机制

Broker 端删除消息有一个配置策略,默认是7天,如果7天消息还没有消费,则有可能被删除,也就是丢消息了。


ZooKeeper

Kafka将元数据信息保存在Zookeeper中,但是发送给Topic本身的数据是不会发到ZooKeeper上。

Kafka 使用Zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。Broker会在Zookeeper注册并保持相关的元数据(topic,partition信息等)更新。

而客户端会在Zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除Broker时,各Broker间仍能自动实现负载均衡。

这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)。Producer端使用Zookeeper用来"发现"broker列表,以及和Topic下每个partition的Leader建立socket连接并发送消息。也就是说每个Topic的partition是由Leader角色的Broker端使用Zookeeper来注册Broker信息,以及监测partition leader存活性。Consumer端使用Zookeeper用来注册Consumer信息,其中包括Consumer 消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。

ZooKeeper 在Kafka 集群中承担的作用

Zookeeper管理着Kafka集群中的若干个Broker,保存着一份完整的Broker列表。维护Topic信息,比如Partitions、Replication Factor、ISR(In-sync Replica)等。

Zookeeper帮助选举Partition的Leader。

当有任何变动时,由Zookeeper给Kafka发送通知,比如添加一个新的Topic、Broker挂掉了、删除Topic等等。

Zookeeper集群中也有Leader和Follower的概念。Leader负责写数据,Follower负责读数据.

存储Kafka集群ID。

存储访问控制列表(ACL,Access Control List)。控制Topic、Consumer Group、User等访问权限。


Kafka如何防止丢失数据

生产者

kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。

生产者发送数据有同步方式和异步方式,先来看一下生产者一些关键配置项。

如果是同步模式:ack机制能够保证数据的不丢失,如果ack设置为0,风险很大,一般不建议设置为0。

在同步方式下发送数据,在发送的时候会产生阻塞,等待ack反馈,acks有三个参数0, 1,all,acks参数设置为0表示不等待反馈,表示不需要等待kafka完成同步确认接收消息,风险很大,数据可能丢失,设置为1表示必须kafka集群中的leader节点接收到消息并确认当前的生产者可以发送下一条消息,但是如果这个时候leader挂掉了,集群中尚未完成其他机器的同步,这时候导致数据丢失,设置为all表示等待kafka集群所有节点反馈,可以保证不丢失数据,三种参数的性能上有差异,比如一些允许丢失的消息,又想提升吞吐量,可以配置成0

producer.type=sync 
request.required.acks=1

如果是异步模式:通过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffer满了数据还没有发送出去,如果设置的是立即清理模式,风险很大,一定要设置为阻塞模式 
结论:producer有丢数据的可能,但是可以通过配置保证消息的不丢失。

producer.type=async 
request.required.acks=1 
queue.buffering.max.ms=5000 
queue.buffering.max.messages=10000 
queue.enqueue.timeout.ms = -1 
batch.num.messages=200

  producer.type=async表示异步方式

  request.required.acks=1表示leader接收到消息并反馈 上面的这些参数其他值已经解释了

  queue.buffering.max.ms=5000表示异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提高吞吐量,但是会增加消息发送的延时

  queue.buffering.max.messages=10000表示异步模式下缓冲的最大消息数,同上

  queue.enqueue.timeout.ms=-1表示异步模式下,消息进入队列的等待时间。若是设置为0,则消息不等待,如果进入不了队列,则直接被抛弃

   batch.num.message=200表示异步模式下,每次发送的消息数,当queue.buffering.max.messages或queue.buffering.max.ms满足条件之一时producer会触发发送
 

消费者 

 kafka里面一个主题可以有多个分区,一个分区同时只能被同一个消费者组中的一个消费者消费,消费者组可能包含多个消费者,消费者组下面会记录某个主题某个分区的offset值,来记录消费的到哪里,具体偏移量可以在zk中输入指令,后面的 get /kafka/consumers/(消费者组)/offsets/(主题)/(分区)。

下面看一下消费者的配置

#是否自动提交  如果设置成false需要手动提交
auto.commit.enable = true
# 自动提交的时间间隔
auto.commit.interval.ms = 60 * 1000

如果是设置自动提交,会在 consumer.poll()方法之后自动提交偏移量,这种情况可能导致在取得数据之后,自动提交了偏移量,然后在应用中处理业务的时候事物失败了,导致丢失数据,offset值又加1了,那么下次消费的时候就丢失了数据,如果对于数据一定不能丢失,这里需要配置为false,调用kafka提供的API手工提交。

       虽然可以设置为手工提交,但是好像并不能保证数据库事物与offset一起提交成功,因为 手工提交之后offset变了,但是最后面执行数据库事物操作异常了,这种情况下又感觉消费者数据丢失了,那么这种情况下如何保证消费者不重复消费数据或者,一定保证提交偏移量的数据消费了?

       下面有一种办法,就是把主题,分区,消费者组,offset这些关系存到数据库当中,消费者每次取数据的时候设置一下偏移量,调用消费的seek()方法,这些API可以具体参考kafka的文档,让保存偏移量和应用的事物保持一致就行,具体需不需要这些场景根据业务情况而定。


Kafka如何防止消息重复

 把kafka消费者的配置enable.auto.commit设为false,禁止kafka自动提交offset,从而使用spring-kafka提供的offset提交策略。spring-kafka中的offset提交策略可以保证一批消息数据没有完成消费的情况下,也能提交offset,从而避免了提交失败而导致永远重复消费的问题。


kafka 简单搭建

 https://blog.csdn.net/u012129558/article/details/80065869

参考:

https://blog.csdn.net/qingqing7/article/details/80054281

https://blog.csdn.net/zjh_746140129/article/details/88779640

 https://blog.csdn.net/Future_LL/article/details/88031600

https://blog.51cto.com/xpleaf/2090847

原文地址:https://www.cnblogs.com/dingpeng9055/p/11387995.html