MQ 系列之 ActiveMQ 基本使用
1.1 ActiveMQ 的 API
1.1.1 ConnectionFactory
activemq-client 通过创建 ConnectionFactory 建立到 ActveMQ 的连接,连接工厂封装了一组连接配置参数,这组参数在配置 ActiveMQ 时已经定义,例如 brokerURL 参数,此参数传入的是 ActiveMQ 服务地址和端口,支持 openwire 协议的默认连接为 tcp://localhost:61616
,支持 stomp 协议的默认连接为 tcp://localhost:61613
。ActiveMQConnectionFactory 构造方法:
♞ ActiveMQConnectionFactory();
♞ ActiveMQConnectionFactory(String brokerURL);
♞ ActiveMQConnectionFactory(String userName, String password, String brokerURL);
♞ ActiveMQConnectionFactory(String userName, String password, URI brokerURL);
♞ ActiveMQConnectionFactory(URI brokerURL);
1.1.2 Connection
在成功创建正确的 ConnectionFactory 后,下一步将是创建一个连接,它是 JMS 定义的一个接口。 ConnectionFactory 的静态方法 createConnection()
可以返回与底层消息传递系统进行通信的 Connection 实现。通常客户端只使用单一连接。根据 JMS 文档可知,Connection 的目的是“利用 JMS 提供者封装开放的连接”,以及表示“客户端与提供者服务例程之间的开放 TCP/IP 套接字”。该文档还指出 Connection 应该是进行客户端身份验证的地方,除了其他一些事项外,客户端还可以指定惟一标志符。当一个 Connection 被创建时,它的传输默认是关闭的,必须使用 start 方法开启。一个 Connection 可以建立一个或多个的 Session。当一个程序执行完成后,必须使用 close() 方法关闭之前创建的 Connection,否则 ActiveMQ 不能释放资源,关闭一个 Connection 后同样也关闭了 Session、MessageProducer 和 MessageConsumer。
1.1.3 Session
一旦从 ConnectionFactory 中获得一个 Connection,就必须从 Connection 中创建一个或者多个 Session。Session 是一个发送或接收消息的线程,可以使用 Session 创建 MessageProducer, MessageConsumer 和 Message。Session 可以被事务化,也可以不被事务化,通常,可以通过向 Connection 上的适当创建方法传递一个布尔参数对此进行设置。使用 Session createSession(boolean transacted, int acknowledgeMode);
创建 Session 对象,其中 transacted 为使用事务标识, acknowledgeMode 为签收模式。AUTO_ACKNOWLEDGE = 1
:自动确认(一般选这个);CLIENT_ACKNOWLEDGE = 2
:客户端手动确认; DUPS_OK_ACKNOWLEDGE = 3
:自动批量确认;SESSION_TRANSACTED = 0
:事务提交并确认
1.1.4 Destination
Destination 是一个客户端用来指定生产消息目标和消费消息来源的对象。在 PTP 模式中,Destination 被称作 Queue 即队列;在 Pub/Sub 模式,Destination 被称作 Topic 即主题。在程序中可以使用多个 Queue 和 Topic。ActiveMQSession 中的方法:
♞ Queue createQueue(String queueName);
♞ TemporaryQueue createTemporaryQueue();
♞ Topic createTopic(String topicName);
♞ TemporaryTopic createTemporaryTopic();
1.1.5 Message
JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非 JMS 程序格式的消息。Message 由以下几部分组成:消息头,属性和消息体。ActiveMQSession 方法:
♞ BlobMessage createBlobMessage(File file)
♞ BlobMessage createBlobMessage(InputStream in)
♞ BlobMessage createBlobMessage(URL url)
♞ BlobMessage createBlobMessage(URL url, boolean deletedByBroker)
♞ BytesMessage createBytesMessage()
♞ MapMessage createMapMessage()
♞ Message createMessage()
♞ ObjectMessage createObjectMessage()
♞ ObjectMessage createObjectMessage(Serializable object)
♞ TextMessage createTextMessage()
♞ TextMessage createTextMessage(String text)
1.1.6 MessageProducer
MessageProducer 是消息生产者,是用来发送消息的,用来设定消息的优先级,签收的模式,以及消息的存活时间的设定。ActiveMQSession 中的方法:
♞ MessageProducer createProducer(Destination destination);
ActiveMQMessageProducer 中的方法:
♞ void send(Destination destination, Message message);
♞ void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive);
♞ void send(Message message);
♞ void send(Message message, int deliveryMode, int priority, long timeToLive);
其中消息的目的地 Destination
,发送的消息 Message
,消息的模式 DeliveryMode
,消息的优先级 priority
0-9 默认是 4,消息存活时间 timeToLive
存活时间单位是 ms 。
1.1.7 MessageConsumer
MessageConsumer 是消息消费者,是用来接收消息的。ActiveMQSession方法:
♞ MessageConsumer createConsumer(Destination destination);
♞ MessageConsumer createConsumer(Destination destination, String messageSelector);
♞ MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);
♞ TopicSubscriber createDurableSubscriber(Topic topic, String name);
♞ TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);
其中 messageSelector 为消息选择器;noLocal 标志默认为 false,当设置为 true 时限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列;name 标识订阅主题所对应的订阅名称,持久订阅时需要设置此参数。
☞ 消息的同步和异步接收
消息的同步接收是指客户端主动去接收消息,客户端可以采用 MessageConsumer 的 receive 方法去接收下一个消息。消息的异步接收是指当消息到达时,ActiveMQ 主动通知客户端。客户端可以通过注册一个实现 MessageListener 接口的对象到 MessageConsumer。MessageListener 只有一个必须实现的方法 onMessage,它只接收一个参数,即 Message。在为每个发送到 Destination 的消息实现 onMessage 时,将调用该方法。ActiveMQMessageConsumer方法:
♞ Message receive()
♞ Message receive(long timeout)
♞ Message receiveNoWait()
实现 MessageListener 接口,每当消息到达时,ActiveMQ 会调用 MessageListener 中的 onMessage 函数。
☞ 消息选择器
JMS 提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。
消息选择器是用于 MessageConsumer 的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。按照 JMS 文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是 SQL-92 的子集。可以将消息选择器作为 MessageConsumer 创建的一部分。例如:public final String SELECTOR = "JMSType = 'TOPIC_PUBLISHER'";
该选择器检查了传入消息的 JMSType 属性,并确定了这个属性的值是否等于 TOPIC_PUBLISHER。如果相等,则消息被消费;如果不相等,那么消息会被忽略。
1.2 点对点模式
1.2.1 概述
点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ 发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 ActiveMQ 服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。
1.2.2 消息生产者
☞ 添加依赖
<!-- activemq-client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.14.5</version>
</dependency>
<!-- spring-jms -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.12.RELEASE</version>
</dependency>
☞ 生产消息
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/8/3
* @description ActiveMQ 生产者
*/
public class ActiveMQProducer {
public static void main(String[] args) throws JMSException {
// 创建会话工厂对象 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// 创建连接对象 Connection
Connection connection = connectionFactory.createConnection();
// 开启连接对象
connection.start();
// 获得会话 Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("收到消息了吗?");
// 指定消息发送地址和类型
Destination destination = session.createQueue("test");
// 创建消息发送对象
MessageProducer producer = session.createProducer(destination);
// 实现消息发送
producer.send(textMessage);
// 资源关闭
session.close();
connection.close();
}
}
1.2.3 消息消费者
☞ 同步接收消息
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/8/3
* @description ActiveMQ 消费者
*/
public class ActiveMQProducer {
public static void main(String[] args) throws JMSException {
// 创建 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// 创建连接对象 Connection
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 创建会话对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建需要接收的消息地址
Destination destination = session.createQueue("test");
// 接收消息
MessageConsumer consumer = session.createConsumer(destination);
while (true){
// 等待 10 秒,在 10 秒内一直处于接收消息状态
Message message = consumer.receive(10000);
if(message!=null){
if(message instanceof TextMessage){
// 将消息转换成 TextMessage
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的内容是:" + textMessage.getText());
break;
}
}
}
// 关闭资源
session.close();
connection.close();
}
}
☞ 异步接收消息
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/8/3
* @description ActiveMQ 消费者另外一种监听方式接收消息,该方法需要实现一个接口 MessageListener
*/
public class ActiveMQConsumer {
public static void main(String[] args) throws Exception {
// 创建 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// 创建连接对象 Connection
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 创建会话对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建需要接收的消息地址
Destination destination = session.createQueue("test");
// 接收消息
MessageConsumer consumer = session.createConsumer(destination);
//------------------监听创建开始-------------------
// 监听消息,监听的方式是创建了一个线程
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
// 获取消息
try {
System.out.println("消息内容:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 为了防止主线程结束,这里录入键盘事件后再继续执行
System.in.read();
//------------------监听创建结束-------------------
// 关闭资源
session.close();
connection.close();
}
}
1.3 发布/订阅模式
1.3.1 概述
发布/订阅消息模式是消息发送者发送消息到主题(topic),而多个消息接收者监听这个主题;其中,消息发送者和接收者分别叫做发布者(publisher)和订阅者(subscriber),对于发布者来说,它和所有的订阅者就构成了一个一对多的关系。消息生产者将消息(发布)到 topic 中,可以同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费;当生产者发布消息时,不管是否有消费者,都不会保存消息;一定要先有消息的消费者,后有消息的生产者。
1.3.2 订阅者
☞ 同步接收消息
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/8/3
* @description 订阅者
*/
public class ActiveMQSubscriber02 {
public static void main(String[] args) throws Exception {
// 创建 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// 创建连接对象 Connection
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 创建会话对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建需要接收的消息地址
Topic topic = session.createTopic("topic");
// 接收消息
MessageConsumer consumer = session.createConsumer(test_topic);
while (true){
// 等待 10 秒,在 10 秒内一直处于接收消息状态
Message message = consumer.receive(10000);
if(message!=null){
if(message instanceof TextMessage){
// 将消息转换成 TextMessage
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的内容是:" + textMessage.getText());
break;
}
}
}
// 关闭资源
session.close();
connection.close();
}
}
☞ 异步接收消息
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/8/3
* @description 订阅者另一种订阅消息方式
*/
public class ActiveMQSubscriber01 {
public static void main(String[] args) throws Exception {
// 创建 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// 创建连接对象 Connection
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 创建会话对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建需要接收的消息地址
Topic test_topic = session.createTopic("topic");
// 接收消息
MessageConsumer consumer = session.createConsumer(test_topic);
//-------------关注这里开始----------------
// 监听的方式是创建了一个线程
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
// 获取消息
try {
System.out.println("消息是:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 为了防止主线程结束,这里录入键盘事件后再继续执行
System.in.read();
//-------------关注这里结束----------------
// 关闭资源
session.close();
connection.close();
}
}
1.3.3 发布者
/**
* Created with IntelliJ IDEA.
*
* @author gaohu9712@163.com
* @date 2020/8/3
* @description 发布者
*/
public class ActiveMQPublisher {
public static void main(String[] args) throws JMSException {
// 创建会话工厂对象 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// 创建连接对象 Connection
Connection connection = connectionFactory.createConnection();
// 开启连接对象
connection.start();
// 获得会话 Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("发布消息");
// 指定消息发送地址和类型
Topic topic = session.createTopic("topic");
// 创建消息发送对象
MessageProducer producer = session.createProducer(topic);
// 实现消息发送
producer.send(textMessage);
// 资源关闭
session.close();
connection.close();
}
}
- AtCoder Beginner Contest 069【A,水,B,水,C,数学,D,暴力】
- 2017"百度之星"程序设计大赛 - 资格赛【1001 Floyd求最小环 1002 歪解(并查集),1003 完全背包 1004 01背包 1005 打表找规律+卡特兰数】
- 洛谷 2634&&BZOJ 2152: 聪聪可可【点分治学习+超详细注释】
- 【经验总结】Java在ACM算法竞赛编程中易错点
- 【Java学习笔记之六】java三种循环(for,while,do......while)的使用方法及区别
- 类A是公共的,应在名为A.java的文件中声明错误
- 逆天通用水印支持Winform,WPF,Web,WP,Win10。支持位置选择(9个位置 ==》[X])
- 【Java学习笔记之七】java函数的语法规则总结
- BZOJ 3038: 上帝造题的七分钟2【线段树区间开方问题】
- BZOJ 3211: 花神游历各国【线段树区间开方问题】
- WP、Win10开发或者WPF开发时绘制自定义窗体~例如:一个手机
- 【Java学习笔记之八】JavaBean中布尔类型使用注意事项
- BZOJ 1597: [Usaco2008 Mar]土地购买【斜率优化+凸包维护】
- BZOJ 1046: [HAOI2007]上升序列【贪心+二分状态+dp+递归】
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Clickhouse分布式集群搭建
- Redis过期策略以及淘汰机制
- 几行代码就可以轻松给你的程序加上进度条
- git禁止在master分支push和commit
- 记录一次mybatis缓存和事务传播行为导致ut挂的排查过程
- appium教程_3.启动appium-server
- appium教程_4.adb常用命令
- Python中的高阶概念属性:五个你应该搞明白的知识点
- 一次奇怪的http状态码改变
- Salesforce LWC学习(二十七) File Upload
- 让我们来谈谈python中的prettyprint和pprint
- vue 开发规范
- Markdown 编写规范
- JavaScript编码规范
- HTML编码规范