ActiveMQ—基于Java的消息传递服务器

时间:2022-07-22
本文章向大家介绍ActiveMQ—基于Java的消息传递服务器,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

ActiveMQ技术

一、介绍

ActiveMQ

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个 完全支持 JMS(java message server)1.1 和 J2EE 1.4 规范的 JMS Provider 实现

消息

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串; 也可以更复杂,可能包含嵌入对象。

队列

队列中的数据遵循先进先出原则

消息队列

“消息队列”是在消息的传输过程中保存消息的容器

常用的消息服务应用

1 ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线 2 RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。 3 RocketMQ 是由阿里巴巴定义开发的一套消息队列应用服务。

二、消息服务的应用场景

消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同 时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系, 也不需要受对方的影响,即解耦合。

内聚与耦合

内聚: 标志一个模块内各个元素彼此结合的紧密程度;内聚从功能角度衡量模块内的联系,好的内聚模块应当恰好做一件事。 耦合: 是对一个软件结构内各个模块之间相互依赖程度的度量;耦合的强弱取决于模块间接口的复杂程度、进入或访问一个模块的点以及通过接口的数据。 需求的原则是:高内聚,低耦合。然而在实际需求过程中,往往会因为技术驱动,导致需求间耦合很紧,不利于后期有效地迭代开发。有效的解决办法是按流程、和业务梳理需求。

异步处理------用户注册

用户注册流程: 1)注册处理以及写数据库 2)发送注册成功的手机短信 3)发送注册成功的邮件信息

如果用消息中间件:则可以创建两个线程来做这些事情,直接发送消息给消息中间件, 然后让邮件服务和短信服务自己去消息中间件里面去取消息,然后取到消息后再自己做对应的业务操作。就是这么方便

应用解耦------订单处理

生成订单流程: 1)在购物车中点击结算 2)完成支付 3)创建订单 4)调用库存系统

订单完成后,订单系统并不去直接调用库存系统,而是发送消息到消息中间件,写入一 个订单信息。库存系统自己去消息中间件上去获取,然后做发货处理,并更新库存,这样能够实现互联网型应用追求的快这一个属性。而库存系统读取订单后库存应用这个操作也是非常快的,所以有消息中间件对解耦来说也是一个不错的方向。

流量的销峰------秒杀功能

秒杀流程: 1)用户点击秒杀 2)发送请求到秒杀应用 3)在请求秒杀应用之前将请求放入到消息队列 4)秒杀应用从消息队列中获取请求并处理。

比如,系统举行秒杀活动,热门商品。流量蜂拥而至 100 件商品,10 万人挤进来怎么 办?10 万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的 10 万个请求中的前 100 个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被挂掉.

三、JMS

介绍

JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接 口,简化企业应用的开发。

JMS模型

点对点模型(Point to Point)

生产者发送一条消息到 queue,只有一个消费者能收到。

发布订阅模型(Publish/Subscrtbe)

发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息。

四、Activemq的安装

安装步骤

1 资源下载 ActiveMQ 官网: http://activemq.apache.org

2 版本说明

ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。 ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。 但是 JDK可以向下兼容,也就是说JDK1.8也可以使用Active5.9的版本

3 上传解压并复制 一般会复制到 /usr/local/目录下,并为其起别名 Activemq

4 查看是否有查看其bin目录的权限,如果cd bin提示权限不足,则

 chmod 755 activemq 

5 启动并测试

  启动
 ./usr/local/activemq/bin/activemq start 
 
 测试是否启动成功
 ps aux | grep activemq
 
 成功后关闭防火墙
 service  iptables stop

6 访问管理界面

使用浏览器访问 ActiveMQ 管理应用, 地址如下: http://安装activemq的主机ip:8161/admin/ 用户名: admin 密码: admin ActiveMQ 使用的是 jetty 提供 HTTP 服务.启动稍慢,建议短暂等待再访问测试. 见到如下界面代表服务启动成功

7 其他

  • 修改访问端口 修改 ActiveMQ 配置文件: /usr/local/activemq/conf/jetty.xml
  • 修改用户名以及密码 修改 conf/users.properties 配置文件.内容为: 用户名=密码 保存并重启 ActiveMQ 服务即可.

目录介绍

active目录下文件的简单介绍

  • bin 存放的是脚本文件
  • conf 存放的是基本配置文件
  • data 存放的是日志文件
  • docs 存放的是说明文档
  • examples 存放的是简单的实例
  • lib 存放的是 activemq 所需 jar 包
  • webapps 用于存放项目的目录

