springBoot集成rabbitMq

时间:2019-12-25
本文章向大家介绍springBoot集成rabbitMq,主要包括springBoot集成rabbitMq使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

前言

很久之前的笔记,整理下

1.新建工程,添加依赖

compile('org.springframework.boot:spring-boot-starter-amqp')

2.配置application.yml

配置服务端
spring:
application:
name: rabbit-test
rabbitmq:
host: 10.2.6.33
port: 5672
username: gaojinliang
password: 【自己对应的密码】

3.RabbitMq的基本概念

内部实际上也是AMQP中的基本概念,AMQP中增加了Exchange和Binding的角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而
Binding决定交换器的消息应该发送到哪个队列。

Exchange 类型
Exchange分发消息的时候根据类型的不同分发策略有区别,目前有四种类型,direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header
而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

3.1direct

消息中的路由键(routing key)如果和Binding中的binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发routing key标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。他是完全匹配,单播的模式。

以下是direct的场景,也是最简单,最常用的模式。

MQ配置
@Configuration
public class RabbitQueueConfig {

@Bean
public Queue queue1() {
    return new Queue("queue1", true); //持久化
}

}

发送者

@Autowired
private AmqpTemplate rabbitTemplate;

@GetMapping("/queue")
public String sendQueueMsg(@RequestParam("msg") String msg) {
rabbitTemplate.convertAndSend("queue1", msg);
return "success";
}
接收者

@Component
public class QueueReceiver {

@RabbitListener(queues = "queue1")
@RabbitHandler
public void process(String strs) {
System.out.println("接收到:" + strs);
}
}

3.2fanout

每个发到fanout类型交换器的消息都会分到所有绑定的队列上。fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被

转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。

以下是fanout场景,单个微服务集群的情况下每个节点都绑定到一个direct交换机上 (例子中是服务端口区分队列,也可以是别的key),然后每个节点都能收到
消息做一些配置更新等操作。

MQ配置
@Configuration
public class RabbitFanoutConfig {

@Value("${server.port}")
private String port;

@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange ("fanoutExchange");
}

@Bean
public Queue fanoutTp() {
    return new Queue("fanout.tp."+port);
}    

@Bean
public Binding fanoutBinding() {
    return BindingBuilder.bind(fanoutTp()).to(fanoutExchange());
}

}

发送者

@GetMapping("/fanout")
public String sendFanoutMsg(@RequestParam("msg") String msg) {
rabbitTemplate.convertAndSend("fanoutExchange","", msg);
return "success";
}

接收者

@Component
public class FanoutRecerver {

@RabbitListener(queues = "fanout.tp.${server.port}")
@RabbitHandler
public void process(String strs) {
System.out.println("fanout.tp接收到:" + strs);
}

}

3.3topic

topic交换器通过模式匹配分配消息的路由键属性,将路由键和单个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定建的字符串切分成单词,这些单词

之间用点隔开。他同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。

以下是topic场景,单个微服务接收到同一个消息后多个功能协调工作,也适用按分类更新、同一问题需要特定人员知晓等场景

MQ配置:
@Configuration
public class RabbitTopicConfig {

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange ("topicExchange");
}

@Bean
public Queue topicTp1() {
    return new Queue("topic.tp2.one");
}

@Bean
public Queue topicTp2() {
    return new Queue("topic.tp2.two");
}

@Bean
public Binding binding1() {
    return BindingBuilder.bind(topicTp1()).to(topicExchange()).with("topic.tp2.*"); //* 后面匹配单个key
}

@Bean
public Binding binding2() {
    return BindingBuilder.bind(topicTp2()).to(topicExchange()).with("topic.tp2.#"); //# 后面匹配多个key
}

}

发送者
@GetMapping("/topic")
public String sendTopicMsg(@RequestParam("msg") String msg) {
rabbitTemplate.convertAndSend("topicExchange","topic.tp2.all", msg);
return "success";
}

接收者
@Component
public class TopicRecerver {

@RabbitListener(queues = "topic.tp2.one")
@RabbitHandler
public void process(String strs) {
System.out.println("topic.tp2.one 接收到:" + strs);
}

@RabbitListener(queues = "topic.tp2.two")
@RabbitHandler
public void process2(String strs) {
System.out.println("topic.tp2.two 接收到:" + strs);
}

}

4.接收端手动确认

上面提到的内容都是消息自动确认,当我们程序中收到消息处理失败需要MQ重发时就需要用到手动确认。

application.yml 配置 spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
spring:
application:
name: rabbit-test
rabbitmq:
host: 10.2.6.33
port: 5672
username: liuyanhui
password: liuyanhui
listener:
simple:
acknowledge-mode: MANUAL

在监听一端
@Component
public class QueueReceiver {

@RabbitListener(queues = "queue2")
@RabbitHandler
public void process(String strs, Message message, Channel channel) throws IOException {
System.out.println("接收到:" + strs);

    //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认成功
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //确认失败
}

}

channel.basicAck 确认成功

channel.basicNack 确认失败 : 最后一个参数代表是否需要重发,需要根据实际执行情况做选择,否则可能陷入死循环。

原文地址:https://www.cnblogs.com/jnnleo/p/12096970.html