RabbitMQ入门教程
MQ?
MQ(Message Quene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为消息中间件通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
主要用途:不同进程Process/线程Thread之间通信。
RabbitMQ
你可以注意到一句非常“狂”的话,RabbitMQ is the most widely deployed open source message broker.
确实是这样哈,目前市面上用的最多的就是RabbitMQ。
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office and a postman. The major difference between RabbitMQ and the post office is that it doesn't deal with paper, instead it accepts, stores and forwards binary blobs of data ‒ messages.
RabbitMQ是一个消息代理(也称中间件):它接受和转发消息。你可以把它想象成邮局:当你把要邮寄的邮件放在邮筒里时,你可以确定送信先生或女士最终会将邮件发送给你的收件人。在这个类比中,RabbitMQ是一个邮政信箱,一个邮局和一个邮递员。
区别在于:RabbitMQ不会帮你处理里面的内容(官方原话的纸张是为了让读者更好的理解),而是帮你接受,存储和转发。
安装RabbitMQ
这里我推荐大家开启虚拟机,然后使用docker来安装RabbitMQ,不要用Windows版本。
docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
启动和停止
docker start rabbitmq
docker stop rabbitmq
这里为什么有两个端口呢
15672:WEB界面的端口,启动RabbitMQ后,使用ip+15672
就可以访问了。
5672:通信端口(比如使用JAVA连接肯定是使用这个端口啦)
访问:你的ip:15672
,如果你是本机,localhost:15672
,如果你在虚拟机(Linux)不知道ip,输入命令ifconfig
即可查看
默认的管理员用户密码都为guest
JAVA(Hello World)
In this part of the tutorial we'll write two programs in Java; a producer that sends a single message, and a consumer that receives messages and prints them out. We'll gloss over some of the detail in the Java API, concentrating on this very simple thing just to get started. It's a "Hello World" of messaging. In the diagram below, "P" is our producer and "C" is our consumer. The box in the middle is a queue - a message buffer that RabbitMQ keeps on behalf of the consumer.
我们将用Java编写两个程序
发送单个消息的producer(生产者)和接收消息并打印出消息的consumer(消费者)。官方文档说会掩盖JavaAPI的一些细节,专注于这个非常简单的事情,以便开始。这是一个"Hello World"的消息。但我是一个细节的人,所以我会处理得比官方细节。
在上图中,"P"是我们的生产者,"C"是我们的消费者。中间的框是队列-RabbitMQ代表消费者保留的消息缓冲区。
设计生产者
我们将新建一个Maven项目
引入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
创建生产者类,name->producer,注意,factory.setHost
填写的是你安装的IP地址
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 乐心湖
* @date 2020/5/31 0:26
**/
public class producer {
//命名队列
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建到服务器的工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.123.128");
//创建连接
Connection connection = factory.newConnection();
//创建一个通道,这是大多数API用于完成工作的位置。
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "乐心湖好帅";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
运行程序,可以看到这个消息队列已经发送了过来。
这个1代表有一个消息还没给消费者接收到。我们点进去hello,
设计消费者
新建一个类,name->consumer
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @author 乐心湖
* @date 2020/5/31 0:51
**/
public class consumer{
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//由于它将异步推送我们的消息,这里我们以对象的形式提供回调,该对象将缓冲消息,直到我们使用它们。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [√] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
官方这里有一段话
Why don't we use a try-with-resource statement to automatically close the channel and the connection? By doing so we would simply make the program move on, close everything, and exit! This would be awkward because we want the process to stay alive while the consumer is listening asynchronously for messages to arrive.
这里我们为什么不尝试使用关闭通道和连接呢,如果这样做,这个程序就会运行一遍就过去了,不能处在活跃状态,那如何接受消息呢。
换句话说,我们必须让通道和连接保持活跃,这样就能时刻监听到消息。
我们启动consumer
,就可以收到消息了。
我们将封装一个工具类,RabbitMQUtil
package com.xn2001.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author 乐心湖
* @date 2020/5/31 13:01
**/
public class RabbitMQUtil {
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.111.129");
}
//定义连接对象的方法
public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//定义一个关闭通道和连接的方法
public static void closeChannelAndConnection(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
把生产者和消费者重新写一下
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xn2001.util.RabbitMQUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 乐心湖
* @date 2020/5/31 0:26
**/
public class producer {
//命名队列
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
//创建到服务器的工厂
//ConnectionFactory factory = new ConnectionFactory();
//factory.setHost("192.168.111.129");
//创建连接
//Connection connection = factory.newConnection();
//创建一个通道,这是大多数API用于完成工作的位置。
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "乐心湖好帅";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [√] Sent '" + message + "'");
RabbitMQUtil.closeChannelAndConnection(channel,connection);
}
}
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.xn2001.util.RabbitMQUtil;
/**
* @author 乐心湖
* @date 2020/5/31 0:51
**/
public class comsumr {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//ConnectionFactory factory = new ConnectionFactory();
//factory.setHost("192.168.111.129");
//Connection connection = factory.newConnection();
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [√] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
解释说明
package com.xn2001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 乐心湖
* @date 2020/6/1 15:52
**/
public class Test {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("192.168.111.111");
//若你的端口不是默认的5672,就需要设置,否则不写也可。
factory.setPort(5672);
//若用户不是使用guest,请往下看
//设置连接到哪一个虚拟机
factory.setVirtualHost("/ems");
//设置访问该虚拟机的用户和密码
factory.setUsername("ems");
factory.setPassword("123456");
//获取连接对象
Connection connection = factory.newConnection();
//创建通道连接
Channel channel = connection.createChannel();
/**
* 通道绑定消息队列
* @param queue 消息队列的名称,不存在时自动创建
* @param durable 队列是否持久化,持久化后当重新启动rabbitmq时队列依旧存在
* @param exclusive 是否独占队列,只允许一个在用,一般设置为false
* @param autoDelete 消费完消息后自动删除这个消息队列
* @param arguments
*/
channel.queueDeclare("hello",false,false,false,null);
/**
* 发布消息
* @param exchange 交换机名称
* @param routingKey 消息队列名称
* @param props 传递消息的额外设置,例如消息需要持久化,可以设置为MessageProperties.PERSISTENT_TEXT_PLAIN
* @param body 消息具体内容
*/
channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());
//关闭
channel.close();
connection.close();
}
}
注意:上面我仅展示出了生产者这些常用的设置,消费者理论上几乎是一样的。
同时你需要关注的是,生产者和消费者的消息队列参数必须一致,这样才能匹配得上。
- React项目配置5(引入MockJs,实现假接口开发)
- POj 1611 The Suspects
- React项目配置4(如何在开发时跨域获取api请求)
- Laravel-博客实战+踩坑laravel-blog最终的效果踩的坑
- React项目配置3(如何管理项目API接口)
- React第三方组件3(状态管理之Flux的使用④TodoList下)
- React第三方组件3(状态管理之Flux的使用③TodoList中)
- Vue实现百度下拉提示搜索一、前期准备二、代码实现三、实现效果
- React第三方组件3(状态管理之Flux的使用②TodoList上)
- ggplot2玫瑰图案例——星巴克门店分布图
- React第三方组件3(状态管理之Flux的使用①简单使用)
- 用ggplot2画了一个我也叫不上名的炫酷图表
- React技巧8(不再手动绑定this,跟.bind(this)说88)
- 美美的商务范儿——ggplot2蝴蝶图
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Servlet执行流程
- 最新的spring boot技术实现登录、列表、分页、上传等功能
- 谈谈我对Vue钩子函数、生命周期的理解
- 浅谈Apache Shiro权限模块及数据库设计
- 企图变秃变强的第一天
- 企图变秃变强的第二天
- Lambda表达式和FastDfs文件上传
- Postman带token测试接口、找不到生产者、无法连接MySQL、禅道部署
- MyBatis-Plus调试配置,IllegaStateException,StringUtils补充
- NullException、Token的作用、Mapstruct用法
- 1.String类型字符串拼接2.IDEA清除缓存、热部署3.File的用法 4.Dubbo查询服务状态
- 1.Dubbo 常见错误及解决方法
- SpringBoot学习一:创建工程、pom文件
- SpringBoot学习二:基础配置
- Spring Boot 学习三:静态资源、整合 Thymeleaf 页面模板、@RestControllerAdvice