SpringBoot集成RabbitMQ-三种模式的实现

时间:2022-07-22
本文章向大家介绍SpringBoot集成RabbitMQ-三种模式的实现,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

文章目录

准备环境

使用IDEA创建一个MAVEN的SpringBoot项目。并勾选如下使用的依赖等

一、fanout模式

(一)生产者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8989

#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: 你的rabbitmq的IP或者域名
    virtual-host: /

#配置其他 这些是自定义的,方便之后使用
mq:
  fannout:
    exchangName: order.fanout.ex 

3、Producer.java

package com.zh.srp.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;

@Component
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //1、定义交换机
    @Value("${mq.fannout.exchangName}")
    private String exchangName;
    //2、路由Key
    private String routeKey = "";

    public void sendMessage(int i){
        //订单信息
        String orderId = UUID.randomUUID().toString();
        //消息
        String message = "你的订单第【"+i+"】个信息是:" + orderId + new Date().toString();
        System.out.println("正在发送----->:"+message);
        rabbitTemplate.convertAndSend(exchangName,routeKey,message);
    }
}

4、SRCApplicationTests.java

package com.zh.srp;

import com.zh.srp.mq.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SRCApplicationTests {

    @Autowired
    private Producer producer;

    @Test
    void contextLoads() throws InterruptedException {

        for (int i = 0; i < 100; i++) {
            producer.sendMessage(i);
            Thread.sleep(2);
        }

    }

}

(二)消费者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8081

#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /

#配置其他
mq:
  fannout:
    exchangName: order.fanout.ex #交换机
    log: #日志队列
      queue: order.fanout.log.queue #C1队列
    email:
      queue: order.fanout.email.queue #C2队列

3、EmailService.java

package com.zh.srp.mq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
@RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.email.queue}",
        autoDelete = "true"),
        exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT)))
public class EmailService {



    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("Email-log--------->"+Message);
    }




}

4、LogService.java

package com.zh.srp.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;

@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
@RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.fannout.log.queue}",
        autoDelete = "true"),
        exchange = @Exchange(value = "${mq.fannout.exchangName}",type = ExchangeTypes.FANOUT)))
public class LogService {



    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("log--------->"+Message);
    }
}

(三)fanout模式测试

1、启动消费者的SRApplication.java 2、启动生产者的Test.java中的测试方法

生产:

消费:

二、direct模式

(一)生产者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8989

#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: IP
    virtual-host: /

#配置其他
mq:
  direct:
    exchangName: order.direct.ex

3、Producer.java

package com.zh.srp.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;

@Component
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

//    这个是一个接口,最终也会找到实现类RabbitTemplate
//    @Autowired
//    private AmqpTemplate amqpTemplate;

    //1、定义交换机
    @Value("${mq.direct.exchangName}")
    private String exchangName;

//    保存用户
public void saveUser(int i){
    //订单信息
    String orderId = UUID.randomUUID().toString();
    //消息
    String message = "保存用户:" + orderId + new Date().toString();
    System.out.println("正在发送user----->:"+message);
    rabbitTemplate.convertAndSend(exchangName,"email",message);
    rabbitTemplate.convertAndSend(exchangName,"log",message);
    rabbitTemplate.convertAndSend(exchangName,"wx",message);
}
//    保存用户
public void WX(int i){
    //订单信息
    String orderId = UUID.randomUUID().toString();
    //消息
    String message = "你的微信第【"+i+"】个信息是:" + orderId + new Date().toString();
    System.out.println("正在发送WX----->:"+message);
    rabbitTemplate.convertAndSend(exchangName,"email",message);
    rabbitTemplate.convertAndSend(exchangName,"log",message);
}

}

4、DirectApplicationTests.java

package com.zh.srp;

import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DirectApplicationTests {

    @Autowired
    private OrderService producer;

    @Test
    void contextLoads() throws InterruptedException {

        for (int i = 0; i < 1; i++) {
            producer.saveUser(i);
            producer.WX(i);
            Thread.sleep(2);
        }

    }

}

(二)消费者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8081

#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: IP
    virtual-host: /

