RabbitMQ之Pub/Sub模式 (订阅模式)
时间:2021-10-03
本文章向大家介绍RabbitMQ之Pub/Sub模式 (订阅模式),主要包括RabbitMQ之Pub/Sub模式 (订阅模式)使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
概念
在前面的模式中 一个消息都只能被一个消费者使用
但是在阅读模式一个消息可以被多个消费者使用
简单例子
交换机创建参数:
编写生产者 创建一个交换机和两个队列 并做好关系绑定
public class PubProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.198.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection("生产者");
//获取通道
channel = connection.createChannel();
//创建交换机以及两个队列 同时绑定关系
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,false,false,false,null);
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,false,false,false,null);
channel.queueDeclare(queue2Name,false,false,false,null);
//绑定关系 第三个参数为routingKey 绑定规则 fanout使用""
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
finally {
//关闭通道
if(channel != null && channel.isOpen()){
try {
channel.close();
}
catch (Exception e){
e.printStackTrace();
}
}
if(connection != null && connection.isOpen()){
try {
connection.close();
}
catch (Exception e){
e.printStackTrace();
}
}
}
}
}
运行程序后在web页面查看交换机绑定关系
消费者编写和之前没区别 给好队列名字即可
public class PubConsumer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.198.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection("消费者");
//获取通道
channel = connection.createChannel();
//通过通道声明队列,创建交换机等一系列事情
channel.basicConsume("test_fanout_queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("1号消费者接受到的消息为 " + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("收取消息失败");
}
});
//卡一下
System.out.println("键盘输入关闭消费者");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
finally {
//关闭通道
if(channel != null && channel.isOpen()){
try {
channel.close();
}
catch (Exception e){
e.printStackTrace();
}
}
if(connection != null && connection.isOpen()){
try {
connection.close();
}
catch (Exception e){
e.printStackTrace();
}
}
}
}
}
另外一个消费者代码改为队列2即可
运行测试:
先运行两个消费者等待生产者消息
运行生产者后虽然只有一条消息但是可以看到两个消费者都拿到消息了
原文地址:https://www.cnblogs.com/OfflineBoy/p/15365211.html
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法