RabbitMQ之Pub/Sub模式 (订阅模式)

时间:2021-10-03
本文章向大家介绍RabbitMQ之Pub/Sub模式 (订阅模式),主要包括RabbitMQ之Pub/Sub模式 (订阅模式)使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

概念

在前面的模式中 一个消息都只能被一个消费者使用
但是在阅读模式一个消息可以被多个消费者使用

简单例子

交换机创建参数:

编写生产者 创建一个交换机和两个队列 并做好关系绑定

public class PubProducer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.198.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //创建连接
            connection = connectionFactory.newConnection("生产者");
            //获取通道
            channel = connection.createChannel();
            //创建交换机以及两个队列 同时绑定关系
            String exchangeName = "test_fanout";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,false,false,false,null);
            String queue1Name = "test_fanout_queue1";
            String queue2Name = "test_fanout_queue2";
            channel.queueDeclare(queue1Name,false,false,false,null);
            channel.queueDeclare(queue2Name,false,false,false,null);
            //绑定关系 第三个参数为routingKey 绑定规则 fanout使用""
            channel.queueBind(queue1Name,exchangeName,"");
            channel.queueBind(queue2Name,exchangeName,"");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        finally {
            //关闭通道
            if(channel != null && channel.isOpen()){
                try {
                    channel.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }

            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

运行程序后在web页面查看交换机绑定关系

消费者编写和之前没区别 给好队列名字即可

public class PubConsumer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.198.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //创建连接
            connection = connectionFactory.newConnection("消费者");
            //获取通道
            channel = connection.createChannel();
            //通过通道声明队列,创建交换机等一系列事情
            channel.basicConsume("test_fanout_queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("1号消费者接受到的消息为 " + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("收取消息失败");
                }
            });
            //卡一下
            System.out.println("键盘输入关闭消费者");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        finally {
            //关闭通道
            if(channel != null && channel.isOpen()){
                try {
                    channel.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }

            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

另外一个消费者代码改为队列2即可

运行测试:
先运行两个消费者等待生产者消息
运行生产者后虽然只有一条消息但是可以看到两个消费者都拿到消息了

原文地址:https://www.cnblogs.com/OfflineBoy/p/15365211.html