rabbitmq发布订阅
一、发布订阅模式
还记得我们上一个文章是如何发布消息的吗?
回顾一下以前是如何发送消息的:
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
对的,以前我们发送消息是直接由生产者将消息发送到队列,可是这种方式官方是不推荐的!
RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。
相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换类型定义 。
你可以将交换机想象成一个分发器更好容易理解,消息生产者你可以理解为皇帝,他所下发的命令都由圣旨传递,皇帝当然不可能亲自去送圣旨,所以这个工作由太监来承担,这里的太监就是交换机,由太监根据圣旨类型送到文武百官手里,这里文武百官也就是消费者。大概看一下流程图:
img
其中 X 就是交换机 交换机类型大概有:
- direct:
直连交换机
根据RouteKey转发到队列 - 任何发送到Direct Exchange的消息都会被转发到指定RouteKey中指定的队列Queue;
- 生产者生产消息的时候需要执行Routing Key路由键;
- 队列绑定交换机的时候需要指定Binding Key,只有路由键与绑定键相同的话,才能将消息发送到绑定这个队列的消费者;
- 如果vhost中不存在RouteKey中指定的队列名,则该消息会被丢弃;
- topic:
通配符交换机
,满足Route Key与Binding Key模糊匹配 - 任何发送到Topic Exchange的消息都会被转发到所有满足Route Key与Binding Key模糊匹配的队列Queue上;
- 生产者发送消息的时候需要指定Route Key,同时绑定Exchange与Queue的时候也需要指定Binding Key;
- #” 表示0个或多个关键字,“*”表示匹配一个关键字;
- 如果Exchange没有发现能够与RouteKey模糊匹配的队列Queue,则会抛弃此消息;
- 如果Binding中的Routing key *,#都没有,则路由键跟绑定键相等的时候才转发消息,类似Direct Exchange;如果Binding中的Routing key为#或者#.#,则全部转发,类似Fanout Exchange;
- fanout:
广播式交换机
,所有发送到Fanout Exchange交换机上的消息,都会被发送到绑定到该交换机上面的所有队列上,这样绑定到这些队列的消费者就可以接收到该消息。
header模式在实际使用中较少,本文只对前三种模式进行比较。 性能排序:fanout >> direct >> topic。比例大约为11:10:6
我们本章专题会着重介绍fanout
类型的交换机!
生产者指定通道交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
生产者不需要创建队列,只需要创建交换机,并且指明该生产者对应的交换机即可,队列的创建由消费者创建,所以发送消息的时候
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
消费者需要创建队列,并且绑定到交换机
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定给交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
完整代码
生产者代码
package com.ps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.util.MqConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huangfu
* 队列 消息生产者
* 发布 订阅模式
*/
public class PSProducer {
private static String EXCHANGE_NAME = "ps";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = MqConnection.getConnection();
Channel channel = connection.createChannel();
/**
* 声明交换机
* fanout 不处理路由,分发给所有队列
* direct 处理路由 发送的时候需要发sing一个路由key
*/
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String msg = "醉卧沙场君莫笑";
/**
* 第二个参数
* 匿名转发,路由key
*/
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
channel.close();
connection.close();
}
}
消费者1
package com.ps;
import com.rabbitmq.client.*;
import com.util.MqConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Administrator
*/
public class PsCoummer {
private static final String QUEUE_NAME = "ps";
private static final String EXCHANGE_NAME = "ps";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = MqConnection.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 告诉消费者每次只发一个给消费者
* 必须消费者发送确认消息之后我才会发送下一条
*/
channel.basicQos(1);
//绑定给交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//定义一个消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"UTF-8"));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
//发送回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/**
* 第二个参数
* true:自动确认
* 一旦mq将消息分发给消费者 就会从内存中删除,会出现消息丢失
* false:手动确认(默认)
* 如果消费者挂掉,我将此消息发送给其他消费者
* 支持消息应答,当消费者处理完成后发送给生产者回执,删除消息
*
*
* 当消息队列宕了 内存里的数据依旧会丢失,此时需要将数据持久化
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
消费者2
package com.ps;
import com.rabbitmq.client.*;
import com.util.MqConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Administrator
*/
public class PsCoummer2 {
private static final String QUEUE_NAME = "ps2";
private static final String EXCHANGE_NAME = "ps";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = MqConnection.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 告诉消费者每次只发一个给消费者
* 必须消费者发送确认消息之后我才会发送下一条
*/
channel.basicQos(1);
//绑定给交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//定义一个消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"UTF-8"));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
//发送回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/**
* 第二个参数
* true:自动确认
* 一旦mq将消息分发给消费者 就会从内存中删除,会出现消息丢失
* false:手动确认(默认)
* 如果消费者挂掉,我将此消息发送给其他消费者
* 支持消息应答,当消费者处理完成后发送给生产者回执,删除消息
*
*
* 当消息队列宕了 内存里的数据依旧会丢失,此时需要将数据持久化
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
完成流程图
img
二、临时队列
我们创建队列的方式一般是这样:channel.queueDeclare(QUEUE_NAME,true,false,false,null);
,但是当我们不对全部的消息都感兴趣,而只对一部分消息感兴趣的情况下,获取你应该了解一个概念:临时队列
为了实现这个概念,我们应该去了解两件事来实现这个临时队列
- 无论什么时候我们连接队列的时候都需要一个新的队列!所以我们应该创建一个有随机名称的队列!
- 一旦断开连接,队列将自动删除!
当然,rabbitmq的客户端已经为我们实现这个,纳闷创建一个临时队列应该怎么来做呢?
String queueName = channel.queueDeclare().getQueue();
- 这么创建,他会创建一个临时队列,并且返回队列的名字!
- 在Java客户端中,当我们不向queueDeclare()提供任何参数时,我们将 使用生成的名称创建一个非持久的,排他的,自动删除的队列
- 通过执行计划中的CONCATENATION分析sql问题(r4笔记第16天)
- 《小美好》短评文本情感分析+生成词云
- 通过shell定制dbms_advisor.quick_tune(r4笔记第15天)
- 跨浏览器tab页的通信解决方案尝试
- 深度学习的GPU:深度学习中使用GPU的经验和建议
- socket.io搭配pm2(cluster)集群解决方案
- 用 Python 来刷微信「跳一跳」游戏的记录
- Spring+SpringMVC+MyBatis+easyUI整合优化篇(十二)数据层优化-explain关键字及慢sql优化
- 高吞吐koa日志中间件
- 关于SQLRecoverableException问题的排查和分析(r4笔记第13天)
- Spring+SpringMVC+MyBatis+easyUI整合优化篇(十三)数据层优化-表规范、索引优化
- node中的Stream-Readable和Writeable解读
- Spring+SpringMVC+MyBatis+easyUI整合进阶篇(六)一定要RESTful吗?
- 深入node之Transform
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- python第四十九课——对象序列化与反序列化
- python第五十课——多态性
- python第五十一课——__slots
- Linux系统安全配置iptables服务介绍
- ThreadLocal企业中真实应用
- python第五十二课--自定义异常类
- python第五十三课——time模块
- 从亲身经历谈谈如何用Git分支解决项目生产实践中的痛点
- mysql数据库基础命令(一)
- Linux系统Logrotate服务介绍
- python五十四课——datetime模块
- python五十五课——calendar模块
- python五十六课——正则表达式(常用函数之match)
- python五十六课——正则表达式(常用函数之search())
- python五十六课——正则表达式(常用函数之findall)