spring 整合 ActiveMQ

时间:2022-07-24
本文章向大家介绍spring 整合 ActiveMQ,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1.1 JMS简介

JMS即Java Message Service,Java消息服务。主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。

点对点:一个生产者对应一个消费者;

发布/订阅模式,一个生产者对应多个消费者。

1.2 Spring整合JMS

maven:

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring-version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring-version}</version>
        </dependency>
        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>jsr250-api</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>
</dependencies> 

1.2.1 activeMQ准备

下载activeMQ(http://activemq.apache.org/download.html),解压运行bin目录activemq.bat启动activeMQ。

1.2.2 配置ConnectionFactory

ConnectionFactory:SingleConnectionFactory和CachingConnectionFactory。

SingleConnectionFactory:单连接,忽略Connection的close方法调用。

CachingConnectionFactory:继承SingleConnectionFactory,新增缓存功能,可以缓存Session、MessageProducer和MessageConsumer。

1 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>

配置:

 1     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
 2     <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 3         <property name="brokerURL" value="tcp://localhost:61616"/>
 4     </bean>
 5     
 6     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
 7     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
 8         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
 9         <property name="targetConnectionFactory" ref="mqConnectionFactory"/>
10     </bean>

当使用PooledConnectionFactory连接池:

 1     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
 2     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 3         <property name="brokerURL" value="tcp://localhost:61616"/>
 4     </bean>
 5     
 6     <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
 7         <property name="connectionFactory" ref="targetConnectionFactory"/>
 8         <property name="maxConnections" value="10"/>
 9     </bean>
10     
11     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
12         <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
13     </bean>

1.2.3 配置生产者

1     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
2     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
3         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
4         <property name="connectionFactory" ref="connectionFactory"/>
5     </bean>

队列:

 1     <!--这个是队列目的地,点对点的-->
 2     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
 3         <constructor-arg>
 4             <value>queue</value>
 5         </constructor-arg>
 6     </bean>
 7     <!--这个是主题目的地,一对多的-->
 8     <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
 9         <constructor-arg value="topic"/>
10     </bean>

生产者:

 1 package com.tiantian.springintejms.service.impl;
 2  
 3 import javax.annotation.Resource;
 4 import javax.jms.Destination;
 5 import javax.jms.JMSException;
 6 import javax.jms.Message;
 7 import javax.jms.Session;
 8  
 9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Component;
12  
13 import com.tiantian.springintejms.service.ProducerService;
14  
15 @Component
16 public class ProducerServiceImpl implements ProducerService {
17  
18     private JmsTemplate jmsTemplate;
19     
20     public void sendMessage(Destination destination, final String message) {
21         System.out.println("---------------生产者发送消息-----------------");
22         System.out.println("---------------生产者发了一个消息:" + message);
23         jmsTemplate.send(destination, new MessageCreator() {
24             public Message createMessage(Session session) throws JMSException {
25                 return session.createTextMessage(message);
26             }
27         });
28     } 
29 
30     public JmsTemplate getJmsTemplate() {
31         returnjmsTemplate;
32     } 
33 
34     @Resource
35     public void setJmsTemplate(JmsTemplate jmsTemplate) {
36         this.jmsTemplate = jmsTemplate;
37     }
38  
39 }

1.2.4 配置消费者

定义处理消息的MessageListener:实现JMS规范中的MessageListener接口

 1 package com.tiantian.springintejms.listener;
 2  
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7  
 8 public class ConsumerMessageListener implements MessageListener {
 9  
10     public void onMessage(Message message) {
11         //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
12         TextMessage textMsg = (TextMessage) message;
13         System.out.println("接收到一个纯文本消息。");
14         try {
15             System.out.println("消息内容是:" + textMsg.getText());
16         } catch (JMSException e) {
17             e.printStackTrace();
18         }
19     }
20  
21 }

消息监听容器配置:

 1     <!--这个是队列目的地-->
 2     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
 3         <constructor-arg>
 4             <value>queue</value>
 5         </constructor-arg>
 6     </bean>
 7     <!-- 消息监听器 -->
 8     <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>    
 9 
10     <!-- 消息监听容器 -->
11     <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
12         <property name="connectionFactory" ref="connectionFactory" />
13         <property name="destination" ref="queueDestination" />
14         <property name="messageListener" ref="consumerMessageListener" />
15     </bean>

完整配置:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:jms="http://www.springframework.org/schema/jms"
 5     xsi:schemaLocation="http://www.springframework.org/schema/beans
 6      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 7      http://www.springframework.org/schema/context
 8      http://www.springframework.org/schema/context/spring-context-3.0.xsd
 9     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
10     http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
11  
12     <context:component-scan base-package="com.tiantian" />
13  
14     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
15     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
16         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
17         <property name="connectionFactory" ref="connectionFactory"/>
18     </bean>
19     
20     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
21     <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
22         <property name="brokerURL" value="tcp://localhost:61616"/>
23     </bean>
24     
25     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
26     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
27         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
28         <property name="targetConnectionFactory" ref="mqConnectionFactory"/>
29     </bean>
30     
31     <!--这个是队列目的地-->
32     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
33         <constructor-arg>
34             <value>queue</value>
35         </constructor-arg>
36     </bean>
37     <!-- 消息监听器 -->
38     <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
39     <!-- 消息监听容器 -->
40     <bean id="jmsContainer"
41         class="org.springframework.jms.listener.DefaultMessageListenerContainer">
42         <property name="connectionFactory" ref="connectionFactory" />
43         <property name="destination" ref="queueDestination" />
44         <property name="messageListener" ref="consumerMessageListener" />
45     </bean>
46 </beans>

测试:

 1 package com.tiantian.springintejms.test;
 2  
 3 import javax.jms.Destination;
 4  
 5 import org.junit.Test;
 6 import org.junit.runner.RunWith;
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 import org.springframework.beans.factory.annotation.Qualifier;
 9 import org.springframework.test.context.ContextConfiguration;
10 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
11 import com.tiantian.springintejms.service.ProducerService;
12  
13 @RunWith(SpringJUnit4ClassRunner.class)
14 @ContextConfiguration("/applicationContext.xml")
15 public class ProducerConsumerTest {
16  
17     @Autowired
18     private ProducerService producerService;
19     @Autowired
20     @Qualifier("queueDestination")
21     private Destination destination;
22     
23     @Test
24     public void testSend() {
25         for (int i=0; i<2; i++) {
26             producerService.sendMessage(destination, "你好,生产者!这是消息:" + (i+1));
27         }
28     }
29     
30 }