RabbitMQ动态创建消息队列和消费者,消费者上下线

时间:2019-01-31
本文介绍如何在 RabbitMQ 中动态创建消息队列和消费者,以及如何控制消费者的上线与下线,内容包括使用实例、应用技巧、基本知识点总结和需要注意的事项,具有一定的参考价值,需要的朋友可以参考一下。
/**
 * Spring component that holds the consumers created at runtime.
 */
@Component
public class CustomizeDynamicConsumerContainer {

    /**
     * Global registry of consumers, backed by a {@link ConcurrentHashMap}.
     * The field is public and callers read and write the map directly; the
     * sample service in this file uses the queue name as the key.
     */
    public final Map<String, DynamicConsumer> customizeDynamicConsumerContainer = new ConcurrentHashMap<>();
}
/**
 * Message listener contract for consumers that are created at runtime.
 */
public interface IDynamicConsumer extends ChannelAwareMessageListener {

    /**
     * Shutdown hook for the consumer. The default implementation does nothing.
     */
    default void shutdown() {
    }

    /**
     * Supplies the listener container to the consumer.
     *
     * @param container the listener container handed to this consumer
     */
    void setContainer(SimpleMessageListenerContainer container);
}
/**
 * Base class for dynamically created consumers.
 *
 * <p>Subclasses implement {@link #process(Message, Channel)}; this class
 * dispatches each delivery to it and, when the container runs in MANUAL
 * acknowledge mode, acks or nacks the message according to the result.
 */
public abstract class mqConsumer implements IDynamicConsumer {
    // Set by shutdown(). NOTE(review): nothing in this class reads the flag yet;
    // decide how a shutdown request should affect in-flight consumption.
    private volatile boolean end = false;
    private SimpleMessageListenerContainer container;
    // True when this class must NOT ack/nack on its own. Stays false (listener
    // acks by hand) until setContainer() has been called.
    private boolean autoAck;

    /**
     * Stores the container and derives from its acknowledge mode whether this
     * listener has to acknowledge messages itself.
     *
     * @param container the listener container this consumer is attached to
     */
    @Override
    public void setContainer(SimpleMessageListenerContainer container) {
        this.container = container;
        // AcknowledgeMode.isAutoAck() is true only for NONE. In AUTO mode the
        // container acknowledges after the listener returns, so acking here as
        // well would ack the same delivery tag twice. Only MANUAL leaves the
        // acknowledgement to the listener.
        autoAck = !container.getAcknowledgeMode().isManual();
    }

    /**
     * Records that a shutdown was requested.
     */
    @Override
    public void shutdown() {
        end = true;
    }

    /**
     * Dispatches a delivery to {@link #process(Message, Channel)} and
     * acknowledges it according to the outcome.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on
     * @throws Exception if processing or acknowledging fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        boolean success;
        try {
            success = process(message, channel);
        } catch (RuntimeException e) {
            // Requeue the message (MANUAL mode only) before propagating the failure.
            autoAck(message, channel, false);
            throw e;
        }
        autoAck(message, channel, success);
    }

    /**
     * Acks or nacks (with requeue) the message unless acknowledgement is not
     * this listener's responsibility.
     *
     * @param message the message to acknowledge
     * @param channel the channel the message arrived on
     * @param success {@code true} to ack, {@code false} to nack and requeue
     * @throws IOException if the broker call fails
     */
    protected void autoAck(Message message, Channel channel, boolean success) throws IOException {
        if (autoAck) {
            return;
        }

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (success) {
            channel.basicAck(deliveryTag, false);
        } else {
            channel.basicNack(deliveryTag, false, true);
        }
    }

    /**
     * Handles one message.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on
     * @return {@code true} if the message was handled successfully
     */
    public abstract boolean process(Message message, Channel channel);
}
/**
 * Wraps a {@link SimpleMessageListenerContainer} obtained from a container
 * factory and exposes start/stop/shutdown for it.
 */
public class DynamicConsumer {

    private static final Logger logger = LoggerFactory.getLogger(DynamicConsumer.class);
    private final SimpleMessageListenerContainer container;

    /**
     * Obtains a container from the factory and installs a listener that logs
     * each message and forwards it to
     * {@link #distributionConsumerMsg(Message, Channel)}.
     *
     * @param fac  factory that produces the listener container
     * @param name label written to the log for every received message
     * @throws Exception if the factory fails to produce the container
     */
    public DynamicConsumer(MQContainerFactory fac, String name) throws Exception {
        SimpleMessageListenerContainer listenerContainer = fac.getObject();
        AbsMQConsumer listener = new AbsMQConsumer() {
            @Override
            public boolean process(Message message, Channel channel) {
                // The arguments used to sit inside the format string, so the
                // placeholders were never substituted.
                logger.info("DynamicConsumer,{},{}, {}", name, fac.getQueue(), new String(message.getBody()));
                distributionConsumerMsg(message, channel);
                return true;
            }
        };
        // Give the listener its container so it can read the acknowledge mode.
        // NOTE(review): assumes AbsMQConsumer exposes setContainer like the
        // abstract consumer base class shown in this file; confirm.
        listener.setContainer(listenerContainer);
        listenerContainer.setMessageListener(listener);
        this.container = listenerContainer;
    }

