MQ 系列之 ActiveMQ 基本使用

时间:2022-07-26
本文章向大家介绍MQ 系列之 ActiveMQ 基本使用,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1.1 ActiveMQ 的 API

1.1.1 ConnectionFactory

  activemq-client 通过创建 ConnectionFactory 建立到 ActveMQ 的连接,连接工厂封装了一组连接配置参数,这组参数在配置 ActiveMQ 时已经定义,例如 brokerURL 参数,此参数传入的是 ActiveMQ 服务地址和端口,支持 openwire 协议的默认连接为 tcp://localhost:61616,支持 stomp 协议的默认连接为 tcp://localhost:61613。ActiveMQConnectionFactory 构造方法:   ♞ ActiveMQConnectionFactory();   ♞ ActiveMQConnectionFactory(String brokerURL);   ♞ ActiveMQConnectionFactory(String userName, String password, String brokerURL);   ♞ ActiveMQConnectionFactory(String userName, String password, URI brokerURL);   ♞ ActiveMQConnectionFactory(URI brokerURL);

1.1.2 Connection

  在成功创建正确的 ConnectionFactory 后,下一步将是创建一个连接,它是 JMS 定义的一个接口。 ConnectionFactory 的静态方法 createConnection() 可以返回与底层消息传递系统进行通信的 Connection 实现。通常客户端只使用单一连接。根据 JMS 文档可知,Connection 的目的是“利用 JMS 提供者封装开放的连接”,以及表示“客户端与提供者服务例程之间的开放 TCP/IP 套接字”。该文档还指出 Connection 应该是进行客户端身份验证的地方,除了其他一些事项外,客户端还可以指定惟一标志符。当一个 Connection 被创建时,它的传输默认是关闭的,必须使用 start 方法开启。一个 Connection 可以建立一个或多个的 Session。当一个程序执行完成后,必须使用 close() 方法关闭之前创建的 Connection,否则 ActiveMQ 不能释放资源,关闭一个 Connection 后同样也关闭了 Session、MessageProducer 和 MessageConsumer。

1.1.3 Session

  一旦从 ConnectionFactory 中获得一个 Connection,就必须从 Connection 中创建一个或者多个 Session。Session 是一个发送或接收消息的线程,可以使用 Session 创建 MessageProducer, MessageConsumer 和 Message。Session 可以被事务化,也可以不被事务化,通常,可以通过向 Connection 上的适当创建方法传递一个布尔参数对此进行设置。使用 Session createSession(boolean transacted, int acknowledgeMode); 创建 Session 对象,其中 transacted 为使用事务标识, acknowledgeMode 为签收模式。AUTO_ACKNOWLEDGE = 1:自动确认(一般选这个);CLIENT_ACKNOWLEDGE = 2:客户端手动确认; DUPS_OK_ACKNOWLEDGE = 3:自动批量确认;SESSION_TRANSACTED = 0:事务提交并确认

1.1.4 Destination

  Destination 是一个客户端用来指定生产消息目标和消费消息来源的对象。在 PTP 模式中,Destination 被称作 Queue 即队列;在 Pub/Sub 模式,Destination 被称作 Topic 即主题。在程序中可以使用多个 Queue 和 Topic。ActiveMQSession 中的方法:   ♞ Queue createQueue(String queueName);   ♞ TemporaryQueue createTemporaryQueue();   ♞ Topic createTopic(String topicName);   ♞ TemporaryTopic createTemporaryTopic();

1.1.5 Message

  JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非 JMS 程序格式的消息。Message 由以下几部分组成:消息头,属性和消息体。ActiveMQSession 方法:   ♞ BlobMessage createBlobMessage(File file)   ♞ BlobMessage createBlobMessage(InputStream in)   ♞ BlobMessage createBlobMessage(URL url)   ♞ BlobMessage createBlobMessage(URL url, boolean deletedByBroker)   ♞ BytesMessage createBytesMessage()   ♞ MapMessage createMapMessage()   ♞ Message createMessage()   ♞ ObjectMessage createObjectMessage()   ♞ ObjectMessage createObjectMessage(Serializable object)   ♞ TextMessage createTextMessage()   ♞ TextMessage createTextMessage(String text)

