消息队列:Rabbitmq如何保证不丢消息

时间:2022-07-23
本文章向大家介绍消息队列:Rabbitmq如何保证不丢消息,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

背景介绍:

笔者最近研究了下rabbitmq,便很好奇它是怎么保证不丢失消息的呢?于是便整理了这篇文章来跟大家分享下,自己的理解,如有不准确的地方或者不同的意见,还请各位能够给出反馈,我们可以讨论,相互学习,相互成长。

基础知识:

在开始探讨这个问题之前,笔者还是觉得很有必要将rabbitmq的架构等基础知识回顾下,如下所示:

对于使用rabbitmq的服务来说,主要由三部分构成,它们分别是:生产者,rabbitmq,消费者。这三者之间是通过网络来进行通讯的,其中与生产者对应的是exchange,与消费者对应的是connection,而rabbitmq内部又由exchange,queue,connection三部分构成。

消息的流程:消息是由生产者生产了之后,上报给exchange,exchange绑定并存储到queue中,再传递给最终的消费者手里。

如此以来,整个过程就分成了三大场景:

场景1: 生产者与exchange的上报消息,如何保证不丢失?

对于网络通讯来说,解决丢数据最好的办法就是,消息确认机制,而rabbitmq里面是通过两个方式来保证:一种是事务机制,这个是在amqp协议层面保证的,具体操作如下所示:

RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。(备注:采用事务机制实现会降低RabbitMQ的消息吞吐量。)

步骤为:

1.client----发送----->Tx.Select
2.broker----发送----->Tx.Select-Ok(之后publish)
3.client------发送----->Tx.Commit
4.broker------发送---->Tx.Commit-Ok

一种是confrim机制:

原理:消息响应机制,

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号。

confrim的优势是,它是异步的,在生产者发送完一个消息之后,不必要等这个消息的返回,就可以继续处理另外一个消息,等待消息的ack返回之后,再去处理前面发过的消息,类似于多路复用的做法。rabbitmq在收到之后,会回复ack,如果因为rabbitmq自身的问题导致的,会回复nack消息。

对于生产者来说,为了方便确认消息有没有真正到达rabbitmq端,还需要在生产者端设置超时重发,毕竟网络里面是可能丢失消息的。

confrim方式使用的API:

https://godoc.org/github.com/streadway/amqp#Channel.Confirm

场景2: 消费者从queue中获取消息如何保证不丢失?

消息在到达了rabbitmq之后,会将数据保存到queue里面,queue是存到内存里面的,不过rabbitmq提供了持久化的操作,这个策略如下所示:

1.buffer大约1M左右,写满之后,就会写到磁盘中。
2.25ms超时时间,buffer不满的话,超时也会写到磁盘中。

尽管如此,也有可能会丢数据,特别是当rabbitmq在buffer没有写到磁盘的时候,就死掉了。不过rabbitmq也提供了镜像队列的方式,利用主备的方式来防止消息丢掉,不过当master和salve同时挂掉的话,还是会丢数据,只不过这种同时挂掉的概率会小很多。(笔者觉得,没有百分之百的不丢消息,只是丢消息的概率变的很低而已。)

参考文章:https://blog.csdn.net/u013256816/article/details/60875666

场景3: rabbitmq内部如何保证不丢失消息?

对于消费者来说,同样也是采用了消息响应的方式来防止消息不丢失,不过在这一层使用的是ack机制来处理,不过这里的ack可以设置成不等待ack和等待ack两种,在这里我们使用的是设置ack。

消费者对于消息的响应通常有下面三个场景:

1.消费者在接收到rabbitmq的消息之后,等处理完消息之后,会主动回复ack消息,rabbitmq在收到ack之后,便会继续给这个消费者分配下一个消息进行处理。

2.当然rabbitmq也可以回复unack消息,如此以来消息队列下一次还会继续将这个消息分配给消费者,来实现消息重处理。

3.消费者先把ack消息回复掉,然后在重新将这个消息放到rabbitmq之中,如此以来通过rabbitmq的队列特性来实现,消息的重试,这里的重试,不是一直处理这一个消息,而是要等到队列里面的消息排队到它才行。

除此之外,rabbitmq还实现了批量的概念,通过qos来设置一次性分配给一个消费者的最大数量,让消费者一次性消费一批消息,等到处理完了这一批消息,再去分配下一批消息给这一个消费者。

问题1:一旦消费者长时间不回复Ack消息或者消费者卡死了呢,这种场景如何处理?

理论上需要消费者需要实现一个超时处理机制,在一定时间内没有处理完毕,需要超时回复ack或者unack消息给rabbitmq。

问题2:就算消费者有超时机制,可是一旦消费者在发送ack给rabbitmq的时候,消息丢失,rabbitmq这个消息一直收不到响应消息的话,会怎么办呢?

rabbitmq提供了一个可以针对消费者挂掉之后的处理机制,在消费者挂掉之后,会探测出来,然后进行后续处理。

rabbitmq还有一个ttl的功能,可以针对消息队列或者单个消息设置对应的ttl值,一旦ttl超时,消息就会变为dead message, 不会再分配给消费者。在这里我们可以采用这个策略,在消息变成死消息之后,我们可以让生产者再次生产相同的消息放到rabbitmq当中,如果确定这个消息不在使用了,就直接丢弃这个消息。

可以参看:https://blog.csdn.net/u013256816/article/details/54916011

消费者ack相关api:

https://godoc.org/github.com/streadway/amqp#Channel.Ack


备注:如有不妥之处,欢迎大家指正,讨论。

因为公众号没有评论功能,所以选了一个第三方小程序做评论入口,大家可以试用下,不好用的话,我直接移除,在研究其他评论方式。

留言评论