rabbitmq路由模式

时间:2019-10-07
本文章向大家介绍rabbitmq路由模式,主要包括rabbitmq路由模式使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1.依赖:

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- 消息队列 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.5.1</version>
        </dependency>

2.消息提供者

public class Test_4_direct_provider {

    // 向消息队列写消息
    @Test
    public void provider() throws Exception {
        // 1,连接服务器
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.202");
        // 浏览器访问rabbitmq后台管理用的是15672
        // 发消息用的的5672
        // 一个服务器可以有多个端口号,访问15672,服务器返回的是网页
        // 访问5672,可以发消息,也可以取消息
        factory.setPort(5672);
        factory.setUsername("jtadmin");
        factory.setPassword("jtadmin");
        factory.setVirtualHost("/jt");
        // 2,得到channel
        // com.rabbitmq.client.connection
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 定义交换机
        String exchangeName = "E2";
        // fanout 订阅模式
        // direct 路由模式
        // topic 主题模式
        channel.exchangeDeclare(exchangeName, "direct");
        // channel.queueDeclare("order", true, false,false, null);
        // 4,写消息
        boolean isRunning = true;

        String msg = "msg 01";
        String routingKey = "mobile";
        BasicProperties properties = null;
        channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());

        // 5,关闭连接
        channel.close();
        connection.close();
        // System.out.println("发送了" + msg);
    }

}

3.消息接收者

public class Test_4_direct_consumer1 {
    // 从消息队列服务器取消息
    @Test
    public void consumer() throws Exception {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.216.202");
        factory.setPort(5672);
        factory.setUsername("jtadmin");
        factory.setPassword("jtadmin");
        factory.setVirtualHost("/jt");
        // 2,得到channel
        // com.rabbitmq.client
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "E2";
        // 创建交换机
        // fanout 订阅模式
        // direct 路由模式
        // topic 主题模式
        channel.exchangeDeclare(exchangeName, "direct");

        String queueName = "E2 queue1";
        // p2:durable true 持久化,保存到硬盘
        boolean durable = true;
        // p3:exclusive false 别的程序也能访问
        boolean exclusive = false;
        // p4:autoDelete false 队列中的消息处理完了,不删队列
        boolean autoDelete = false;
        channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);
        // 把队列和交换机绑定
        channel.queueBind(queueName, exchangeName, "mobile");

        // 设置每次取几个数据
        channel.basicQos(1);
        // 4,得到消费者
        // 创建的consumer通过channel来读数据
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 去取数据
        // p2:autoack 开启手动确认
        boolean autoAck = false;
        channel.basicConsume(queueName, autoAck, consumer);
        // 5,遍历消费者consumer
        boolean isRunning = true;
        System.out.println("消费者1启动了");
        while (isRunning) {
            // delivery代表的是消息队列中的一个数据
            Delivery delivery = consumer.nextDelivery();
            byte[] body = delivery.getBody();
            String msg = new String(body);
            System.out.println("消费者1收到:" + msg + "发手机");
            // 不发送确认信息,服务器上能看到队列中的消息
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            channel.basicAck(deliveryTag, true);
        }
        // 6,连接关闭
        channel.close();
        connection.close();
    }

}

4.总结

原文地址:https://www.cnblogs.com/gxlaqj/p/11630868.html