ActiveMQ 相关术语

  • 1 Destination 目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管理的对象。 MessageProducer 需要指定 Destination 才能发送消息,MessageReceiver 需要指定 Destination 才能接收消息。
  • 2 Producer 消息生成者,负责发送 Message 到目的地。
  • 3 Consumer | Receiver 消息消费者,负责从目的地中消费【处理|监听|订阅】Message。
  • 4 Message 消息,消息封装一次通信的内容。

ActiveMQ 常用 API

下述 API 都是接口类型,由定义在 javax.jms 包中. 是 JMS 标准接口定义.

接口

作用

ConnectionFactory

链接工厂, 用于创建链接的工厂类型.

Connection

链接. 用于建立访问 ActiveMQ 连接的类型, 由链接工厂创建.

Session

会话, 一次持久有效有状态的访问. 由链接创建.

Destination & Queue

目的地, 用于描述本次访问 ActiveMQ 的消息访问目的地. 即 ActiveMQ 服务中的具体队 列. 由会话创建. interface Queue extends Destination

MessageProducer

消息生成者, 在一次有效会话中, 用于发送消息给 ActiveMQ 服务的工具. 由会话创建

MessageConsumer

消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于从 ActiveMQ 服务中 获取消息的工具. 由会话创建

Message

消息, 通过消息生成者向 ActiveMQ 服务发送消息时使用的数据载体对象或消息消费者 从 ActiveMQ 服务中获取消息时使用的数据载体对象. 是所有消息【文本消息,对象消息等】 具体类型的顶级接口. 可以通过会话创建或通过会话从 ActiveMQ 服务中获取. . .

五、简单案例

需求: 创建一个消息生产者,负责消息的产生 创建一个消息消费者,负责消息的消费

前提 1 需要安装ActiveMq应用 2 需要在Maven项目中添加相关坐标,如下

<dependency>
		    <groupId>org.apache.activemq</groupId>
		    <artifactId>activemq-all</artifactId>
		    <version>5.9.0</version>
</dependency>

点对点模型案例

简单数据类型