1.1.6 MessageProducer

  MessageProducer 是消息生产者,是用来发送消息的,用来设定消息的优先级,签收的模式,以及消息的存活时间的设定。ActiveMQSession 中的方法:   ♞ MessageProducer createProducer(Destination destination); ActiveMQMessageProducer 中的方法:   ♞ void send(Destination destination, Message message);   ♞ void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive);   ♞ void send(Message message);   ♞ void send(Message message, int deliveryMode, int priority, long timeToLive); 其中消息的目的地 Destination,发送的消息 Message,消息的模式 DeliveryMode,消息的优先级 priority 0-9 默认是 4,消息存活时间 timeToLive 存活时间单位是 ms 。

1.1.7 MessageConsumer

  MessageConsumer 是消息消费者,是用来接收消息的。ActiveMQSession方法:   ♞ MessageConsumer createConsumer(Destination destination);   ♞ MessageConsumer createConsumer(Destination destination, String messageSelector);   ♞ MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);   ♞ TopicSubscriber createDurableSubscriber(Topic topic, String name);   ♞ TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);   其中 messageSelector 为消息选择器;noLocal 标志默认为 false,当设置为 true 时限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列;name 标识订阅主题所对应的订阅名称,持久订阅时需要设置此参数。

☞ 消息的同步和异步接收   消息的同步接收是指客户端主动去接收消息,客户端可以采用 MessageConsumer 的 receive 方法去接收下一个消息。消息的异步接收是指当消息到达时,ActiveMQ 主动通知客户端。客户端可以通过注册一个实现 MessageListener 接口的对象到 MessageConsumer。MessageListener 只有一个必须实现的方法 onMessage,它只接收一个参数,即 Message。在为每个发送到 Destination 的消息实现 onMessage 时,将调用该方法。ActiveMQMessageConsumer方法:   ♞ Message receive()   ♞ Message receive(long timeout)   ♞ Message receiveNoWait() 实现 MessageListener 接口,每当消息到达时,ActiveMQ 会调用 MessageListener 中的 onMessage 函数。

☞ 消息选择器   JMS 提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。   消息选择器是用于 MessageConsumer 的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。按照 JMS 文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是 SQL-92 的子集。可以将消息选择器作为 MessageConsumer 创建的一部分。例如:public final String SELECTOR = "JMSType = 'TOPIC_PUBLISHER'";该选择器检查了传入消息的 JMSType 属性,并确定了这个属性的值是否等于 TOPIC_PUBLISHER。如果相等,则消息被消费;如果不相等,那么消息会被忽略。

1.2 点对点模式

1.2.1 概述

  点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ 发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 ActiveMQ 服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。

1.2.2 消息生产者

☞ 添加依赖

<!-- activemq-client -->
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-client</artifactId>
	<version>5.14.5</version>
</dependency>

<!-- spring-jms -->
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>4.3.12.RELEASE</version>
</dependency>

☞ 生产消息

/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/8/3
 * @description ActiveMQ 生产者
 */
public class ActiveMQProducer {
    public static void main(String[] args) throws JMSException {
        // 创建会话工厂对象 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        // 创建连接对象 Connection
        Connection connection = connectionFactory.createConnection();

        // 开启连接对象
        connection.start();

        // 获得会话 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建消息
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("收到消息了吗?");

        // 指定消息发送地址和类型
        Destination destination = session.createQueue("test");

        // 创建消息发送对象
        MessageProducer producer = session.createProducer(destination);

        // 实现消息发送
        producer.send(textMessage);

        // 资源关闭
        session.close();
        connection.close();
    }
}

1.2.3 消息消费者

☞ 同步接收消息

/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/8/3
 * @description ActiveMQ 消费者
 */
public class ActiveMQProducer {
    public static void main(String[] args) throws JMSException {
        // 创建 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        // 创建连接对象 Connection
        Connection connection = connectionFactory.createConnection();

        // 开启连接
        connection.start();

        // 创建会话对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建需要接收的消息地址
        Destination destination = session.createQueue("test");


        // 接收消息
        MessageConsumer consumer = session.createConsumer(destination);

        while (true){
            // 等待 10 秒,在 10 秒内一直处于接收消息状态
            Message message = consumer.receive(10000);

            if(message!=null){
                if(message instanceof  TextMessage){
                    // 将消息转换成 TextMessage
                    TextMessage textMessage = (TextMessage) message;

                    System.out.println("收到的内容是:" + textMessage.getText());
                    break;
                }
            }
        }

        // 关闭资源
        session.close();
        connection.close();
    }
}