    /**
     * Starts the listener container.
     */
    public void start() {
        container.start();
    }

    /**
     * Stops the listener container.
     */
    public void stop() {
        container.stop();
    }

    /**
     * Shuts the listener container down (this is not a restart).
     */
    public void shutdown() {
        container.shutdown();
    }


    /**
     * Extension point for handling a received message; the default
     * implementation does nothing.
     *
     * @param message the received message
     * @param channel the channel the message arrived on
     */
    public void distributionConsumerMsg(Message message, Channel channel) {

    }

}
/**
 * Factory that declares a queue, an exchange and their binding through
 * {@link RabbitAdmin} and then builds a {@link SimpleMessageListenerContainer}
 * listening on that queue.
 *
 * <p>The exchange is chosen from the first non-null name among
 * {@code directExchange}, {@code topicExchange} and {@code fanoutExchange};
 * when none is set, the default (nameless) exchange is used and a routing key
 * is required.
 */
@Data
@Builder
public class DynamicConsumerContainerFactory implements FactoryBean<SimpleMessageListenerContainer> {
    // Derived in buildExchange() from whichever exchange name is configured.
    private ExchangeType exchangeType;

    private String directExchange;
    private String topicExchange;
    private String fanoutExchange;

    private String queue;
    private String routingKey;


    // Nullable on purpose: null means "not configured", see the defaults below.
    private Boolean autoDeleted;
    private Boolean durable;
    private Boolean autoAck;

    private ConnectionFactory connectionFactory;
    private RabbitAdmin rabbitAdmin;
    private Integer concurrentNum;

    // Optional listener to install on the container.
    private IMqConsumer consumer;


    /**
     * Builds the exchange for the configured name and records its type.
     *
     * @return the exchange to declare
     * @throws IllegalArgumentException if no exchange name is set and the routing key is empty
     */
    private Exchange buildExchange() {
        if (null != directExchange) {
            exchangeType = ExchangeType.DIRECT;
            return new DirectExchange(directExchange);
        } else if (null != topicExchange) {
            exchangeType = ExchangeType.TOPIC;
            return new TopicExchange(topicExchange);
        } else if (null != fanoutExchange) {
            exchangeType = ExchangeType.FANOUT;
            return new FanoutExchange(fanoutExchange);
        } else {
            if (StringUtils.isEmpty(routingKey)) {
                throw new IllegalArgumentException("defaultExchange's routingKey should not be null!");
            }
            exchangeType = ExchangeType.DEFAULT;
            return new DirectExchange("");
        }
    }


    /**
     * Builds the queue definition: non-exclusive, {@code durable} defaulting to
     * false and {@code autoDeleted} defaulting to true when unset.
     *
     * @return the queue to declare
     * @throws IllegalArgumentException if the queue name is empty
     */
    private Queue buildQueue() {
        if (StringUtils.isEmpty(queue)) {
            throw new IllegalArgumentException("queue name should not be null!");
        }

        boolean durableQueue = Boolean.TRUE.equals(durable);
        boolean autoDeleteQueue = !Boolean.FALSE.equals(autoDeleted);
        return new Queue(queue, durableQueue, false, autoDeleteQueue);
    }


    /**
     * Creates the binding between queue and exchange using the configured routing key.
     */
    private Binding bind(Queue queue, Exchange exchange) {
        return exchangeType.binding(queue, exchange, routingKey);
    }


    /**
     * Verifies that the mandatory collaborators are present.
     *
     * @throws IllegalArgumentException if rabbitAdmin or connectionFactory is null
     */
    private void check() {
        if (null == rabbitAdmin || null == connectionFactory) {
            throw new IllegalArgumentException("rabbitAdmin and connectionFactory should not be null!");
        }
    }


    /**
     * Declares queue, exchange and binding, then creates a new listener
     * container for the queue. The container is returned un-started.
     *
     * @return a newly created listener container
     * @throws Exception declared by the FactoryBean contract
     * @throws IllegalArgumentException if mandatory configuration is missing
     */
    @Override
    public SimpleMessageListenerContainer getObject() throws Exception {
        check();

        Queue queue = buildQueue();
        Exchange exchange = buildExchange();
        Binding binding = bind(queue, exchange);

        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(binding);

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setRabbitAdmin(rabbitAdmin);
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queue);
        container.setPrefetchCount(20);
        container.setConcurrentConsumers(concurrentNum == null ? 1 : concurrentNum);
        // autoAck is a nullable Boolean; unboxing it unconditionally threw a
        // NullPointerException when the builder never set it. When unset, the
        // container keeps its own default acknowledge mode.
        if (null != autoAck) {
            container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
        }