//消息生产者程序
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerDemo {
	public static void main(String[] args) {
		ProducerDemo p = new ProducerDemo();
		p.sendHelloWorldActiveMQ("aaa");
	}
	
	/**
	 * 发送生产消息
	 */
	public void sendHelloWorldActiveMQ(String msg) {
		
		//定义连接工厂
		ConnectionFactory connectFactory=null;
		//定义连接对象
		Connection connection=null;
		//定义会话
		Session session=null;
		//定义目的地
		Destination destination=null;
		//定义消息发送者
		MessageProducer messageProducer=null;
		//定义消息
		Message message=null;
		
		try {
			/**
			 * 创建连接工厂对象
			 * userName:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
			 * password:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
			 * brokerURL:访问ActiveMQ服务的路径地址。路径结构为:协议名://主机地址:端口号
			 */
			connectFactory =new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.179.131:61616");
			//创建连接对象
			connection=connectFactory.createConnection();
			//开启连接
			connection.start();
			
			/**
			 * transacted:是否使用事务 可选值为:true|false
			 *            true:使用事务 当设置次变量值。Session.SESSION_TRANSACTED
			 *            false:不适用事务,设置次变量 则acknowledgeMode参数必须设置
			 * acknowledgeMode:
			 * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
			 * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
			 * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
			 */
			session=connection.createSession(false, session.AUTO_ACKNOWLEDGE);
			//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列 
			destination=session.createQueue("helloworld-destination");
			//创建消息生产者
			messageProducer=session.createProducer(destination);
			//创建消息对象
			message=session.createTextMessage(msg);
			//发送消息
			messageProducer.send(message);
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			//回收消息发送者资源
			if (messageProducer !=null) {
				try {
					messageProducer.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			
			if (session !=null) {
				try {
					session.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			
			if (connection !=null) {
				try {
					connection.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}
}
//消息消费者程序
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerDemo {
	public static void main(String[] args) {
		ProducerDemo p = new ProducerDemo();
		p.sendHelloWorldActiveMQ("aaa");
	}
	
	/**
	 * 发送生产消息
	 */
	public void sendHelloWorldActiveMQ(String msg) {
		
		//定义连接工厂
		ConnectionFactory connectFactory=null;
		//定义连接对象
		Connection connection=null;
		//定义会话
		Session session=null;
		//定义目的地
		Destination destination=null;
		//定义消息发送者
		MessageProducer messageProducer=null;
		//定义消息
		Message message=null;
		
		try {
			/**
			 * 创建连接工厂对象
			 * userName:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
			 * password:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
			 * brokerURL:访问ActiveMQ服务的路径地址。路径结构为:协议名://主机地址:端口号
			 */
			connectFactory =new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.179.131:61616");
			//创建连接对象
			connection=connectFactory.createConnection();
			//开启连接
			connection.start();
			
			/**
			 * transacted:是否使用事务 可选值为:true|false
			 *            true:使用事务 当设置次变量值。Session.SESSION_TRANSACTED
			 *            false:不适用事务,设置次变量 则acknowledgeMode参数必须设置
			 * acknowledgeMode:
			 * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
			 * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
			 * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
			 */
			session=connection.createSession(false, session.AUTO_ACKNOWLEDGE);
			//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列 
			destination=session.createQueue("helloworld-destination");
			//创建消息生产者
			messageProducer=session.createProducer(destination);
			//创建消息对象
			message=session.createTextMessage(msg);
			//发送消息
			messageProducer.send(message);
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			//回收消息发送者资源
			if (messageProducer !=null) {
				try {
					messageProducer.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			
			if (session !=null) {
				try {
					session.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			
			if (connection !=null) {
				try {
					connection.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}
}

对象数据类型

公有实体类Users,属性如下,并添加带参无参,取值赋值,toString()方法:

	private int userid;
	private String username;
	private int userage;
public class ProducerDemo2 {
	public static void main(String[] args) {
		ProducerDemo2 p = new ProducerDemo2();
		Users users=new Users();
		users.setUserid(1);
		users.setUsername("chy");
		users.setUserage(24);
		p.sendHelloWorldActiveMQ(users);
	}
	
	/**
	 * 发送生产消息
	 */
	public void sendHelloWorldActiveMQ(Users users) {
		
		//定义连接工厂
		ConnectionFactory connectFactory=null;
		//定义连接对象
		Connection connection=null;
		//定义会话
		Session session=null;
		//定义目的地
		Destination destination=null;
		//定义消息发送者
		MessageProducer messageProducer=null;
		//定义消息
		Message message=null;
		
		try {
			/**
			 * 创建连接工厂对象
			 * userName:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
			 * password:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
			 * brokerURL:访问ActiveMQ服务的路径地址。路径结构为:协议名://主机地址:端口号
			 */
			connectFactory =new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.179.131:61616");
			//创建连接对象
			connection=connectFactory.createConnection();
			//开启连接
			connection.start();
			
			/**
			 * transacted:是否使用事务 可选值为:true|false
			 *            true:使用事务 当设置次变量值。Session.SESSION_TRANSACTED
			 *            false:不适用事务,设置次变量 则acknowledgeMode参数必须设置
			 * acknowledgeMode:
			 * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
			 * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
			 * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
			 */
			session=connection.createSession(false, session.AUTO_ACKNOWLEDGE);
			//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列 
			destination=session.createQueue("my-users");
			//创建消息生产者
			messageProducer=session.createProducer(destination);
			//创建消息对象(对象类型)
			message=session.createObjectMessage(users);
			
			
			//发送消息
			messageProducer.send(message);
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			//回收消息发送者资源
			if (messageProducer !=null) {
				try {
					messageProducer.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			
			if (session !=null) {
				try {
					session.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			
			if (connection !=null) {
				try {
					connection.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}
}
public class ConsumerDemo2 {
	public static void main(String[] args) {
		ConsumerDemo2 c2 = new ConsumerDemo2();
		c2.readHelloWorldActiveMQ();
	}
	
	
	/**
	 * 消费消息
	 */
	public void readHelloWorldActiveMQ() {
		//定义连接工厂
				ConnectionFactory connectFactory=null;
				
				//定义连接对象
				Connection connection=null;
				//定义会话
				Session session=null;
				//定义目的地
				Destination destination=null;
				//定义消息消费者
				MessageConsumer messageConsumer=null;
				
				//定义消息
				Message message=null;
				
				try {
					/**
					 * 创建连接工厂对象
					 * userName:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
					 * password:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
					 * brokerURL:访问ActiveMQ服务的路径地址。路径结构为:协议名://主机地址:端口号
					 */
					connectFactory =new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.179.131:61616");
					//创建连接对象
					connection=connectFactory.createConnection();
					//开启连接
					connection.start();
					
					/**
					 * transacted:是否使用事务 可选值为:true|false
					 *            true:使用事务 当设置次变量值。Session.SESSION_TRANSACTED
					 *            false:不适用事务,设置次变量 则acknowledgeMode参数必须设置
					 * acknowledgeMode:
					 * Session.AUTO_ACKNOWLEDGE:自动消息确认机制
					 * Session.CLIENT_ACKNOWLEDGE:客户端确认机制
					 * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
					 */
					session=connection.createSession(false, session.AUTO_ACKNOWLEDGE);
					//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列 
					destination=session.createQueue("my-users");
					//创建消息生产者
					messageConsumer=session.createConsumer(destination);
					//接收消息
					message=messageConsumer.receive();
					//处理消息
					ObjectMessage objMessage =(ObjectMessage)message; 
					Users users=(Users) objMessage.getObject();
					
				    System.out.println("从 ActiveMQ 服务中获取的文本信息 "+users); 
				} catch (Exception e) {
					e.printStackTrace();
				}finally {
					//回收消息发送者资源
					if (messageConsumer !=null) {
						try {
							messageConsumer.close();
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
					
					if (session !=null) {
						try {
							session.close();
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
					
					if (connection !=null) {
						try {
							connection.close();
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
				}
	}
}

发布订阅模型

消息生产者相比上述代码只修改一条属性,即创建目的地的方法

//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列 
			//1. 点对点模式
			//destination=session.createQueue("helloworld-destination");
			//2. 发布订阅模式
			destination=session.createTopic("Demo-topic");

消费者模式需要修改: 1 同上 2 继承Runnable接口, 实现run方法, 并复制两个这样的消费者线程 3 创建测试类,创建三个线程,并启动这三个线程 4 打开生产者的测试类,就可以发现屏幕上打印三个结果了

测试方法
public class TestTopic extends Thread{
	
	public static void main(String[] args) {
		ConsumerTopicDemo1 consumerTopicDemo1 = new ConsumerTopicDemo1();
		ConsumerTopicDemo2 consumerTopicDemo2 = new ConsumerTopicDemo2();
		ConsumerTopicDemo3 consumerTopicDemo3 = new ConsumerTopicDemo3();
		
		Thread t1=new Thread(consumerTopicDemo1);
		t1.start();
		Thread t2=new Thread(consumerTopicDemo2);
		t2.start();
		Thread t3=new Thread(consumerTopicDemo3);
		t3.start();
	}
}

SpringMVC整合ActiveMq

1. Spring整合ActiveMQ-创建生产者

(1) Spring整合ActiveMQ创建消息生产者时需要添加哪些依赖?

  <!-- ActiveMQ客户端完整jar包依赖 -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
		</dependency>
		<!-- ActiveMQ和Spring整合配置文件标签处理jar包依赖 -->
		<dependency>
			<groupId>org.apache.xbean</groupId>
			<artifactId>xbean-spring</artifactId>
		</dependency>
		<!-- Spring-JMS插件相关jar包依赖 -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
		</dependency>
			<dependency>
		    <groupId>org.apache.activemq</groupId>
		    <artifactId>activemq-pool</artifactId>
		</dependency>
		<dependency>
		    <groupId>org.apache.activemq</groupId>
		    <artifactId>activemq-jms-pool</artifactId>
		</dependency>

(2) 如何在Spring中配置消息生产者?写出步骤

  1. 搭建好SpringMVC的基础环境
  2. 创建JMS的配置文件
  3. 在控制器创建JmsTemplate的对象jmsTempate

步骤二代码 applicationContext-jms.xml

 <!-- 需要创建一个连接工厂,连接ActiveMQ. ActiveMQConnectionFactory. 需要依赖ActiveMQ提供的amq标签 -->
	<!-- amq:connectionFactory 是bean标签的子标签, 会在spring容器中创建一个bean对象. 可以为对象命名. 
		类似: <bean id="" class="ActiveMQConnectionFactory"></bean> -->
	<amq:connectionFactory brokerURL="tcp://192.168.179.131:61616"
		userName="admin" password="admin" id="amqConnectionFactory" />
	
	<!-- 配置池化的ConnectionFactory。 为连接ActiveMQ的connectionFactory提供连接池 -->
	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
		<property name="connectionFactory" ref="amqConnectionFactory"></property>
		<property name="maxConnections" value="10"></property>
	</bean>
	<!-- spring管理JMS相关代码的时候,必须依赖jms标签库. spring-jms提供的标签库. -->
	<!-- 定义Spring-JMS中的连接工厂对象 CachingConnectionFactory - spring框架提供的连接工厂对象. 
		不能真正的访问MOM容器. 类似一个工厂的代理对象. 需要提供一个真实工厂,实现MOM容器的连接访问. -->
	<!-- 配置有缓存的ConnectionFactory,session的缓存大小可定制。 -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
		<property name="sessionCacheSize" value="3"></property>
	</bean>

	<!-- JmsTemplate配置 -->
	<bean id="template" class="org.springframework.jms.core.JmsTemplate">
		<!-- 给定连接工厂, 必须是spring创建的连接工厂. -->
		<property name="connectionFactory" ref="connectionFactory"></property>
		<!-- 可选 - 默认目的地命名 -->
		<property name="defaultDestinationName" value="test-spring"></property>
	</bean>

步骤三代码

   @Service
public class UserServiceImpl implements UserService{
	
	@Autowired
	private JmsTemplate jmsTempate;
	
	@Override
	public void addUser(final Users users) {
		//设置目的地名称
		//this.jmsTempate.setDefaultDestinationName("spring-test2");
		
		//发送消息
		this.jmsTempate.send(new MessageCreator() {	
			
			@Override
			public Message createMessage(Session session) throws JMSException {
				Message message = session.createObjectMessage(users);
				return message;
			}
		});
	}
	
}

2. Spring整合ActiveMQ-创建消费者

(1) Spring整合ActiveMQ创建消息消费者时需要添加哪些依赖?

 	<!-- ActiveMQ客户端完整jar包依赖 -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
		</dependency>
		<!-- ActiveMQ和Spring整合配置文件标签处理jar包依赖 -->
		<dependency>
			<groupId>org.apache.xbean</groupId>
			<artifactId>xbean-spring</artifactId>
		</dependency>
		<!-- Spring-JMS插件相关jar包依赖 -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
		</dependency>
			<dependency>
		    <groupId>org.apache.activemq</groupId>
		    <artifactId>activemq-pool</artifactId>
		</dependency>
		<dependency>
		    <groupId>org.apache.activemq</groupId>
		    <artifactId>activemq-jms-pool</artifactId>
		</dependency>

(2) 如何在Spring中配置消息消费者?写出步骤 1. 搭建好SpringMVC的基础环境 2. 创建JMS的配置文件 3. 在控制器创建监听器实现MessageListener接口,然后再配置文件中注册( @Component(value=“myListener”))

applicationContext-jms.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:jms="http://www.springframework.org/schema/jms" 
	xmlns:amq="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/jms
		http://www.springframework.org/schema/jms/spring-jms.xsd
		http://activemq.apache.org/schema/core
		http://activemq.apache.org/schema/core/activemq-core.xsd">
	<!-- 需要创建一个连接工厂,连接ActiveMQ. ActiveMQConnectionFactory. 需要依赖ActiveMQ提供的amq标签 -->
	<!-- amq:connectionFactory 是bean标签的子标签, 会在spring容器中创建一个bean对象.
		可以为对象命名. 类似: <bean id="" class="ActiveMQConnectionFactory"></bean>
	 -->
	<amq:connectionFactory brokerURL="tcp://192.168.179.131:61616"
		userName="admin" password="admin" id="amqConnectionFactory"/>

	<!-- spring管理JMS相关代码的时候,必须依赖jms标签库. spring-jms提供的标签库. -->
	<!-- 定义Spring-JMS中的连接工厂对象
		CachingConnectionFactory - spring框架提供的连接工厂对象. 不能真正的访问MOM容器.
			类似一个工厂的代理对象. 需要提供一个真实工厂,实现MOM容器的连接访问.
	 -->
	<bean id="connectionFactory" 
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
		<property name="sessionCacheSize" value="3"></property>
	</bean>
	
	<!-- 注册监听器 -->
	<!-- 开始注册监听. 
		需要的参数有:
			acknowledge - 消息确认机制
			container-type - 容器类型 default|simple
			simple:SimpleMessageListenerContainer最简单的消息监听器容器,只能处理固定数量的JMS会话,且不支持事务。
			default:DefaultMessageListenerContainer是一个用于异步消息监听器容器 ,且支持事务         
			destination-type - 目的地类型. 使用队列作为目的地.
			connection-factory - 连接工厂, spring-jms使用的连接工厂,必须是spring自主创建的
			不能使用三方工具创建的工程. 如: ActiveMQConnectionFactory.
	 -->
	<jms:listener-container acknowledge="auto" container-type="default"
		destination-type="queue" connection-factory="connectionFactory" >
		<!-- 在监听器容器中注册某监听器对象.
			destination - 设置目的地命名
			ref - 指定监听器对象
		 -->
		<jms:listener destination="test-spring" ref="myListener"/>
	</jms:listener-container>
	
</beans>

消息监听器

 MyListener.class
 @Component(value="myListener")
public class MyListener implements MessageListener{
	
	@Autowired
	private UserService userService;
	@Override
	public void onMessage(Message message) {
		//处理消息
		ObjectMessage objMessage =(ObjectMessage)message; 
		Users users=null;
		try {
			users = (Users) objMessage.getObject();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		this.userService.showUser(users);
	}
	
}