SpringBoot集成RabbitMQ-三种模式的实现
时间:2022-07-22
本文章向大家介绍SpringBoot集成RabbitMQ-三种模式的实现,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
文章目录
准备环境
使用IDEA创建一个MAVEN的SpringBoot项目。并勾选如下使用的依赖等
一、fanout模式
(一)生产者
1、目录结构
2、application.yml文件
#服务端口
server:
port: 8989
#配置rabbitmq
spring:
rabbitmq:
port: 5672
username: admin
password: 123456
host: 你的rabbitmq的IP或者域名
virtual-host: /
#配置其他 这些是自定义的,方便之后使用
mq:
fannout:
exchangName: order.fanout.ex
3、Producer.java
package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
//1、定义交换机
@Value("${mq.fannout.exchangName}")
private String exchangName;
//2、路由Key
private String routeKey = "";
public void sendMessage(int i){
//订单信息
String orderId = UUID.randomUUID().toString();
//消息
String message = "你的订单第【"+i+"】个信息是:" + orderId + new Date().toString();
System.out.println("正在发送----->:"+message);
rabbitTemplate.convertAndSend(exchangName,routeKey,message);
}
}
4、SRCApplicationTests.java
package com.zh.srp;
import com.zh.srp.mq.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SRCApplicationTests {
@Autowired
private Producer producer;
@Test
void contextLoads() throws InterruptedException {
for (int i = 0; i < 100; i++) {
producer.sendMessage(i);
Thread.sleep(2);
}
}
}
(二)消费者
1、目录结构
2、application.yml文件
#服务端口
server:
port: 8081
#配置rabbitmq
spring:
rabbitmq:
port: 5672
username: admin
password: 123456
host: ip
virtual-host: /
#配置其他
mq:
fannout:
exchangName: order.fanout.ex #交换机
log: #日志队列
queue: order.fanout.log.queue #C1队列
email:
queue: order.fanout.email.queue #C2队列
3、EmailService.java
package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout direct topic
//确定队列queue的持久性
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.email.queue}",
autoDelete = "true"),
exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT)))
public class EmailService {
@RabbitHandler
public void sendMessage(String Message){
System.out.println("Email-log--------->"+Message);
}
}
4、LogService.java
package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout direct topic
//确定队列queue的持久性
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.log.queue}",
autoDelete = "true"),
exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT)))
public class LogService {
@RabbitHandler
public void sendMessage(String Message){
System.out.println("log--------->"+Message);
}
}
(三)fanout模式测试
1、启动消费者的SRApplication.java 2、启动生产者的Test.java中的测试方法
生产:
消费:
二、direct模式
(一)生产者
1、目录结构
2、application.yml文件
#服务端口
server:
port: 8989
#配置rabbitmq
spring:
rabbitmq:
port: 5672
username: admin
password: 123456
host: IP
virtual-host: /
#配置其他
mq:
direct:
exchangName: order.direct.ex
3、Producer.java
package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 这个是一个接口,最终也会找到实现类RabbitTemplate
// @Autowired
// private AmqpTemplate amqpTemplate;
//1、定义交换机
@Value("${mq.direct.exchangName}")
private String exchangName;
// 保存用户
public void saveUser(int i){
//订单信息
String orderId = UUID.randomUUID().toString();
//消息
String message = "保存用户:" + orderId + new Date().toString();
System.out.println("正在发送user----->:"+message);
rabbitTemplate.convertAndSend(exchangName,"email",message);
rabbitTemplate.convertAndSend(exchangName,"log",message);
rabbitTemplate.convertAndSend(exchangName,"wx",message);
}
// 保存用户
public void WX(int i){
//订单信息
String orderId = UUID.randomUUID().toString();
//消息
String message = "你的微信第【"+i+"】个信息是:" + orderId + new Date().toString();
System.out.println("正在发送WX----->:"+message);
rabbitTemplate.convertAndSend(exchangName,"email",message);
rabbitTemplate.convertAndSend(exchangName,"log",message);
}
}
4、DirectApplicationTests.java
package com.zh.srp;
import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DirectApplicationTests {
@Autowired
private OrderService producer;
@Test
void contextLoads() throws InterruptedException {
for (int i = 0; i < 1; i++) {
producer.saveUser(i);
producer.WX(i);
Thread.sleep(2);
}
}
}
(二)消费者
1、目录结构
2、application.yml文件
#服务端口
server:
port: 8081
#配置rabbitmq
spring:
rabbitmq:
port: 5672
username: admin
password: 123456
host: IP
virtual-host: /
#配置其他
mq:
direct:
exchangName: order.direct.ex
log: #日志队列
queue: order.direct.log.queue #C1队列
email:
queue: order.direct.email.queue #C2队列
wx:
queue: order.direct.wx.queue #C3队列
3、EMailService.java
package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class EMailService {
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout direct topic
//确定队列queue的持久性 autoDelete = "false":代表持久化
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.direct.email.queue}",
autoDelete = "false"),
exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
key = "email" //路由Key
))
@RabbitHandler
public void sendMessage(String Message){
System.out.println("email-log--------->"+Message);
}
}
4、LOGService.java
package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout direct topic
//确定队列queue的持久性
public class LOGService {
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.direct.log.queue}",
autoDelete = "false"),
exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
key = "log" //路由Key
))
@RabbitHandler
public void sendMessage(String Message){
System.out.println("log-log--------->"+Message);
}
}
5、WXService.java
package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout direct topic
//确定队列queue的持久性
public class WXService {
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.direct.wx.queue}",
autoDelete = "false"),
exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
key = "wx" //路由Key
))
@RabbitHandler
public void sendMessage(String Message){
System.out.println("WX-log--------->"+Message);
}
}
(三)测试
1、启动消费者DirectCApplication.java 2、启动生产者的Test类的测试方法
消费
生产
三、topic模式
(一)生产者
1、目录结构
2、application.yml文件
#服务端口
server:
port: 8989
#配置rabbitmq
spring:
rabbitmq:
port: 5672
username: admin
password: 123456
host: ip
virtual-host: /
#配置其他
mq:
topic:
exchangName: linux.topic
3、OrderService.java
package com.zh.srp.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;
@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
//1、定义交换机
@Value("${mq.topic.exchangName}")
private String exchangName;
//WX
public void QQ(int i){
//订单信息
String orderId = UUID.randomUUID().toString();
//消息
String message = "你的WX第【"+i+"】个信息是:" + orderId + new Date().toString();
System.out.println("正在发送WX----->:"+message);
//通配符topic
rabbitTemplate.convertAndSend(exchangName,"qq.log",message);
}
//QQ
public void WX(int i){
//订单信息
String orderId = UUID.randomUUID().toString();
//消息
String message = "你的QQ第【"+i+"】个信息是:" + orderId + new Date().toString();
System.out.println("正在发送QQ----->:"+message);
//通配符topic
rabbitTemplate.convertAndSend(exchangName,"wx.log",message);
}
}
4、TopicApplicationTests.java
package com.zh.srp;
import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class TopicApplicationTests {
@Autowired
private OrderService producer;
@Test
void contextLoads() throws InterruptedException {
for (int i = 0; i < 1; i++) {
producer.QQ(i);
producer.WX(i);
Thread.sleep(2);
}
}
}
(二)消费者
1、目录结构
2、application.yml文件
#服务端口
server:
port: 8081
#配置rabbitmq
spring:
rabbitmq:
port: 5672
username: admin
password: 123456
host: ip
virtual-host: /
#配置其他
mq:
topic:
exchangName: linux.topic
qq: #日志队列
queue: linux.topic.qq.queue #C1队列
wx:
queue: linux.topic.wx.queue #C3队列
3、QQService.java
package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class QQService {
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.topic.qq.queue}",
autoDelete = "true"),
exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC),
key = "qq.*" //路由Key
))
@RabbitHandler
public void sendMessage(String Message){
System.out.println("QQ-log--------->"+Message);
}
}
4、WXService.java
package com.zh.srp.mq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class WXService {
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.topic.wx.queue}",
autoDelete = "true"),
exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC),
key = "wx.*" //路由Key
))
@RabbitHandler
public void sendMessage(String Message){
System.out.println("WX-log--------->"+Message);
}
}
(三)测试
1、启动消费者 2、启动生产者
生产者
消费者
- 一句代码实现批量数据绑定[上篇]
- 机器学习-从高频号码中预测出快递送餐与广告骚扰
- MS Windows 下基于Atom的LaTeX编译环境的配置
- WCF中的Binding模型之一: Binding模型简介
- WCF中的Binding模型之一: Binding模型简介
- 2017最火的五篇深度学习论文 总有一篇适合你
- SplashScreenSource的妙用
- SplashScreenSource的妙用
- SplashScreenSource的妙用
- Nodejs学习笔记(十七)--- 浮点运算decimal.js
- AI时代让自己幸福更是一种能力
- 持续不断地推荐儿童不宜视频背后,YouTube是这样训练AI的
- 人工智能会导致大量失业,你怕吗
- 机器人产业链分析-中国机器人产业的发展机遇和挑战
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释