#配置其他
mq:
  direct:
    exchangName: order.direct.ex
    log: #日志队列
      queue: order.direct.log.queue #C1队列
    email:
      queue: order.direct.email.queue #C2队列
    wx:
      queue: order.direct.wx.queue #C3队列

3、EMailService.java

package com.zh.srp.mq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class EMailService {

//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性 autoDelete = "false":代表持久化
    @RabbitListener(
         bindings = @QueueBinding(value = @Queue(value = "${mq.direct.email.queue}",
        autoDelete = "false"),
        exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
        key = "email" //路由Key
    ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("email-log--------->"+Message);
    }




}

4、LOGService.java

package com.zh.srp.mq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
public class LOGService {


    @RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.direct.log.queue}",
        autoDelete = "false"),
        exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
        key = "log" //路由Key
    ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("log-log--------->"+Message);
    }




}

5、WXService.java

package com.zh.srp.mq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
//会把申请的队列和交换机进行绑定
//确定消息的模式 fanout  direct topic
//确定队列queue的持久性
public class WXService {


    @RabbitListener(
        bindings = @QueueBinding(value = @Queue(value = "${mq.direct.wx.queue}",
        autoDelete = "false"),
        exchange = @Exchange(value = "${mq.direct.exchangName}",type = ExchangeTypes.DIRECT),
        key = "wx" //路由Key
    ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("WX-log--------->"+Message);
    }




}

(三)测试

1、启动消费者DirectCApplication.java 2、启动生产者的Test类的测试方法

消费

生产

三、topic模式

(一)生产者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8989

#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /

#配置其他
mq:
  topic:
    exchangName: linux.topic

3、OrderService.java

package com.zh.srp.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.xml.crypto.Data;
import java.util.Date;
import java.util.UUID;

@Component
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //1、定义交换机
    @Value("${mq.topic.exchangName}")
    private String exchangName;

    //WX
    public void QQ(int i){
        //订单信息
        String orderId = UUID.randomUUID().toString();
        //消息
        String message = "你的WX第【"+i+"】个信息是:" + orderId + new Date().toString();
        System.out.println("正在发送WX----->:"+message);

        //通配符topic
        rabbitTemplate.convertAndSend(exchangName,"qq.log",message);
    }

    //QQ
    public void WX(int i){
        //订单信息
        String orderId = UUID.randomUUID().toString();
        //消息
        String message = "你的QQ第【"+i+"】个信息是:" + orderId + new Date().toString();
        System.out.println("正在发送QQ----->:"+message);
        //通配符topic
        rabbitTemplate.convertAndSend(exchangName,"wx.log",message);
    }
}

4、TopicApplicationTests.java

package com.zh.srp;

import com.zh.srp.mq.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class TopicApplicationTests {

    @Autowired
    private OrderService producer;

    @Test
    void contextLoads() throws InterruptedException {

        for (int i = 0; i < 1; i++) {
            producer.QQ(i);
            producer.WX(i);
            Thread.sleep(2);
        }

    }

}

(二)消费者

1、目录结构

2、application.yml文件

#服务端口
server:
  port: 8081

#配置rabbitmq
spring:
  rabbitmq:
    port: 5672
    username: admin
    password: 123456
    host: ip
    virtual-host: /

#配置其他
mq:
  topic:
    exchangName: linux.topic
    qq: #日志队列
      queue: linux.topic.qq.queue #C1队列
    wx:
      queue: linux.topic.wx.queue #C3队列

3、QQService.java

package com.zh.srp.mq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class QQService {

    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "${mq.topic.qq.queue}",
                    autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC),
                    key = "qq.*" //路由Key
            ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("QQ-log--------->"+Message);
    }




}

4、WXService.java

package com.zh.srp.mq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class WXService {
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(value = "${mq.topic.wx.queue}",
                    autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.topic.exchangName}",type = ExchangeTypes.TOPIC),
                    key = "wx.*" //路由Key
            ))
    @RabbitHandler
    public void sendMessage(String Message){
        System.out.println("WX-log--------->"+Message);
    }




}

(三)测试

1、启动消费者 2、启动生产者

生产者

消费者