RabbitMQ入门-Routing直连模式
Hello World模式,告诉我们如何一对一发送和接收消息;
Work模式,告诉我们如何多管齐下高效的消费消息; Publish/Subscribe模式,告诉我们如何广播消息 那么有没有灵活强一点的既可以高效消费,又可以同时送达多个消费者的模式? 有,这就是Routing模式,我又称之为Direct直连模式。
Routing模式
- 一个生产者P,一个交换机X,多个消息队列Q以及多个消费者C
- 在Exchange和Queue中,我们看到了不同的规则,也就是Routing Key
显然从图中的说明,我们就知道这是一个log日志根据级别派发消息的例子。熟悉Log日志系统的应该都知道,一般的log系统分为error、info、warn和debug等。从图中我们可以看出,将日志级别为error的定向的派发到第一个消息队列,将error、warn和info级别的日志派发到第一个消息队列。
该模型首先实现了定向派发,而不再是订阅模式那种广播式的派发。同一条消息既可以派发给一个Queue,也可以同时派发给两个或者多个Queue,这就是该模式的灵活之处。下面来看看实例
发送端
/**
* Created by jackie on 17/8/7.
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();
}
private static String getSeverity(String[] strings){
if (strings.length < 1)
return "info";
return strings[0];
}
private static String getMessage(String[] strings){
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0 ) return "";
if (length < startIndex ) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
- String severity = getSeverity(argv);通过程序参数赋值给Routing Key,作为发送消息的规则
- String message = getMessage(argv);通过程序参数赋值作为消息实体发送到Queue
在run configurations中配置argv
*第一个参数是要绑定key的名称,第二个参数是要发送的消息内容
- 运行后,可以在RabbitMQ管理应用中看到exchange,但是此时没有绑定queue,所以即使发送消息也没有queue会存储或者消费。
接收端
/**
* Created by jackie on 17/8/7.
*/
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);表示使用的exchange类型为Direct类型
- 绑定的queue的名称也是通过program arguments指定的
这里两个参数info和error表示绑定了两个routing key,即如果发送routing key为info的消息该队列能接收到,如果发送routing key为error,该队列也能收到
运行情况
启动接收端代码,我们可以看到生成了Queue名称为amq.gen-ugjKo6t4y0PXPwoh3CeubA的队列,同时有routingKey=info和routingKey=error的绑定到了Exchange上。
这时候起送发送端给routingkey为info发送消息“hello world”,我们可以看到在接收端确实能够收到消息“hello world”,同理,这时候发送routingkey为error的消息,该队列同样能够接收到,因为队列同时绑定了两个routing key
这个就是Routing直连模式。
如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。
- 关于delete,drop,truncate的问题 (r6笔记第14天)
- R语言进行分析,比较详细的一篇,亲测过哦
- DeepMind 开源基于 MuJoCo 物理引擎强化学习工具 Control Suite
- hadoop常用的基本命令,HIVE复制文件,修改文件名
- gc服务器慢的原因分析 (r6笔记第14天)
- AI 玩微信跳一跳的正确姿势——跳一跳 Auto-Jump 算法详解
- 干货 | 深入理解Python装饰器
- 11g dataguard使用总结(r5笔记第12天)
- centos7.4下配置django+uwsgi+nginx
- 11g rac配置scan ip(r6笔记第30天)
- 【C++概念】---vector用法
- 浅谈Orabbix监控指标(r6笔记第27天)
- Orabbix定制监控Oracle的简单配置(r6笔记第26天)
- 【java基础】 线程实例
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Python人工智能经典算法之机器学习第二篇
- Python人工智能经典算法之机器学习第三篇
- Python人工智能经典算法之K-近邻算法
- Python人工智能经典算法之线性回归
- Python人工智能经典算法之逻辑回归
- Python人工智能经典算法之决策树
- Python人工智能经典算法之聚类算法
- 【NPM库】- 0x06 - WebSocket
- 代码详解——《无人驾驶车辆模型预测控制》3.3.3代码详解
- 【前端】:模块化 - 打包技术
- 代码详解——《无人驾驶车辆模型预测控制》3.3.3参考路径更改
- MySQL redo与undo日志解析
- 样本不平衡造成的影响和解决方案
- 代码详解——NMPC路径跟踪复杂参考路径设置
- 代码详解——NMPC之加入控制平顺性惩罚项