ActiveMQ—基于Java的消息传递服务器
ActiveMQ技术
一、介绍
ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个 完全支持 JMS(java message server)1.1 和 J2EE 1.4 规范的 JMS Provider 实现
消息
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串; 也可以更复杂,可能包含嵌入对象。
队列
队列中的数据遵循先进先出原则
消息队列
“消息队列”是在消息的传输过程中保存消息的容器
常用的消息服务应用
1 ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线 2 RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。 3 RocketMQ 是由阿里巴巴定义开发的一套消息队列应用服务。
二、消息服务的应用场景
消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同 时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系, 也不需要受对方的影响,即解耦合。
内聚与耦合
内聚: 标志一个模块内各个元素彼此结合的紧密程度;内聚从功能角度衡量模块内的联系,好的内聚模块应当恰好做一件事。 耦合: 是对一个软件结构内各个模块之间相互依赖程度的度量;耦合的强弱取决于模块间接口的复杂程度、进入或访问一个模块的点以及通过接口的数据。 需求的原则是:高内聚,低耦合。然而在实际需求过程中,往往会因为技术驱动,导致需求间耦合很紧,不利于后期有效地迭代开发。有效的解决办法是按流程、和业务梳理需求。
异步处理------用户注册
用户注册流程: 1)注册处理以及写数据库 2)发送注册成功的手机短信 3)发送注册成功的邮件信息
如果用消息中间件:则可以创建两个线程来做这些事情,直接发送消息给消息中间件, 然后让邮件服务和短信服务自己去消息中间件里面去取消息,然后取到消息后再自己做对应的业务操作。就是这么方便
应用解耦------订单处理
生成订单流程: 1)在购物车中点击结算 2)完成支付 3)创建订单 4)调用库存系统
订单完成后,订单系统并不去直接调用库存系统,而是发送消息到消息中间件,写入一 个订单信息。库存系统自己去消息中间件上去获取,然后做发货处理,并更新库存,这样能够实现互联网型应用追求的快这一个属性。而库存系统读取订单后库存应用这个操作也是非常快的,所以有消息中间件对解耦来说也是一个不错的方向。
流量的销峰------秒杀功能
秒杀流程: 1)用户点击秒杀 2)发送请求到秒杀应用 3)在请求秒杀应用之前将请求放入到消息队列 4)秒杀应用从消息队列中获取请求并处理。
比如,系统举行秒杀活动,热门商品。流量蜂拥而至 100 件商品,10 万人挤进来怎么 办?10 万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的 10 万个请求中的前 100 个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被挂掉.
三、JMS
介绍
JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接 口,简化企业应用的开发。
JMS模型
点对点模型(Point to Point)
生产者发送一条消息到 queue,只有一个消费者能收到。
发布订阅模型(Publish/Subscrtbe)
发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息。
四、Activemq的安装
安装步骤
1 资源下载 ActiveMQ 官网: http://activemq.apache.org
2 版本说明
ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。 ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。 但是 JDK可以向下兼容,也就是说JDK1.8也可以使用Active5.9的版本
3 上传解压并复制 一般会复制到 /usr/local/目录下,并为其起别名 Activemq
4 查看是否有查看其bin目录的权限,如果cd bin提示权限不足,则
chmod 755 activemq
5 启动并测试
启动
./usr/local/activemq/bin/activemq start
测试是否启动成功
ps aux | grep activemq
成功后关闭防火墙
service iptables stop
6 访问管理界面
使用浏览器访问 ActiveMQ 管理应用, 地址如下: http://安装activemq的主机ip:8161/admin/ 用户名: admin 密码: admin ActiveMQ 使用的是 jetty 提供 HTTP 服务.启动稍慢,建议短暂等待再访问测试. 见到如下界面代表服务启动成功
7 其他
- 修改访问端口 修改 ActiveMQ 配置文件: /usr/local/activemq/conf/jetty.xml
- 修改用户名以及密码 修改 conf/users.properties 配置文件.内容为: 用户名=密码 保存并重启 ActiveMQ 服务即可.
目录介绍
active目录下文件的简单介绍
- bin 存放的是脚本文件
- conf 存放的是基本配置文件
- data 存放的是日志文件
- docs 存放的是说明文档
- examples 存放的是简单的实例
- lib 存放的是 activemq 所需 jar 包
- webapps 用于存放项目的目录
ActiveMQ 相关术语
- 1 Destination 目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管理的对象。 MessageProducer 需要指定 Destination 才能发送消息,MessageReceiver 需要指定 Destination 才能接收消息。
- 2 Producer 消息生成者,负责发送 Message 到目的地。
- 3 Consumer | Receiver 消息消费者,负责从目的地中消费【处理|监听|订阅】Message。
- 4 Message 消息,消息封装一次通信的内容。
ActiveMQ 常用 API
下述 API 都是接口类型,由定义在 javax.jms 包中. 是 JMS 标准接口定义.
接口 |
作用 |
---|---|
ConnectionFactory |
链接工厂, 用于创建链接的工厂类型. |
Connection |
链接. 用于建立访问 ActiveMQ 连接的类型, 由链接工厂创建. |
Session |
会话, 一次持久有效有状态的访问. 由链接创建. |
Destination & Queue |
目的地, 用于描述本次访问 ActiveMQ 的消息访问目的地. 即 ActiveMQ 服务中的具体队 列. 由会话创建. interface Queue extends Destination |
MessageProducer |
消息生成者, 在一次有效会话中, 用于发送消息给 ActiveMQ 服务的工具. 由会话创建 |
MessageConsumer |
消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于从 ActiveMQ 服务中 获取消息的工具. 由会话创建 |
Message |
消息, 通过消息生成者向 ActiveMQ 服务发送消息时使用的数据载体对象或消息消费者 从 ActiveMQ 服务中获取消息时使用的数据载体对象. 是所有消息【文本消息,对象消息等】 具体类型的顶级接口. 可以通过会话创建或通过会话从 ActiveMQ 服务中获取. . . |
五、简单案例
需求: 创建一个消息生产者,负责消息的产生 创建一个消息消费者,负责消息的消费
前提 1 需要安装ActiveMq应用 2 需要在Maven项目中添加相关坐标,如下
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
点对点模型案例
简单数据类型
//消息生产者程序
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerDemo {
public static void main(String[] args) {
ProducerDemo p = new ProducerDemo();
p.sendHelloWorldActiveMQ("aaa");
}
/**
* 发送生产消息
*/
public void sendHelloWorldActiveMQ(String msg) {
//定义连接工厂
ConnectionFactory connectFactory=null;
//定义连接对象
Connection connection=null;
//定义会话
Session session=null;
//定义目的地
Destination destination=null;
//定义消息发送者
MessageProducer messageProducer=null;
//定义消息
Message message=null;
try {
/**
* 创建连接工厂对象
* userName:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
* password:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
* brokerURL:访问ActiveMQ服务的路径地址。路径结构为:协议名://主机地址:端口号
*/
connectFactory =new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.179.131:61616");
//创建连接对象
connection=connectFactory.createConnection();
//开启连接
connection.start();
/**
* transacted:是否使用事务 可选值为:true|false
* true:使用事务 当设置次变量值。Session.SESSION_TRANSACTED
* false:不适用事务,设置次变量 则acknowledgeMode参数必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
*/
session=connection.createSession(false, session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination=session.createQueue("helloworld-destination");
//创建消息生产者
messageProducer=session.createProducer(destination);
//创建消息对象
message=session.createTextMessage(msg);
//发送消息
messageProducer.send(message);
} catch (Exception e) {
e.printStackTrace();
}finally {
//回收消息发送者资源
if (messageProducer !=null) {
try {
messageProducer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (session !=null) {
try {
session.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (connection !=null) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
//消息消费者程序
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerDemo {
public static void main(String[] args) {
ProducerDemo p = new ProducerDemo();
p.sendHelloWorldActiveMQ("aaa");
}
/**
* 发送生产消息
*/
public void sendHelloWorldActiveMQ(String msg) {
//定义连接工厂
ConnectionFactory connectFactory=null;
//定义连接对象
Connection connection=null;
//定义会话
Session session=null;
//定义目的地
Destination destination=null;
//定义消息发送者
MessageProducer messageProducer=null;
//定义消息
Message message=null;
try {
/**
* 创建连接工厂对象
* userName:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
* password:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
* brokerURL:访问ActiveMQ服务的路径地址。路径结构为:协议名://主机地址:端口号
*/
connectFactory =new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.179.131:61616");
//创建连接对象
connection=connectFactory.createConnection();
//开启连接
connection.start();
/**
* transacted:是否使用事务 可选值为:true|false
* true:使用事务 当设置次变量值。Session.SESSION_TRANSACTED
* false:不适用事务,设置次变量 则acknowledgeMode参数必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
*/
session=connection.createSession(false, session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination=session.createQueue("helloworld-destination");
//创建消息生产者
messageProducer=session.createProducer(destination);
//创建消息对象
message=session.createTextMessage(msg);
//发送消息
messageProducer.send(message);
} catch (Exception e) {
e.printStackTrace();
}finally {
//回收消息发送者资源
if (messageProducer !=null) {
try {
messageProducer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (session !=null) {
try {
session.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (connection !=null) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
对象数据类型
公有实体类Users,属性如下,并添加带参无参,取值赋值,toString()方法:
private int userid;
private String username;
private int userage;
public class ProducerDemo2 {
public static void main(String[] args) {
ProducerDemo2 p = new ProducerDemo2();
Users users=new Users();
users.setUserid(1);
users.setUsername("chy");
users.setUserage(24);
p.sendHelloWorldActiveMQ(users);
}
/**
* 发送生产消息
*/
public void sendHelloWorldActiveMQ(Users users) {
//定义连接工厂
ConnectionFactory connectFactory=null;
//定义连接对象
Connection connection=null;
//定义会话
Session session=null;
//定义目的地
Destination destination=null;
//定义消息发送者
MessageProducer messageProducer=null;
//定义消息
Message message=null;
try {
/**
* 创建连接工厂对象
* userName:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
* password:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
* brokerURL:访问ActiveMQ服务的路径地址。路径结构为:协议名://主机地址:端口号
*/
connectFactory =new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.179.131:61616");
//创建连接对象
connection=connectFactory.createConnection();
//开启连接
connection.start();
/**
* transacted:是否使用事务 可选值为:true|false
* true:使用事务 当设置次变量值。Session.SESSION_TRANSACTED
* false:不适用事务,设置次变量 则acknowledgeMode参数必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
*/
session=connection.createSession(false, session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination=session.createQueue("my-users");
//创建消息生产者
messageProducer=session.createProducer(destination);
//创建消息对象(对象类型)
message=session.createObjectMessage(users);
//发送消息
messageProducer.send(message);
} catch (Exception e) {
e.printStackTrace();
}finally {
//回收消息发送者资源
if (messageProducer !=null) {
try {
messageProducer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (session !=null) {
try {
session.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (connection !=null) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
public class ConsumerDemo2 {
public static void main(String[] args) {
ConsumerDemo2 c2 = new ConsumerDemo2();
c2.readHelloWorldActiveMQ();
}
/**
* 消费消息
*/
public void readHelloWorldActiveMQ() {
//定义连接工厂
ConnectionFactory connectFactory=null;
//定义连接对象
Connection connection=null;
//定义会话
Session session=null;
//定义目的地
Destination destination=null;
//定义消息消费者
MessageConsumer messageConsumer=null;
//定义消息
Message message=null;
try {
/**
* 创建连接工厂对象
* userName:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
* password:访问ActiveMQ服务的用户名。用户密码。默认的为admin。用户名可以通过jetty-ream.properties文件进行修改
* brokerURL:访问ActiveMQ服务的路径地址。路径结构为:协议名://主机地址:端口号
*/
connectFactory =new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.179.131:61616");
//创建连接对象
connection=connectFactory.createConnection();
//开启连接
connection.start();
/**
* transacted:是否使用事务 可选值为:true|false
* true:使用事务 当设置次变量值。Session.SESSION_TRANSACTED
* false:不适用事务,设置次变量 则acknowledgeMode参数必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
*/
session=connection.createSession(false, session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
destination=session.createQueue("my-users");
//创建消息生产者
messageConsumer=session.createConsumer(destination);
//接收消息
message=messageConsumer.receive();
//处理消息
ObjectMessage objMessage =(ObjectMessage)message;
Users users=(Users) objMessage.getObject();
System.out.println("从 ActiveMQ 服务中获取的文本信息 "+users);
} catch (Exception e) {
e.printStackTrace();
}finally {
//回收消息发送者资源
if (messageConsumer !=null) {
try {
messageConsumer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (session !=null) {
try {
session.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (connection !=null) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
发布订阅模型
消息生产者相比上述代码只修改一条属性,即创建目的地的方法
//创建目的地,目的地名称即队列的名称。消息的消费者需要通过此名称访问对应的队列
//1. 点对点模式
//destination=session.createQueue("helloworld-destination");
//2. 发布订阅模式
destination=session.createTopic("Demo-topic");
消费者模式需要修改: 1 同上 2 继承Runnable接口, 实现run方法, 并复制两个这样的消费者线程 3 创建测试类,创建三个线程,并启动这三个线程 4 打开生产者的测试类,就可以发现屏幕上打印三个结果了
测试方法
public class TestTopic extends Thread{
public static void main(String[] args) {
ConsumerTopicDemo1 consumerTopicDemo1 = new ConsumerTopicDemo1();
ConsumerTopicDemo2 consumerTopicDemo2 = new ConsumerTopicDemo2();
ConsumerTopicDemo3 consumerTopicDemo3 = new ConsumerTopicDemo3();
Thread t1=new Thread(consumerTopicDemo1);
t1.start();
Thread t2=new Thread(consumerTopicDemo2);
t2.start();
Thread t3=new Thread(consumerTopicDemo3);
t3.start();
}
}
SpringMVC整合ActiveMq
1. Spring整合ActiveMQ-创建生产者
(1) Spring整合ActiveMQ创建消息生产者时需要添加哪些依赖?
<!-- ActiveMQ客户端完整jar包依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<!-- ActiveMQ和Spring整合配置文件标签处理jar包依赖 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<!-- Spring-JMS插件相关jar包依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
</dependency>
(2) 如何在Spring中配置消息生产者?写出步骤
- 搭建好SpringMVC的基础环境
- 创建JMS的配置文件
- 在控制器创建JmsTemplate的对象jmsTempate
步骤二代码 applicationContext-jms.xml
<!-- 需要创建一个连接工厂,连接ActiveMQ. ActiveMQConnectionFactory. 需要依赖ActiveMQ提供的amq标签 -->
<!-- amq:connectionFactory 是bean标签的子标签, 会在spring容器中创建一个bean对象. 可以为对象命名.
类似: <bean id="" class="ActiveMQConnectionFactory"></bean> -->
<amq:connectionFactory brokerURL="tcp://192.168.179.131:61616"
userName="admin" password="admin" id="amqConnectionFactory" />
<!-- 配置池化的ConnectionFactory。 为连接ActiveMQ的connectionFactory提供连接池 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
<property name="connectionFactory" ref="amqConnectionFactory"></property>
<property name="maxConnections" value="10"></property>
</bean>
<!-- spring管理JMS相关代码的时候,必须依赖jms标签库. spring-jms提供的标签库. -->
<!-- 定义Spring-JMS中的连接工厂对象 CachingConnectionFactory - spring框架提供的连接工厂对象.
不能真正的访问MOM容器. 类似一个工厂的代理对象. 需要提供一个真实工厂,实现MOM容器的连接访问. -->
<!-- 配置有缓存的ConnectionFactory,session的缓存大小可定制。 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="3"></property>
</bean>
<!-- JmsTemplate配置 -->
<bean id="template" class="org.springframework.jms.core.JmsTemplate">
<!-- 给定连接工厂, 必须是spring创建的连接工厂. -->
<property name="connectionFactory" ref="connectionFactory"></property>
<!-- 可选 - 默认目的地命名 -->
<property name="defaultDestinationName" value="test-spring"></property>
</bean>
步骤三代码
@Service
public class UserServiceImpl implements UserService{
@Autowired
private JmsTemplate jmsTempate;
@Override
public void addUser(final Users users) {
//设置目的地名称
//this.jmsTempate.setDefaultDestinationName("spring-test2");
//发送消息
this.jmsTempate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
Message message = session.createObjectMessage(users);
return message;
}
});
}
}
2. Spring整合ActiveMQ-创建消费者
(1) Spring整合ActiveMQ创建消息消费者时需要添加哪些依赖?
<!-- ActiveMQ客户端完整jar包依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<!-- ActiveMQ和Spring整合配置文件标签处理jar包依赖 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<!-- Spring-JMS插件相关jar包依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
</dependency>
(2) 如何在Spring中配置消息消费者?写出步骤 1. 搭建好SpringMVC的基础环境 2. 创建JMS的配置文件 3. 在控制器创建监听器实现MessageListener接口,然后再配置文件中注册( @Component(value=“myListener”))
applicationContext-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- 需要创建一个连接工厂,连接ActiveMQ. ActiveMQConnectionFactory. 需要依赖ActiveMQ提供的amq标签 -->
<!-- amq:connectionFactory 是bean标签的子标签, 会在spring容器中创建一个bean对象.
可以为对象命名. 类似: <bean id="" class="ActiveMQConnectionFactory"></bean>
-->
<amq:connectionFactory brokerURL="tcp://192.168.179.131:61616"
userName="admin" password="admin" id="amqConnectionFactory"/>
<!-- spring管理JMS相关代码的时候,必须依赖jms标签库. spring-jms提供的标签库. -->
<!-- 定义Spring-JMS中的连接工厂对象
CachingConnectionFactory - spring框架提供的连接工厂对象. 不能真正的访问MOM容器.
类似一个工厂的代理对象. 需要提供一个真实工厂,实现MOM容器的连接访问.
-->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="3"></property>
</bean>
<!-- 注册监听器 -->
<!-- 开始注册监听.
需要的参数有:
acknowledge - 消息确认机制
container-type - 容器类型 default|simple
simple:SimpleMessageListenerContainer最简单的消息监听器容器,只能处理固定数量的JMS会话,且不支持事务。
default:DefaultMessageListenerContainer是一个用于异步消息监听器容器 ,且支持事务
destination-type - 目的地类型. 使用队列作为目的地.
connection-factory - 连接工厂, spring-jms使用的连接工厂,必须是spring自主创建的
不能使用三方工具创建的工程. 如: ActiveMQConnectionFactory.
-->
<jms:listener-container acknowledge="auto" container-type="default"
destination-type="queue" connection-factory="connectionFactory" >
<!-- 在监听器容器中注册某监听器对象.
destination - 设置目的地命名
ref - 指定监听器对象
-->
<jms:listener destination="test-spring" ref="myListener"/>
</jms:listener-container>
</beans>
消息监听器
MyListener.class
@Component(value="myListener")
public class MyListener implements MessageListener{
@Autowired
private UserService userService;
@Override
public void onMessage(Message message) {
//处理消息
ObjectMessage objMessage =(ObjectMessage)message;
Users users=null;
try {
users = (Users) objMessage.getObject();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
this.userService.showUser(users);
}
}
- phalcon-进阶篇1(过滤与清理)
- phalcon-入门篇9(view层基础使用)
- PhalApi视频教程
- 【学术】一文搞懂自编码器及其用途(含代码示例)
- PhalApi-Zip--压缩文件处理类
- PhalApi-Xhprof -- Facebook开源的轻量级PHP性能分析工具
- OpenAI发布8个模拟机器人环境以及一种HER实现,以训练实体机器人模型
- PhalApi-APK--APK文件解包处理
- [喵咪PHP]页面显示空白问题
- 数据库中间件 Sharding-JDBC 源码分析 —— 结果归并
- PhalGo-Request
- PhalApi-Excel
- PhalGo-Viper获取配置
- Dubbo 源码解析 —— 集群容错架构设计
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Kubeflow v0.6.0 部署采坑记录
- Kubeflow Pipeline 部署记录
- Kubeflow Pipeline - 上传一个 Pipeline
- Kubeflow Pipeline - 构建自定义的 Workflow
- R语言入门之变量重编码与重命名
- Kubeflow Pipeline - 构建一个机器学习 Workflow
- Git 如何压缩 commit
- How go build works
- 网状Meta分析之R语言‘gemtc’包实战(3)
- 关于 K8S API Resources: Group 和 Version 该怎么写
- ZooKeeper 的应用场景
- 在 K8S 部署一个 Spark History Server - 篇3
- Go 学习笔记-1
- Tensorflow-gpu 运行在 cpu 母机的问题
- R语言入门之散点图