spring 整合 ActiveMQ
时间:2022-07-24
本文章向大家介绍spring 整合 ActiveMQ,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
1.1 JMS简介
JMS即Java Message Service,Java消息服务。主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。
点对点:一个生产者对应一个消费者;
发布/订阅模式,一个生产者对应多个消费者。
1.2 Spring整合JMS
maven:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>jsr250-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
</dependencies>
1.2.1 activeMQ准备
下载activeMQ(http://activemq.apache.org/download.html),解压运行bin目录activemq.bat启动activeMQ。
1.2.2 配置ConnectionFactory
ConnectionFactory:SingleConnectionFactory和CachingConnectionFactory。
SingleConnectionFactory:单连接,忽略Connection的close方法调用。
CachingConnectionFactory:继承SingleConnectionFactory,新增缓存功能,可以缓存Session、MessageProducer和MessageConsumer。
1 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>
配置:
1 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
2 <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
3 <property name="brokerURL" value="tcp://localhost:61616"/>
4 </bean>
5
6 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
7 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
8 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
9 <property name="targetConnectionFactory" ref="mqConnectionFactory"/>
10 </bean>
当使用PooledConnectionFactory连接池:
1 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
2 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
3 <property name="brokerURL" value="tcp://localhost:61616"/>
4 </bean>
5
6 <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
7 <property name="connectionFactory" ref="targetConnectionFactory"/>
8 <property name="maxConnections" value="10"/>
9 </bean>
10
11 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
12 <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
13 </bean>
1.2.3 配置生产者
1 <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
2 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
3 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
4 <property name="connectionFactory" ref="connectionFactory"/>
5 </bean>
队列:
1 <!--这个是队列目的地,点对点的-->
2 <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
3 <constructor-arg>
4 <value>queue</value>
5 </constructor-arg>
6 </bean>
7 <!--这个是主题目的地,一对多的-->
8 <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
9 <constructor-arg value="topic"/>
10 </bean>
生产者:
1 package com.tiantian.springintejms.service.impl;
2
3 import javax.annotation.Resource;
4 import javax.jms.Destination;
5 import javax.jms.JMSException;
6 import javax.jms.Message;
7 import javax.jms.Session;
8
9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Component;
12
13 import com.tiantian.springintejms.service.ProducerService;
14
15 @Component
16 public class ProducerServiceImpl implements ProducerService {
17
18 private JmsTemplate jmsTemplate;
19
20 public void sendMessage(Destination destination, final String message) {
21 System.out.println("---------------生产者发送消息-----------------");
22 System.out.println("---------------生产者发了一个消息:" + message);
23 jmsTemplate.send(destination, new MessageCreator() {
24 public Message createMessage(Session session) throws JMSException {
25 return session.createTextMessage(message);
26 }
27 });
28 }
29
30 public JmsTemplate getJmsTemplate() {
31 returnjmsTemplate;
32 }
33
34 @Resource
35 public void setJmsTemplate(JmsTemplate jmsTemplate) {
36 this.jmsTemplate = jmsTemplate;
37 }
38
39 }
1.2.4 配置消费者
定义处理消息的MessageListener:实现JMS规范中的MessageListener接口
1 package com.tiantian.springintejms.listener;
2
3 import javax.jms.JMSException;
4 import javax.jms.Message;
5 import javax.jms.MessageListener;
6 import javax.jms.TextMessage;
7
8 public class ConsumerMessageListener implements MessageListener {
9
10 public void onMessage(Message message) {
11 //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
12 TextMessage textMsg = (TextMessage) message;
13 System.out.println("接收到一个纯文本消息。");
14 try {
15 System.out.println("消息内容是:" + textMsg.getText());
16 } catch (JMSException e) {
17 e.printStackTrace();
18 }
19 }
20
21 }
消息监听容器配置:
1 <!--这个是队列目的地-->
2 <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
3 <constructor-arg>
4 <value>queue</value>
5 </constructor-arg>
6 </bean>
7 <!-- 消息监听器 -->
8 <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
9
10 <!-- 消息监听容器 -->
11 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
12 <property name="connectionFactory" ref="connectionFactory" />
13 <property name="destination" ref="queueDestination" />
14 <property name="messageListener" ref="consumerMessageListener" />
15 </bean>
完整配置:
1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:jms="http://www.springframework.org/schema/jms"
5 xsi:schemaLocation="http://www.springframework.org/schema/beans
6 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
7 http://www.springframework.org/schema/context
8 http://www.springframework.org/schema/context/spring-context-3.0.xsd
9 http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
10 http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
11
12 <context:component-scan base-package="com.tiantian" />
13
14 <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
15 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
16 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
17 <property name="connectionFactory" ref="connectionFactory"/>
18 </bean>
19
20 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
21 <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
22 <property name="brokerURL" value="tcp://localhost:61616"/>
23 </bean>
24
25 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
26 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
27 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
28 <property name="targetConnectionFactory" ref="mqConnectionFactory"/>
29 </bean>
30
31 <!--这个是队列目的地-->
32 <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
33 <constructor-arg>
34 <value>queue</value>
35 </constructor-arg>
36 </bean>
37 <!-- 消息监听器 -->
38 <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
39 <!-- 消息监听容器 -->
40 <bean id="jmsContainer"
41 class="org.springframework.jms.listener.DefaultMessageListenerContainer">
42 <property name="connectionFactory" ref="connectionFactory" />
43 <property name="destination" ref="queueDestination" />
44 <property name="messageListener" ref="consumerMessageListener" />
45 </bean>
46 </beans>
测试:
1 package com.tiantian.springintejms.test;
2
3 import javax.jms.Destination;
4
5 import org.junit.Test;
6 import org.junit.runner.RunWith;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.beans.factory.annotation.Qualifier;
9 import org.springframework.test.context.ContextConfiguration;
10 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
11 import com.tiantian.springintejms.service.ProducerService;
12
13 @RunWith(SpringJUnit4ClassRunner.class)
14 @ContextConfiguration("/applicationContext.xml")
15 public class ProducerConsumerTest {
16
17 @Autowired
18 private ProducerService producerService;
19 @Autowired
20 @Qualifier("queueDestination")
21 private Destination destination;
22
23 @Test
24 public void testSend() {
25 for (int i=0; i<2; i++) {
26 producerService.sendMessage(destination, "你好,生产者!这是消息:" + (i+1));
27 }
28 }
29
30 }
- 教你 Debug 的正确姿势——记一次 CoreMotion 的 Crash
- Linux系统yum命令安装软件时保留(下载)rpm包
- Go语言读写数据库
- 《Android 创建线程源码与OOM分析》
- 微信 Android 视频编码爬过的那些坑
- 少年,这有套《街霸2》AI速成心法,想传授于你……
- 你知道android的MessageQueue.IdleHandler吗?
- 《Android基础:Fragment,看这篇就够了》
- Android 7.0中ContentProvider实现原理
- 《iOS APP 性能检测》
- iOS 11 安全区域适配总结
- Linux下巧用chattr、watch命令的实例
- 【特斯拉组件】iOS高性能PageController
- SUSE Linux系统在线安装软件命令zypper参数详解
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- 深入理解JavaScript闭包之闭包的使用场景
- Spring Boot 到底是个啥?
- Spring Boot 整合 Thymeleaf
- webapp打包为Android的apk包的一种方法
- Android应用之Hybird混合开发,集成web页面的方法尝试
- Spring Boot 通过 XML 的方式整合 MyBatis
- layUI登录界面验证码功能模块儿封装
- go语言微信公众号开发后台接口封装
- 【DB宝14】在Docker中只需2步即可拥有Oracle 11g企业版环境(11.2.0.4)
- 别忘了给gcc编译器工具链加上-fno-common选项
- 轻量安全的部署方案
- 算法集锦(34) | 强化学习| 出租车载客问题
- 前端测试题:(解析)关于ajax跨域的说法,下面错误的是?
- 什么才是定制化 IDE 的核心价值?
- RTOS内功修炼记(八)— CMSIS RTOS API,内核通用API接口