死信队列的消息处理方案

时间:2022-07-24
本文章向大家介绍死信队列的消息处理方案,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

昨天在处理死信队列消息时,发生了很多疑问,但是实际方案还未实现,一一记录解答。

1.死信队列出现的原因

跟预想的什么事务啊,重试啊,宕机啊没dei关系

Cannot display ObjectMessage body. Reason: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: xxx

应该是处理此条消息的时候,实体类未序列化?然后我重试下,将实体类序列化去掉,这在运行时会直接异常的,目前原因不详。

2.如何处理死信队列中的消息?

这个监听的思路是对的,就是实施有点问题,总是监听不到

1:人工处理(太累)

2:定时任务(太耗性能)

3:监听死信队列

4:死信队列写库

另外处理消息时,会发生与预想结果不一致,业务是点赞/取消点赞,如果原本目的是取消点赞,但操作失败redis是有的,进入死信队列数据库是没数据的,我在此期间对这条数据进行了点赞,然后又取消了,那如果此时我处理这条消息,会进行点赞,与原本的目的不一致

3.监听+时间

创建一个监听器,监听死信队列ActiveMQ.DLQ队列是否有消息,有消息就进行消费。每次mq入队前标识一个时间戳,取出死信队列的消息,与当前库里的操作时间对比,如果最后一条记录的时间大于此条消息时间不予处理,否则进行消息补偿。redis+mq+mysql进行数据同步时同理

4.redis+mq并发1万会产生消息积压吗?

不会,产生积压的原因是业务系统不再监控某队列,即便是1万并发同事请求,肯定会发生队列排队消费,但不会发生积压,另外如出现此情况,需要短信报警,并手动删除或脚本删除此队列。

最高等待队列数

5.一个业务一个队列,无用队列怎么处理?

目前接触的业务,每个业务都需要自定义队列名,有的队列等待,有的始终没处理业务,此时可自定义关闭监测时间内不工作的队列,如需要时再开启,以此减少其他队列的压力。

配置可看下activemq.xml的47行

constantPendingMessageLimitStrategy用于防止
慢话题消费者阻碍生产者和影响其他消费者
通过限制保留的消息数
<destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
<!-- 在这里加上schedulePeriodForDestinationPurge属性。-->
<broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000"
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <!-- 在这里加上gcInactiveDestinations和inactiveTimoutBeforeGC两个属性 -->
                <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
</broker>

6.为什么预想3万次的任务执行,结果不一致?

为了测试业务是否会出现频繁取消确认出现不一致的情况,单接口一万次,测了3次,目前一共执行了3次,第一次告8552,第二次,第三次是成功的,按理说一共是28552次,但结果是28527,理想是3万次,在jmeter的结果树种分析无错误日志

原因不晓得。勾选Scroll无用。

这个队列加时间跟

如何解决redis的并发竞争key问题相似,处理方案也是相似

方案仅供参考。