消息队列的使用(kafka举例)

时间:2022-07-23
本文章向大家介绍消息队列的使用(kafka举例),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

我对消息队列的理解?

  • 先举个列子,排队买票。在我们平时买火车票的时候是不是来一个人就要去排队等待,然后售票员根据排队的顺序去给他们卖票。我们可以将这个队伍看作一个容器,那这个容器就是消息队列了。
  • 在Java的线程池中我们就会使用一个队列(BlockQueen等)来存储提交的任务;
  • 在操作系统中中断的下半部分也会使用工作队列来实现延后执行
  • 还有RPC框架,也会从网络上姐收到请求写到消息队列里,在启动若干个工作线程来进行消费。 总之不管是在我们的生活中还是在系统设计中使用消息队列的设计模式和消息队列组件实在是太多了。

为什么有这么多地方都用消息队列呢?

  1. 削峰填谷,当业务量特别大的时候我们可以使用消息队列来进行缓冲,当队列达到一定量的时候可以增加队列处理机来加速处理。当业务量比较少的时候慢慢去消费挤压下来的请求。(在业务需求允许的演出时间内)
  2. 扩展性:当使用的消息队列处在消息对立的数据可以被任何地方消费。可以做任何的数据处理操作等。
  3. 异步通信:很多时候我们不想同步处理一些数据,或者是业务流程比较长,将中间的一些环节异步处理,那么我们就可以使用消息队列。
  4. 松耦合: 进入消息队列的数据不仅可以被业务系统消费,当有BI团队需要分析这些数据的时候我们也可以发送一份给他们

使用消息队列会遇到的问题

1. 消息丢失

  • 生产者向队列中跑消息的时候
    1. 一般出现这种情况是因为一般队列部署和业务服务之前是不同的服务器,在传输的过程中会出现网络抖动,从而导致消息丢失,那么这种情况我们解决的办法是什么呢? 那就是消息重发,那重发的过程中会出现消息队列重复。
  • 消息在队列中存储的时候
    1. 当消息被抛到消息队列的服务中的时候,这个时候消息队列还是会丢失,我们用比较成熟的消息队列中间件kafka来举列子, kafka的队列存储是异步进行的,刚开始队列是存储在操作系统的缓存中(page cache)然后找得到合适的时机进行同步刷新到磁攀(这种方式可以减少I/O次数提高效率)
    2. 我们可以想一个场景,那就是当cache中的数据还没有刷新到磁盘,突然停电宕机,那岂不是cache是存储在内存中的,拿数据不就都丢失了。我们再想,那我们把这个刷盘的频率提高,每接受一次消息就刷盘一次,那就本末倒置,那性能就不行了。
    3. kafka这么牛逼的中间件肯定有他们的解决办法那就是集群部署,通过部署多个副本进行备份数据保证消息尽量不丢失。具体实现:kafka集群有多台服务,其中有一台是leader,负责消息的写入和消息的消费,还有其他的就是folower负责数据的备份,Followwer中有一个特殊的集合叫做ISR(in-sync replicas), 当leader故障的时候,新的leader就在ISP 这个结合中获取,leader的数据会同步给被选中的follwer,这样在leader挂了的时候,kafka会消费Follower中的消息 减小消息丢失的可能。
    4. 但是还有一种比较极端的情况就是消息还没有同步的时候leader挂掉了,在kafka中为生产者提供了ack ,当这个选项被设置为all 的时候,生产者给kafkaleader的同时发送消息也会给ISR集合中的Follower发送消息,并且必须保证都发送成功才会被认为发送成功。这样只有ISR和leader都挂掉才会有丢失消息
  • 消息被消费者消费的过程
    1. 我们在这一步骤我们依然以kafka为列子,消息消费有三个步骤, 接收消息,处理消息,更新消费进度。
    2. 在进行kafka给消费者发送消息的时候,发生网络抖动,导致消息没有被正确的接受到,处理消息时可能发生一些业务的异常导致处理流程为执行完成,这是且更新了完成进度那么就会永远接收不到这条消息了。所以在业务逻辑中一定要的确认业务逻辑跑完了才去更新消息消费进度。
    3. 当kafka发送完消息后宕机,然后业务服务器处理完成且去更新消息消费进度,这个时候就更新不了了,当kafka重新启动,又会重新跑消息。如果这个时候业务系统不具备幂等性那么就会出现业务bug。

2. 保证消息只被消费一次

  1. 从上面的分析来看,我们为防止消息丢失而不得不重发消息,进而导致消息重复接受,重复消费的问题。那我们该如何解决这个问题呢? 上面有提到过“幂等”。
  2. 什么是幂等?
  • 幂等的意思就是:在进行多次对某个数据,或者某个事件的操作,这个事务或者数据不会被多次改变。 列如: 一条update 语句进行更新,一直更新都会是第一次执行的结果。
  1. 在生产消费的过程中保证消息的幂等
  • 在消息生产的时候 kafka 支持“prducer idempotency ”的特性,翻译过来就是生产过程的幂等性,为生产者定义一个唯一ID,producer产生的每一条消息都赋值一个唯一ID,当生产者发送消息过来的时候先进ID的比较,如果过来的ID和消息队列中队尾的消息ID一样就丢弃(感觉有点乐观锁的意思),所以就会保证队列中不会重复消息。
  1. 还有就是在消费端进行幂等设计
  • 可以在通用层进行幂等设计,一般在使用中间件的时候,会对其封装一层。为方便业务逻辑层的使用。在上面我们知道在产生消息的时候会生成一个唯一的ID,在接收消息消费的时候将其存储在DB里面,没一次过来的时候我们都去check一下,如果已经消费过了那我们就丢弃。这样也要考虑宕机风险,那就是消费完了,然后宕机了,ID没有存进去数据库。遇到问题解决问题,那就是将处理ID 和消费消息放在同一个事务里面。要么全成功,要么全失败。prefect。
  • 还有就是在业务逻辑层幂等处理。我们可以给处理的数据加版本号,在生产的时候先把这个数据的版本号拿到,每处理一次就进行版本号的更新且对比这个版本号。如果这个消息再来的时候版本号已经对应不上那就更新不了了(正八经的乐观锁) (可以想一下elatiscSearh中的并发控制模式是不是很像)
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;