消息队列——ActiveMQ使用及原理浅析

时间:2022-07-24
本文章向大家介绍消息队列——ActiveMQ使用及原理浅析,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

文章目录

  • 引言
  • 正文
    • 一、ActiveMQ是如何产生的?
      • 产生背景
      • JMS规范
        • 基本概念
        • JMS体系结构
    • 二、如何使用?
      • 基本功能
        • 消息传递
          • P2P
          • pub/sub
          • 持久订阅
      • 消息传递的可靠性
        • 事务型会话与非事务型会话
        • 持久化与非持久化消息的存储策略
      • 消息发送策略
    • 三、原理浅析
      • 发送原理
      • 消费原理
        • 消费消息流程
        • 消息确认及消息重发
    • 四、基本优化
  • 总结

引言

什么是消息中间件?随着业务的急速增长以及分布式架构的逐渐兴起,首先要考虑的就是如何高效的在各节点之间传递消息,其次要考虑的是流量洪峰时如何削减系统的压力以及跨平台消息的传输等问题,消息中间件就可以帮我们解决以上问题。而消息队列产品众多,我们该如何选择呢?本系列文章主要针对目前使用最多的ActiveMQ、Kafka、RabbitMQ进行讲解说明。

正文

一、ActiveMQ是如何产生的?

产生背景

一开始消息中间件的厂商繁多,且各个厂商之间没有统一的规范,这就导致了各消息中间件非常难以整合协作,因此,后来陆续出现了如JMS和AMQP这样的消息队列规范,提供了统一的标准,而ActiveMQ就是完全遵循JMS规范开发的消息队列。

JMS规范

基本概念

什么是JMS(Java Message Service)规范?JMS是一个基于Java平台面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。在设计JMS时,设计师就计划能够结合现有消息队列的优点,如:

  • 不同的消息传送模式或域,例如点对点消息传送和发布/订阅消息传送
  • 支持同步和异步消息
  • 支持可靠性消息的传输
  • 支持常见的消息格式,如:文本、字节、流、对象等

JMS体系结构

上面是从百度找的一个图片,下面对其中各个对象分别进行说明:

  • ConnectionFactory:连接工厂,一般设为单例模式,一旦创建,就一直运行在应用容器内,客户端使用连接工厂创建一个JMS连接。
  • Connection:JMS连接表示JMS客户端和服务器端之间的一个活动的连接。
  • Session:JMS会话表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。
  • Destination:消息管道,从生产端流向客户端,包括队列(PTP),主题(Pub/Sub)。
  • Message Producer和Message Consumer:生产者和消费者对象由Session对象创建,用于发送和接收消息。
  • Message:JMS 消息由以下几部分组成:消息头,属性,消息体。
    • 消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由Routing。
    • 属性(property):由消息发送者产生,用来添加删除消息头以外的附加信息。
    • 消息体(body):由消息发送者产生,JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

了解了基本概念后,下面就一起来看看如何使用ActiveMQ吧。

二、如何使用?

基本功能

本节主要讲解activeMQ的基本功能和使用,详细API请查阅官方文档。

消息传递

在上文也讲了ActiveMq支持P2P(点对点)传输和pub/sub模型,这两种传递方式的本质区别就是消息是否可重复消费。比如微信私聊和群聊,私聊就是P2P,除了私聊的双方其它人无法再获取消息,而群聊就相当于pub/sub模式,即群成员都订阅了该群的消息。下面首先我们来看看P2P传输。

P2P

