RabbitMQ动态创建消息队列和消费者,消费者上下线
时间:2019-01-31
本文章向大家介绍RabbitMQ动态创建消息队列和消费者,消费者上下线,主要包括RabbitMQ动态创建消息队列和消费者,消费者上下线使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
@Component
public class CustomizeDynamicConsumerContainer{
/**
* 用于存放全局消费者
*/
public final Map<String, DynamicConsumer> customizeDynamicConsumerContainer= new
ConcurrentHashMap<>();
}
public interface IDynamicConsumer extends ChannelAwareMessageListener {
void setContainer(SimpleMessageListenerContainer container);
default void shutdown() {}
}
public abstract class mqConsumer implements IDynamicConsumer {
private volatile boolean end = false;
private SimpleMessageListenerContainer container;
private boolean autoAck;
@Override
public void setContainer(SimpleMessageListenerContainer container) {
this.container = container;
autoAck = container.getAcknowledgeMode().isAutoAck();
}
@Override
public void shutdown() {
end = true;
}
protected void autoAck(Message message, Channel channel, boolean success) throws IOException {
if (autoAck) {
return;
}
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
public abstract boolean process(Message message, Channel channel);
}
public class DynamicConsumer {
private static final Logger logger = LoggerFactory.getLogger(DynamicConsumer.class);
private SimpleMessageListenerContainer container;
public DynamicConsumer(MQContainerFactory fac, String name) throws Exception {
SimpleMessageListenerContainer container = fac.getObject();
container.setMessageListener(new AbsMQConsumer() {
@Override
public boolean process(Message message, Channel channel) {
logger.info("DynamicConsumer,{},{}, {},name,fac.getQueue(),new String(message.getBody())");
distributionConsumerMsg(message, channel);
return true;
}
});
this.container = container;
}
//启动消费者监听
public void start() {
container.start();
}
//消费者停止监听
public void stop() {
container.stop();
}
//消费者重启
public void shutdown() {
container.shutdown();
}
/**
* 用户扩展处理消息
*/
public void distributionConsumerMsg(Message message, Channel channel) {
}
}
@Data
@Builder
public class DynamicConsumerContainerFactory implements FactoryBean<SimpleMessageListenerContainer> {
private ExchangeType exchangeType;
private String directExchange;
private String topicExchange;
private String fanoutExchange;
private String queue;
private String routingKey;
private Boolean autoDeleted;
private Boolean durable;
private Boolean autoAck;
private ConnectionFactory connectionFactory;
private RabbitAdmin rabbitAdmin;
private Integer concurrentNum;
// 消费方
private IMqConsumer consumer;
private Exchange buildExchange() {
if (null != directExchange) {
exchangeType = ExchangeType.DIRECT;
return new DirectExchange(directExchange);
} else if (null != topicExchange) {
exchangeType = ExchangeType.TOPIC;
return new TopicExchange(topicExchange);
} else if (null != fanoutExchange) {
exchangeType = ExchangeType.FANOUT;
return new FanoutExchange(fanoutExchange);
} else {
if (StringUtils.isEmpty(routingKey)) {
throw new IllegalArgumentException("defaultExchange's routingKey should not be null!");
}
exchangeType = ExchangeType.DEFAULT;
return new DirectExchange("");
}
}
private Queue buildQueue() {
if (StringUtils.isEmpty(queue)) {
throw new IllegalArgumentException("queue name should not be null!");
}
return new Queue(queue, durable == null ? false : durable, false, autoDeleted == null ? true : autoDeleted);
}
private Binding bind(Queue queue, Exchange exchange) {
return exchangeType.binding(queue, exchange, routingKey);
}
private void check() {
if (null == rabbitAdmin || null == connectionFactory) {
throw new IllegalArgumentException("rabbitAdmin and connectionFactory should not be null!");
}
}
@Override
public SimpleMessageListenerContainer getObject() throws Exception {
check();
Queue queue = buildQueue();
Exchange exchange = buildExchange();
Binding binding = bind(queue, exchange);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(binding);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setRabbitAdmin(rabbitAdmin);
container.setConnectionFactory(connectionFactory);
container.setQueues(queue);
container.setPrefetchCount(20);
container.setConcurrentConsumers(concurrentNum == null ? 1 : concurrentNum);
container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
if (null != consumer) {
container.setMessageListener(consumer);
}
return container;
}
@Override
public Class<?> getObjectType() {
return SimpleMessageListenerContainer.class;
}
}
消费者创建类
public class ConsumerGenerate {
/**
* 创建消费者
*
* @param connectionFactory
* @param rabbitAdmin
* @param exchangeName
* @param queueName
* @param routingKey
* @param autoDelete
* @param durable
* @param autoAck
* @return
* @throws Exception
*/
public static DynamicConsumer genConsumer(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin,
String exchangeName, String queueName, String routingKey, boolean autoDelete, boolean durable,
boolean autoAck) throws Exception {
MQContainerFactory fac =
MQContainerFactory.builder().directExchange(exchangeName).queue(queueName).autoDeleted(autoDelete)
.autoAck(autoAck).durable(durable).routingKey(routingKey).rabbitAdmin(rabbitAdmin)
.connectionFactory(connectionFactory).build();
return new DynamicConsumer(fac, queueName);
}
}
@Configuration
public class MqConfig {
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public AmqpProducer amqpProducer(ConnectionFactory connectionFactory) {
return new AmqpProducer(connectionFactory);
}
public class AmqpProducer {
private AmqpTemplate amqpTemplate;
public AmqpProducer(ConnectionFactory connectionFactory) {
amqpTemplate = new RabbitTemplate(connectionFactory);
}
/**
* 将消息发送到指定的交换器上
*
* @param exchange
* @param msg
*/
public void publishMsg(String exchange, String routingKey, Object msg) {
amqpTemplate.convertAndSend(exchange, routingKey, msg);
}
}
}
//实际业务使用使用方法
@Service
public class TestService {
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private CustomizeDynamicConsumerContainer customizeDynamicConsumerContainer;
@Autowired
private MqConfig.AmqpProducer amqpProducer
public void dynamicCreateConsumer(){
Map<String, DynamicConsumer> allQueue2ContainerMap = customizeDynamicConsumerContainer.customizeDynamicConsumerContainer;
DynamicConsumer consumer = null;
try {
//创建消费者
consumer = ConsumerGenerate
.genConsumer(connectionFactory, rabbitAdmin,"test001-1", "test001", "routingKey001"
, false, true, true);
} catch (Exception e) {
logger.error("系统异常",e);
}
allQueue2ContainerMap.put("test001", consumer);
//启动消费者
consumer.start();
//发送消息到交换机
amqpProducer.publishMsg("test001-1", "routingKey001", "Hello MQ!");
}
/**
* 暂停消费者
*/
public void stop(){
Map<String, DynamicConsumer> allQueue2ContainerMap = customizeDynamicConsumerContainer.customizeDynamicConsumerContainer;
DynamicConsumer dynamicConsumer = allQueue2ContainerMap.get("test001");
dynamicConsumer.stop();
}
}
- 《Effective Java》—— 创建与销毁对象
- web调试工具——Fiddler使用介绍(一)
- Windows c++应用程序通用日志组件(组件及测试程序下载)
- 快速排序
- 如何在Elasticsearch中安装中文分词器(IK+pinyin)
- Python抓取中文网页
- 《Effective Java》—— 读后总结
- unix共享内存要点
- LAMP=Linux+Apache+Mysql+Php
- unix共享内存要点
- Elasticsearch —— bulk批量导入数据
- 自己写的数据交换工具——从Oracle到Elasticsearch
- PHP程序员应该掌握的10个技能
- Elasticsearch查询——布尔查询Bool Query
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- typescript高级用法之infer的理解与使用
- 基于业务场景下的图片/文件上传方案总结
- LeetCode96|二叉搜索树中的搜索
- LeetCode95|字符串中的第一个唯一字符
- LeetCode94|Pow(x,n)
- LeetCode93|数值的整数次方
- LeetCode92|排序数组
- 缓存 | 从本地缓存到分布式缓存, Guava, Caffeine, Memcached, Redis
- WebView三问—B站真题
- C++核心准则T.140:为所有可能重用的操作命名
- Service三问
- 事件分发机制三问
- C++核心准则T.141:如果你需要只在一个地方使用的简单的函数对象,使用无名的lambda表达式
- 基于DOM的XML文件解析类
- C++核心准则T.143:避免无意中编写非通用代码