☞ 异步接收消息

/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/8/3
 * @description ActiveMQ 消费者另外一种监听方式接收消息,该方法需要实现一个接口 MessageListener
 */
public class ActiveMQConsumer {
    public static void main(String[] args) throws Exception {
        // 创建 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        // 创建连接对象 Connection
        Connection connection = connectionFactory.createConnection();

        // 开启连接
        connection.start();

        // 创建会话对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建需要接收的消息地址
        Destination destination = session.createQueue("test");


        // 接收消息
        MessageConsumer consumer = session.createConsumer(destination);

        //------------------监听创建开始-------------------
        // 监听消息,监听的方式是创建了一个线程
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    // 获取消息
                    try {
                        System.out.println("消息内容:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        // 为了防止主线程结束,这里录入键盘事件后再继续执行
        System.in.read();
        //------------------监听创建结束-------------------

        // 关闭资源
        session.close();
        connection.close();
    }
}

1.3 发布/订阅模式

1.3.1 概述

  发布/订阅消息模式是消息发送者发送消息到主题(topic),而多个消息接收者监听这个主题;其中,消息发送者和接收者分别叫做发布者(publisher)和订阅者(subscriber),对于发布者来说,它和所有的订阅者就构成了一个一对多的关系。消息生产者将消息(发布)到 topic 中,可以同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费;当生产者发布消息时,不管是否有消费者,都不会保存消息;一定要先有消息的消费者,后有消息的生产者。

1.3.2 订阅者

☞ 同步接收消息

/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/8/3
 * @description 订阅者
 */
public class ActiveMQSubscriber02 {
    public static void main(String[] args) throws Exception {
        // 创建 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        // 创建连接对象 Connection
        Connection connection = connectionFactory.createConnection();

        // 开启连接
        connection.start();

        // 创建会话对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建需要接收的消息地址
        Topic topic = session.createTopic("topic");


        // 接收消息
        MessageConsumer consumer = session.createConsumer(test_topic);

        while (true){
            // 等待 10 秒,在 10 秒内一直处于接收消息状态
            Message message = consumer.receive(10000);

            if(message!=null){
                if(message instanceof  TextMessage){
                    // 将消息转换成 TextMessage
                    TextMessage textMessage = (TextMessage) message;

                    System.out.println("收到的内容是:" + textMessage.getText());
                    break;
                }
            }
        }

        // 关闭资源
        session.close();
        connection.close();
    }
}

☞ 异步接收消息

/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/8/3
 * @description 订阅者另一种订阅消息方式
 */
public class ActiveMQSubscriber01 {
    public static void main(String[] args) throws Exception {
        // 创建 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        // 创建连接对象 Connection
        Connection connection = connectionFactory.createConnection();

        // 开启连接
        connection.start();

        // 创建会话对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建需要接收的消息地址
        Topic test_topic = session.createTopic("topic");


        // 接收消息
        MessageConsumer consumer = session.createConsumer(test_topic);

        //-------------关注这里开始----------------
        // 监听的方式是创建了一个线程
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(message instanceof  TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    // 获取消息
                    try {
                        System.out.println("消息是:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        // 为了防止主线程结束,这里录入键盘事件后再继续执行
        System.in.read();
        //-------------关注这里结束----------------

        // 关闭资源
        session.close();
        connection.close();
    }
}

1.3.3 发布者

/**
 * Created with IntelliJ IDEA.
 *
 * @author gaohu9712@163.com
 * @date 2020/8/3
 * @description 发布者
 */
public class ActiveMQPublisher {
    public static void main(String[] args) throws JMSException {
        // 创建会话工厂对象 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        // 创建连接对象 Connection
        Connection connection = connectionFactory.createConnection();

        // 开启连接对象
        connection.start();

        // 获得会话 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建消息
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("发布消息");

        // 指定消息发送地址和类型
        Topic topic = session.createTopic("topic");

        // 创建消息发送对象
        MessageProducer producer = session.createProducer(topic);

        // 实现消息发送
        producer.send(textMessage);

        // 资源关闭
        session.close();
        connection.close();
    }
}