RabbitMQ研究(三)Java-API使用

时间:2019-01-18
本文章向大家介绍RabbitMQ研究(三)Java-API使用,主要包括RabbitMQ研究(三)Java-API使用使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

链接RabbitMQ

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("guest");
factory.setPassword("guest");

Connection connection = factory.newConnection();	//创建链接

也可以将setHostsetPortsetUsernamesetPassword直接改成使用URI的方式链接
factory.setUri( "amqp:guest : guest@localhost:5672/");

Connection可以用来创建多个Channel实例,但是Channel实例不能再线程间共享,应用程序应该为每个线程开辟一个Channel。某些情况下Channle的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信交错,同时也会影响发送方确认机制的运行,多线程间共享Channel实例是非线程安全的。

使用交换器和队列

交换器和队列在使用的时候,必须先声明

//创建一个 type = direct 持久化交换器
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

上面是创建了一个持久化、非自动删除的、绑定类型为direct的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(队列名称是由RabbitMQ自动生成的)

如果要在应用中共享一个队列,可以如下:

channel.exchangeDeclare (exchangeName , "direct" , true) ;
channel.queueDeclare(queueName , true , false , false , null);
channel.queueBind(queueName , exchangeName, routingKey) ;

exchangeDeclare方法详解

exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成

Exchange.DeclareOk exchangeDeclare(String exchange ,
                        String type , boolean durable ,
                        boolean autoDelete , boolean internal ,
                        Map<String, Object> arguments) throws IOException ;

这个方法的返回值是Exchange.DeclareOK,用来标识成功声明了一个交换器。
各个参数详细说明如下:

  • exchange:交换器的名称
  • type:交换器的类型,常见的如fanout、direct、topic。
  • durable:设置是否持久化。durable设置为true表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失交换器的相关信息。
  • autoDelete:设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
  • internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • argument:其他一些参数
Exchange.DeclareOk exchangeDeclare(String exchange , String type) throws
IOExcept 工on ;
Exchange.DeclareOk exchangeDeclare(String exchange, String type , boolean
durab1e) throws IOExcept 工on ;
Exchange.DeclareOk exchangeDeclare(String exchange, String type , boolean
durable , boolean autoDelete , Map<String , Object> arguments) throws IOException ;

有声明创建交换器的方法,当然也有删除交换器的方法。相应的方法如下:

Exchange.DeleteOk exchangeDelete(String exchange) throws IOException ;
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws
IOException ;
Exchange.DeleteOk exchangeDelete(String exchange, boo1ean ifUnused) throws
IOException;

其中exchange 表示交换器的名称,而ifUnused 用来设置是否在交换器没有被使用的情
况下删除。如果isUnused 设置为true ,则只有在此交换器没有被使用的情况下才会被删除:
如果设置false ,则无论如何这个交换器都要被删除。

queueBind方法详解

将队列和交换器绑定的方法如下

Queue.BindOK.queueBind(String queue, String exchange, String routingKey) throws IOException;
Queue.BindOK.queueBind(String queue, String exchange, String routingKey, Map<String, Object> argumenets) throws IOException;
Queue.BindOK.queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> argumenets) throws IOException;

方法中涉及的参数详解:

  • queue:队列名称
  • exchange:交换器的名称
  • routingKey:用来绑定队列和交换器的路由键
  • argument:定义绑定的一些参数

nowait这个里面是有个nowait参数,这个nowait参数指的是不需要服务器返回,注意这个方法的返回值是void,如果没有特殊的缘由和应用场景,不建议使用这个方法
生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另外一个队列,就无法再次声明队列了。必须先取消订阅,然后将信道设置为传输模式,才能声明队列

不仅可以将队列和交换器绑定起来,也可以将已经被绑定的队列和交换器进行解绑。具体有下:

Queue.UnbindOK.queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOK.queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

exchangeBind方法

不仅可以将交换器和队列绑定,也可以将交换器和交换器绑定,下面表示从source交换器转发到destination交换器,某种程度上来说是destination交换器可以看作一个队列

Exchange.BindOK exchangeBind(String destination, String source, String routingKey) throws IOException;
Exchange.BindOK exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
//表示不等待服务器回执信息.可以提高访问速度
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

何时创建

RabbitMQ的消息存储在队列中,交换器的使用并不真正的耗费服务器的性能,而队列会。如果要衡量RabbitMQ当前的QPS只需要看队列即可。在实际业务中,需要对创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值。

按照RabbitMQ官方建议,生产者和消费者都应该去创建队列(避免手工创建)。但是如果在本身设计的时候,就通过后台管理界面已经创建好了,那么可以免去声明的过程,直接使用即可。

预先创建资源有个优点:确保交换器和队列正确绑定。实际情况下,由于开发人员水平参吃不齐,可能发送消息的交换器并没有绑定任何队列,那么消息就会丢失;或者交换器绑定了队列,但是RoutingKey和BindKey没有正确匹配,也会丢失。

发送消息

简单发送消息

byte[] message = "Hello,Nick".getBytes();
channel.basicPublish(exchangeName, routingKey, null, message);
//带Headers消息
Map<String, Object> headers = new HashMap();
headers.put("token", "xxxx");

channel.basicPublish(exchnageName, routingKey,
         new AMQP.BasicProperties.Builder()
         .contentType("text/plain")
         .deliveryMode(2)	//投递模式为2表示会持久化
         .expiration("60000")	//带过期时间
         .priority(1)	//优先级
         .userId("hidden")	
         .build()), message);

消息持久化,需要将投递模式(delivery mode)设置为2,消息会持久化(刷盘)到服务器硬盘中。同时这条消息的优先级(priority)设置为1,content-type为text/plain

详细API如下:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