        if (null != consumer) {
            container.setMessageListener(consumer);
            // Hand the container to the consumer only after the acknowledge mode
            // is final, so the consumer can derive its ack behaviour from it.
            // NOTE(review): assumes IMqConsumer declares setContainer like the
            // consumer interface shown in this file; confirm.
            consumer.setContainer(container);
        }
        return container;
    }

    /**
     * @return the type of object this factory creates
     */
    @Override
    public Class<?> getObjectType() {
        return SimpleMessageListenerContainer.class;
    }
}

消费者创建类

public class ConsumerGenerate {

    /**
     * Creates a consumer whose queue is bound to a direct exchange.
     *
     * @param connectionFactory connection factory passed to the container factory
     * @param rabbitAdmin       admin passed to the container factory
     * @param exchangeName      name of the direct exchange
     * @param queueName         name of the queue; also passed to the consumer as its name
     * @param routingKey        routing key passed to the container factory
     * @param autoDelete        value for the factory's autoDeleted setting
     * @param durable           value for the factory's durable setting
     * @param autoAck           value for the factory's autoAck setting
     * @return the new consumer
     * @throws Exception if constructing the consumer fails
     */
    public static DynamicConsumer genConsumer(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin,
                                              String exchangeName, String queueName, String routingKey, boolean autoDelete, boolean durable,
                                              boolean autoAck) throws Exception {
        MQContainerFactory containerFactory = MQContainerFactory.builder()
                .connectionFactory(connectionFactory)
                .rabbitAdmin(rabbitAdmin)
                .directExchange(exchangeName)
                .queue(queueName)
                .routingKey(routingKey)
                .autoDeleted(autoDelete)
                .durable(durable)
                .autoAck(autoAck)
                .build();

        return new DynamicConsumer(containerFactory, queueName);
    }

}
/**
 * Spring configuration that exposes a {@link RabbitAdmin} and a simple
 * message producer as beans.
 */
@Configuration
public class MqConfig {


    /**
     * @param connectionFactory connection factory the admin operates on
     * @return a RabbitAdmin bound to the given connection factory
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }


    /**
     * @param connectionFactory connection factory the producer publishes through
     * @return the producer bean
     */
    @Bean
    public AmqpProducer amqpProducer(ConnectionFactory connectionFactory) {
        return new AmqpProducer(connectionFactory);
    }

    /**
     * Thin wrapper around a {@link RabbitTemplate} for publishing messages.
     * Declared static because it uses nothing from the enclosing configuration
     * instance.
     */
    public static class AmqpProducer {

        private final AmqpTemplate amqpTemplate;

        /**
         * @param connectionFactory connection factory used to create the template
         */
        public AmqpProducer(ConnectionFactory connectionFactory) {
            amqpTemplate = new RabbitTemplate(connectionFactory);
        }

        /**
         * Converts the payload and sends it to the given exchange.
         *
         * @param exchange   name of the target exchange
         * @param routingKey routing key to publish with
         * @param msg        payload handed to the template for conversion
         */
        public void publishMsg(String exchange, String routingKey, Object msg) {
            amqpTemplate.convertAndSend(exchange, routingKey, msg);
        }
    }
}

//实际业务使用方法

@Service
public class TestService {
    @Autowired
    private ConnectionFactory connectionFactory;
    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    private CustomizeDynamicConsumerContainer customizeDynamicConsumerContainer;
    @Autowired
    private MqConfig.AmqpProducer amqpProducer


    public void dynamicCreateConsumer(){
        Map<String, DynamicConsumer> allQueue2ContainerMap = customizeDynamicConsumerContainer.customizeDynamicConsumerContainer;
        DynamicConsumer consumer = null;
        try {
            //创建消费者
            consumer = ConsumerGenerate
                    .genConsumer(connectionFactory, rabbitAdmin,"test001-1", "test001", "routingKey001"
                            , false, true, true);
        } catch (Exception e) {
             logger.error("系统异常",e);
        }
        allQueue2ContainerMap.put("test001", consumer);
        //启动消费者
        consumer.start();
        //发送消息到交换机
       amqpProducer.publishMsg("test001-1", "routingKey001", "Hello MQ!");
    }

    /**
     * 暂停消费者
     */
    public void stop(){
        Map<String, DynamicConsumer> allQueue2ContainerMap = customizeDynamicConsumerContainer.customizeDynamicConsumerContainer;
        DynamicConsumer dynamicConsumer = allQueue2ContainerMap.get("test001");
        dynamicConsumer.stop();
    }


}