RabbitMQ入门-Topic模式
时间:2022-04-22
本文章向大家介绍RabbitMQ入门-Topic模式,主要内容包括Topic模式、发送端、运行情况、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
上篇《RabbitMQ入门-Routing直连模式》我们介绍了可以定向发送消息,并可以根据自定义规则派发消息。看起来,这个Routing模式已经算灵活的了,但是,这还不够,我们还有更加多样灵活的Topic模式。
Topic模式
- 模型组成相较前几种没有什么变化,一个生产者P,一个交换机X,多个消息队列Q以及多个消费者C
- 在Exchange派发消息到消息队列Queue所用的规则不同,我们看到了有符号"*"以及"#",可以认为是通配符
- "*"用于匹配一个单词,比如"a","abc"等;"#"用于匹配0个或者多个单词,比如"", "abc", "abc.def"等
发送端
/**
* Created by jackie on 17/8/7.
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();
}
private static String getSeverity(String[] strings){
if (strings.length < 1)
return "info";
return strings[0];
}
private static String getMessage(String[] strings){
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0 ) return "";
if (length < startIndex ) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);这里指定的Exchagne模式为Topic模式
- 通过String routingKey = getRouting(argv);实现在Program arguments中填写routing key参数
- 通过String message = getMessage(argv);实现在Program arguments中填写发送的消息
这时候我们给Program argument赋值如下,并启动发送端程序
程序运行完,可以在RabbitMQ管理应用中看到名为“topic_logs”的Exchange。
接收端
/**
* Created by jackie on 17/8/7.
*/
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
- 和Routing模式异曲同工,声明与发送端一样的Exchange名称
- 通过Program arguments得到的routing key的输入参数,并将其与Exchange绑定,这时候就可以使用灵活的通配符了
运行情况
我们将启动两个消费者,并分别制定两套Routing key的规则。 第一个消费者
第二个消费者
启动两个消费者后,使用发送端发送一条消息,我们可以发现两个消费者都通过Routing key规则派发到了消息
注意:实际上如果Routing key写成了“#”表示能够接受所有的消息,类似广播模式。 这就是Topic模式,到此为止,几大主要RabbitMQ模式已经讲完了。你是否对于RabbitMQ有了一个基本的了解了?
- HTML中的javascript交互
- Spring Cloud构建微服务架构:分布式配置中心【Dalston版】
- [Android] Toast问题深度剖析(二)
- [Android] Toast问题深度剖析(一)
- android离线缓存技术
- 浅谈ViewModel
- Android深入理解JNI(二)类型转换、方法签名和JNIEnv
- 探讨通过Feign配合Hystrix进行调用时异常的处理
- 小窗播放视频的原理和实现(上)
- 一种Android App在Native层动态加载so库的方案
- java的双缓冲技术
- application之OnLowMemory()和 OnTrimMemory(level)讲解
- React Native组件(一)组件的生命周期
- Spring Cloud构建微服务架构:服务消费(基础)【Dalston版】
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 国标GB28181协议客户端EasyGBS国标视频平台级联EasyNVR:EasyGBS如何实现调阅EasyNVR的视频通道?
- gitlab CI/CD 相关问题
- 微信jssdk分享接口
- 有赞 Flutter 混编方案
- 最近开发问题
- echarts相关问题总结
- 视频上云EasyNTS组网硬件设备登录后自动下线并清除设备信息是什么原因?
- 震惊! 再也不怕蹭网被发现了!
- 国标GB28181协议客户端EasyGBS国标视频平台级联EasyNVR:EasyNVR到EasyGBS上是如何注册及注销的?
- git全局忽略文件配置
- 2018-10-16近期vue开发总结
- 有赞埋点实践
- 树莓派配置LEDE(openwrt衍生版)拨号
- Android 沉浸式解析和轮子使用
- 腾讯云CDN使用(接入方式:COS源)