具体参数解释:

  • exchange:交换器的名称,指明消息需要发送到哪个交换器。如果设置为空字符串,则消息会发送到模式的交换器中
  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列中
  • props:消息的基本属性集:contentTypecontentEncodingheadersdeliveryModeprioritycorrelationIdreplyToexpirationmessageIdtimestamptypeuserIdappIdclusterId
  • byte[] body:消息体(payload),真正的消息,在SpringBoot中整理Rabbit中有直接使用@Payload注解

消费消息

RabbitMQ的消费模式有两种:推式(Push)和拉式(Pull)。推式采用Basic.Consume进行消费,而拉式采用Basic.Get消费。

推式

推式主要是

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

不同的订阅采用不同的消费者标签(consumerTag)区分,同一个Channel的消费者也需要通过唯一的消费者标签区分

boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName, autoAck, "myConsumerTag", 
    new DefaultConsumer(channel) {
        @Override
        public void headleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String routingKey = envelope.getRoutingKey();
            String contentType = properties.getContentType();
            long deliveryTag = envelope.getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        }
    });

注意显示的设置autoAck为false,可以防止消息不必要的丢失

详细API如下:

String basicConsume(String queue , Consumer callback) throws IOException ;

String basicConsume(String 守ueue , boolean autoAck, Consumer callback) throws IOException;

String bas 工cConsume(String queue , boolean autoAck, Map<String, Object> arguments, 
    Consumer callback) throws IOException ;

String basicConsume(String queue , bool ean autoAck, String consumerTag,
    Consumer callback) throws IOException ;

String basicConsume(String queue , boolean autoAck, String consumerTag,
    boolean noLocal, boolean exclusive, Map<Str 工ng , Object> arguments, 
    Consumer callback) throws IOException;

参数讲解:

  • queue:队列的名称
  • autoAck:是否自动确认。建议设置为false,不自动确认
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置为false,则表示不能讲同一个Connection中生产者发送的消息传送给这个Conection中的消费者
  • exclusive:是否排他
  • arguments:其他参数
  • callback:消费者的毁掉函数

如果要重写回调,直接重写handleDelivery十分方便

消费者消费需要考虑线程安全的问题。消费者客户端的这些callback会被分配到不同的线程,意味着消费者客户端可以安全的调用阻塞方法

每个Channel都有自己独立的线程,最好的方式是一个Channel对应一个消费者,每个消费者之间没有联系。

拉式

拉式是使用basicGet的方法单条获取消息,其返回值是GetResponse。Channel类的basicGet没有其他重载方法,只有一个:

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

Basic.Consume将信道Channel设置为接收模式,直到取消订阅。接收模式的时候,RabbitMQ会不断推送消息给消费者,当然推送消息的个数是受到Basic.Qos的限制,如果只想从队列中获取单条信息而不是持续订阅,建议还是使用Basic.Get进行消费。但是不能讲Basic.Get放在一个循环里面来代替Basic.Consume,这样会严重影响RabbitMQ的性能。所以为了实现高吞吐,大部分场景都是用Basic.Consume推式模式的。

消费者的确认和拒绝

设置autoAck参数,当autoAck等于false的时候,RabbitMQ会等待消费者显式的回复确认信号后才从内存(或者磁盘)中移除消息(实质上是先打上记号,再删除)。当autoAck=true的时候,RabbitMQ会自动发送出去消息为确认,然后从内存(或磁盘)删除,而不管消费者是否真正的消费到了这些消息。

采用消息确认以后,只要设置为false,就有足够的时间处理消息,不用担心处理消息过程中消息进程挂掉导致消息丢失,因为会不断的推送,直到消费者显式的调用Basic.Ack。队列中分为两部分:第一个是等待投递给消费者的消息;第二个是已经投递给消费者,但是还没有确认的消息。如果消费者断开连接,RabbitMQ会重新安排进入队列,等待投递。

通过以下命令查看队列消息数量:

# queuename就是你实际的队列名称
rabbitmqctl list_queues queuename message_ready

显式的调用拒绝的方法

//拒绝一条消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;

//拒绝多条消息
void basicNack(long deliveryTag, boolean mutiple, boolean requeue) throws IOException;

如果requeue=true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue=false,会RabbitMQ会立即从队列中删除,以后也不会再次推送。
mutiple=false,则表示拒绝编号为deliveryTag的消息,这时候basicNack和basicReject一样,如果mutiple=true,则表示拒绝deliveryTag编号之前所有未被当前消费的消息

requeue设置为false,其实是启用了死信队列。死信队列可以通过检测被拒绝或者未发送成功的消息来跟踪问题

可冲入队列:

Basic.RecoverOK basicRecover() throws IOException;
Basic.RecoverOK basicRecover(boolean requeue) throws IOException

这个chaneel.basicRecover方法用来请求RabbitMQ重新发送未被确认的消息。如果requeue设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一个消息来说,可能会被分配给与之不同的消费者。如果requeue参数设置为false,那么同一条消息一定会被分配给与之相同的消费者,默认是true

关闭连接

通过使用如下代码关闭连接,释放资源。其实原则上关闭Connection的时候,Channel也会自动关闭

channel.close();
conn.close();

Connection和Channel有以下生命周期:

  • Open:开启状态,代表当前对象可以使用。
  • Closing:正在关闭状态
  • Closed:已经关闭

调用了close、程序正常关闭、异常关闭、网络异常断开,最终都会成为Closed状态

关闭方法的监听事件有addShutdownListener(ShutdownListener listener)removeShutdownListener(ShutdownListener listener)

connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause) {
        //业务代码
        if(cause.isHardError()) {
            Connection conn = (Connection)cause.getReference();
            if(!cause.isInitiatedByApplication()) {
                Method reason = cause.getReason();	//得到关闭的信息
            }
        }
    }
});