先创建一个Producer生产消息:

    public static void main(String[] args) {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        Connection connection = null;
        try {
            // 创建并开启连接
            connection = factory.createConnection();
            connection.start();

            // 创建会话,设置是否为事务型会话以及消息签收方式
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 创建发送队列
            Destination destination = session.createQueue("queue");
            // 创建消息发送者
            MessageProducer producer = session.createProducer(destination);
            // 创建消息并设置消息内容
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("Hello");
            // 发送消息
            producer.send(textMessage);

            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

上面代码注释写的很清楚了,可以看到是完全符合JMS的体系结构的,首先创建一个连接工厂,并通过连接工厂创建连接,然后通过连接创建会话(在创建会话时可以指定是否为事务型会话以及设置消息的签收方式,相关概念在后面会详细讲解),之后再为本次会话创建管道,即传输队列(这里可以指定是创建队列(p2p)还是还是主题(pub/sub)),最后创建消息对象发送到管道提交即完成本次会话的消息生产。接下来看看消费者如何消费消息:

    public static void main(String[] args) {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        Connection connection = null;
        try {
            connection = factory.createConnection();
            connection.start();

            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 创建接收队列
            Destination destination = session.createQueue("queue");
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            // 接收消息
            TextMessage message = (TextMessage) consumer.receive();
            System.out.println(message.getText());

            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

整个流程和生产者流程基本是一样的,只不过消费者不再需要自己生产消息,而是从消息队列中获取,这里是通过receive方法获取的,该方法相当于是客户端主动从队列中“拉”消息,并且在消息队列为空时会阻塞等待消息传入;另外还有一种队列“推”送的方式,通过监听器实现。

    public static void main(String[] args) {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        Connection connection = null;
        try {
            connection = factory.createConnection();
            connection.start();

            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("queue");

            MessageConsumer consumer = session.createConsumer(destination);
            // 使用监听器监听队列
            MessageListener listener = new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println(((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            };
            while (true) {
                consumer.setMessageListener(listener);
                session.commit();
            }

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

需要注意的是listener不会阻塞等待,当消息到达时会主动调用onMessage方法,但它的生命周期和方法的生命周期是相同的,需要像上面一样死循环监听,同时receive和listener是互斥的,即同时只能使用其中一种方式来获取消息。

pub/sub

相对于P2P,发布订阅模式就是可以有多个消费者监听同一个队列,并可重复消费同一个消息,整个代码实现流程和上面的是一样的,只是将 Destination destination = session.createQueue(“queue”);改为Destination destination = session.createTopic(“topic”);即可。 这里需要思考一个问题,消费者能够订阅到哪个时间段的消息呢?是所有的消息还是自消费者注册监听之后的呢?很显然,肯定是只能获取到注册监听之后的消息。但是,若是消费者中途怠机再恢复,怠机过程中产生的消息能否接收到呢?AcitveMQ是支持获取怠机过程中的消息的,即持久订阅工功能。

持久订阅

什么是持久订阅?举个例子,相当于你在微博点击关注某个博主,无论你是否在线,博主发送的消息你都是可以获取到的,持久订阅就类似这样,在创建好连接后首先设置一个自身的身份标识clientId,这个id是唯一的:

connection.setClientID("lwj");

然后通过下面API创建消费者即可创建持久订阅:

MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "lwj");

需要注意持久订阅只有pub/sub模式下才支持。

消息传递的可靠性

在学习了基础的使用后,我们应该考虑一个问题,消息队列该如何保证消息传递的可靠性呢?即如何保证生产的消息正确被消费者签收或者被生产者销毁?这就牵涉到事务型会话和非事务型会话,JMS Session接口提供了 commit 和 rollback 方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。 事务型的会话总是牵涉到事务处理中,commit 或 rollback 方法一旦被调用,一个事务就结束了,而另一个事务被开始;关闭事务性会话将回滚其中的事务。

事务型会话与非事务型会话

JMS在创建session会话时通过第一个参数指定是否为事务型会话:

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

当为事务型会话时,调用commit方法前消息并不会真正的投递到消息中间件中去,而在调用commit后消息会自动确认,需要保证发送端和接收端都是事务型会话。 当为非事务型会话时,相当于生产者逐个投递到消息中间件,但是消息的确认取决于消费者如何设置ACK_MODE,即创建会话时的第二个参数,该参数有4个选项:

  • SESSION_TRANSACTED:当为事务型会话时的默认选项,若不是事务型会话设置该参数会抛出异常
  • AUTO_ACKNOWLEDGE:当消费者成功的从 receive 方法返回的时候,或者从MessageListenner.onMessage 方法成功返回的时候,会话自动确认客户收到消息。
  • CLIENT_ACKNOWLEDGE:消费者通过调用Message的 acknowledge 方法确认消息。需要注意该模式下何时调用acknowledge方法,那么在调用该方法之前收到的消息都会一起被确认,而在此之后收到的消息不会被确认。比如,发送10条消息,消费者在收到第5条消息时调用acknowledge方法,那么前5条都会被确认。
  • DUPS_OK_ACKNOWLEDGE:消息延迟批量确认,消息生产者在消费者没有确认消息时会重新发送消息。该模式可优化消费者确认消息的性能,但可能会导致消费者收到重复消息(这个参数在优化一节中还会详细讲解)。

需要注意第一个是和事务绑定,后面三个都是针对消费端的,即消息中间件需要接收到消费者的ack才会认为消息被正确处理。

持久化与非持久化消息的存储策略

消息队列为保证高效,消息首先肯定是存储在内存中的,那么一旦消息队列怠机或者消息过多超出内存,消息就会面临丢失的风险,所以需要有相关的手段来保证。 正常情况下,非持久化消息是存储在内存中的,能够存储的最大消息数据在/conf/activemq.xml文件中的systemUsage节点可配置:

<systemUsage>
	<systemUsage>
		<memoryUsage>
			<memoryUsage percentOfJvmHeap="70" />
		</memoryUsage>
		<storeUsage>
			<storeUsage limit="100 gb"/>
		</storeUsage>
		<tempUsage>
			<tempUsage limit="50 gb"/>
		</tempUsage>
	</systemUsage>
</systemUsage>
  • memoryUsage是设置整个ActiveMQ节点的“可用内存限制”。这个值不能超过ActiveMQ本身设置的最大内存大小。其中的percentOfJvmHeap属性表示百分比。
  • storeUsage是设置整个ActiveMQ节点,用于存储“持久化消息”的“可用磁盘空间”。
  • tempUsage是设置临时文件大小。一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,非持久化消息就会被转储到 temp store区域,虽然我们说过非持久化消息不进行持久化存储,但是ActiveMQ为了防止数据洪峰出现时非持久化消息大量堆积致使内存耗尽的情况出现,还是会将非持久化消息写入到磁盘的临时区域——temp store。

从上文我们可以了解到ActiveMQ的存储策略,但是还有个问题,持久化消息是通过什么介质存储的呢?主要有以下5种:

  • KahaDB:默认的存储方式。在data/kahadb这个目录下,会生成四个文件:
    • db-*.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较快的。默认是32M,达到阀值会自动递增。
    • db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息。
    • db.redo 用来进行消息恢复
    • lock文件 锁,表示当前获得kahadb读写权限的broker
  • JDBC存储,需要配置JDBC连接以及引入相应的jar。会在数据库创建三张表:
    • ACTIVEMQ_MSGS:消息表,queue和topic都存在这个表中
    • ACTIVEMQ_ACKS:存储持久订阅的信息和最后一个持久订阅接收的消息ID
    • ACTIVEMQ_LOCKS:锁表,用来确保某一时刻,只能有一个ActiveMQ broker实例来访问数据库
  • Memory存储:即内存
  • LevelDB存储:性能优于KahaDB,但官方不推荐使用。
  • JDBC Message store with ActiveMQ Journal:这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。

详细配置方式参照官方文档。

消息发送策略

ActiveMQ支持同步、异步两种发送模式将消息发送到消息中间件上。 同步发送过程中,发送者发送一条消息会阻塞直到消息中间件反馈一个确认消息,表示消息已经被消息中间件处理。这个机制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能。异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高。但是可能会出现消息丢失的情况。所以使用异步发送的前提是在某些情况下允许出现数据丢失的情况。 默认情况下,非持久化消息是异步发送的,持久化消息并且是在非事务模式下是同步发送的。但是在开启事务的情况下,消息都是异步发送。由于异步发送的效率会比同步发送性能更高,所以在发送持久化消息的时候,尽量去开启事务会话。除了持久化消息和非持久化消息的同步和异步特性以外,我们还可以通过以下几种方式来设置异步发送:

ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616?jms.useAsyncSend=true");
((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
((ActiveMQConnection)connection).setUseAsyncSend(true);

三、原理浅析

ActiveMQ的上手非常简单,但仅仅只是会用肯定不行,只有了解其原理,才能对特定的场景做出优化和设计,而要了解其原理,只有通过分析其源码才能完全了解。限于篇幅原因,接下来只是针对发送消息、消费消息和消息重发机制的流程做一个概括性总结。

发送原理

上面就是整个发消息的流程图,当生产者调用send发送消息时,首先会判断producerWindowSize(这个稍后会详细讲解)是否还有空间,若没有了就阻塞等待空间;反之则继续判断是否是异步发送消息,如果是同步,则直接通过底层传输协议传输消息,并阻塞等待response结果;如果是异步发送,同样通过底层传输协议传输消息,但不再需要阻塞等待response,同时会去增加producerWindowSize的值。 什么是producerWindowSize?这个配置主要用来约束异步发送时producer端允许积压(未ack)的消息大小。当发送消息时,首先会判断producerWindowSize是否还有剩余空间,如果没有就阻塞等待空间释放,即等待broker(可以就当作是消息队列中间件)确认消息;如果有空间,就放入到该空间下,等待broker处理。可以通过以下两种方式配置:

  • 在连接url中设置,对所有producer都有效:tcp://localhost:61616?jms.producerWindowSize=1048576
  • 在destination名称中设置,仅对使用该destination的producer有效,并且优先级更高:test-queue?producer.windowSize=1048576

消费原理

消费消息流程

消费者在通过receive消费消息时,并不是直接去broker上获取的消息,而是从本地的unconsumerMessage队列中获取,而该队列则是每次批量从broker上拉取消息,每次拉取的数量就是由prefetchSize控制的。当队列中没有消息时,就会阻塞等待获取消息;反之则依次从unconsumerMessage队列中取出消息消费,并将应答放到delivered队列返回给broker,消费消息和ack是异步的。那消息是如何添加到unconsumerMessage队列中的呢?这个过程也是非常复杂的,这里就不详细分析了,感兴趣的读者可自行分析源码。下面我们来看看消息的确认过程。

消息确认及消息重发

看到上面这张图,可能会比较懵,没关系,我们首先来了解一下ACK_MODEACK_TYPE,ACK_MODE在上文已经讲过了,但仅仅是消费端确认了还不够,还需要让broker知道消息是否正常消费,因此在确认消息后消费者还会根据处理结果返回不同的ACK_TYPE给broker,ACK_TYPE一共有以下6种:

  • DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束。
  • POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者 加入DLQ(死信队列)
  • STANDARD_ACK_TYPE = 2 “标准"类型,通常表示为消息"处理成功”,broker 端可以删除消息了。
  • REDELIVERED_ACK_TYPE = 3 消息需"重发",比如 consumer 处理消息时抛出了异常,broker 稍后会重新发送此消息。
  • INDIVIDUAL_ACK_TYPE = 4 表示无论在任何 ACK_MODE 下只确认"单条消息"。
  • UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一条消息在转发给“订阅者”时,发现此消息不符合 Selector 过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在 Broker 上确认了消息)。
  • EXPIRED_ACK_TYPE = 6 消息已过期。

清楚了ACK_TYPE所对应的意思后,再看这张图就很明了了。首先从unconsumerMessage队列中取出消息并处理,若消费消息出现异常失败,消费者就会返回REDELIVERED_ACK_TYPE给broker,broker就会重发该条消息,当超过次数限制消费者就会返回POSION_ACK_TYPE告诉broker该条消息是有毒的,broker根据配置将该条消息抛弃或是加入死信队列中(该队列可以被重新消费);若消费消息成功未出现异常,就会将ack message添加到delivered队列中,消费该队列的消息时,会进行一系列判断并根据结果返回不同的ACK_TYPE。 刚刚我们提到消息消费失败会导致消息重发,那究竟在哪些情况下会被重发呢?主要有以下几种情况:

  • 在事务型会话中,若是没有调用session.commit提交确认消息或者调用session.rollback方法。
  • 在非事务性会话中,ACK 模式为 CLIENT_ACKNOWLEDGE 的情况下,没有调用 acknowledge 或者调用了 recover 方法。
  • 处理消息时发生异常。

这就是整个消息的确认和重发原理。

四、基本优化

使用任何一个中间件并出现性能问题时,我们都会考虑如何去优化,本节只是简单讲讲消费端的优化。 在上文我们提到过prefetchSize配置,该配置表示消费者每次从队列中获取消息的条数,该配置为0时表示消费者通过pull方式从broker获取消息,另外不同类型的队列具有不同的默认值:

  • 持久化队列和非持久化队列的默认值为1000
  • 持久化 topic 默认值为 100
  • 非持久化topic的默认值为 Short.MAX_VALUE-1

但是仅仅只有批量获取肯定是不够的,因为从上文我们知道,消息还有一个确认过程,如果还是单个单个的确认,那这个批量获取就没有什么意义了(除了第一次是批量获取消息,后面都是单个单个的获取消息),所以ActiveMQ还提供了optimizeAcknowledge配置,该参数为true时,消费者会延迟确认(默认是ack了0.65*prefetchSize个消息后才确认)。该配置可以直接在连接url中配置(其中optimizeAcknowledgeTimeOut是表示超过该时间也会自动确认):

ConnectionFactory connectionFactory=  new w ActiveMQConnectionFactory( "tcp://192.168.0.106:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=10000" ");

因此,这两者协同配合才能起到优化的作用。另外,需要注意的是,如果消费端的消费速度比较高,通过这两者组合是能大大提升消费者的性能。如果消费者的消费性能本身就比较慢,设置比较大的 prefetchSize 反而不能有效的达到提升消费性能的目的,因为过大的prefetchSize 会导致某一消费端积压消息,而其它的消费端却“无所事事”。同时,该方案需要消费端能够容忍重复消息,因为当消息还未确认时消费者就怠机了,那么broker就会将该消息重发给其它消费者,导致消息重复。

总结

通过以上学习,我们能看出ActiveMQ是非常简单易上手的,但它有以下缺点:

  • 持久化消息存储需要建立索引,因此吞吐量低,不适合TPS要求高的业务。
  • 不支持消息分片功能,只能自己实现。

由于消息队列产品众多,本文只是从基本概念和使用、核心机制原理以及优化等几方面对ActiveMQ做了一个概括性的引导和总结,并未涉及详细的源码分析,另具体的配置也请参照官方文档。