(一)RabbitMQ:RabbitMQ初体验

时间:2020-05-13
本文章向大家介绍(一)RabbitMQ:RabbitMQ初体验,主要包括(一)RabbitMQ:RabbitMQ初体验使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

 RabbitMQ是目前非常热门的一款消息中间件,凭借其高可靠,易扩展,高可用及丰富的功能特性受到越来越多企业的青睐。
 RabbitMQ时采用Erlang语言实现AMQP的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

 RabbitMQ具体特点如下:

  • 可靠性:RabbitMQ使用一些机制来保证可靠性,如持久化,传输确认及发布确认等。
  • 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ已经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
  • 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
  • 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
  • 多语言客户端:RabbitMQ几乎支持所有常用语言,比如Java,Python,Ruby,PHP,C#,JavaScript等。
  • 管理界面:RabbitMQ提供了一易用的用户界面,使得用户可以监控和管理消息,集群中的节点等。
  • 插件机制:RabbitMQ提供了许多插件,以实现从多方面扩展,当然也可以编写自己的插件。

1.消息中间件

  • 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如:只包含文本字符串,JSON等,也可以很复杂,如内嵌对象。
  • 消息队列中间件(Message Queue Middleware,简称为MQ)是指利用高效可靠消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
  • 消息队列中间件,也可以称为消息队列或者消息中间件。一般有两种模式:点对点(P2P)模式,发布/订阅(Pub/Sub)模式。点对点模式:基于队列,生产者发送消息到队列,消费者从队列中接受消息,队列的存在使得消息的异步传输称为可能。发布/订阅模式:定义了如何向一个内容节点(主题topic,可认为是消息传递中介)发布和订阅消息,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。
  • 目前开源的消息中间件,比较主流的有:RabbitMQ,Kafka,ActiveMQ,RocketMQ等。面向消息的中间件(简称为MOM)提供了松散耦合的灵活方式集成应用程序的一种机制。它们提供了基于存储和转发的应用程序中间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信。消息中间件提供了有保证的消息发送。应用程序开发人员无须了解远程过程调用(RPC)和网络通信协议的细节。

2.消息中间件的作用

 在不同的应用场景下可以展现不同的作用。

  • 解耦:在项目启动之初来预测会碰到什么需求是极其困难的。消息中间件在处理过程中间插入了一个隐含的,基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的皆苦约束即可。
  • 冗余(存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确地指出改消息已经被处理完成,从而确保你的数据被安全地保存直到你使用完毕。
  • 扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
  • 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
  • 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
  • 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
  • 缓冲:在任何重要的系统中,都会存在需要不同处理时间元素。消息中间件通过一个缓冲层来帮助任务高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
  • 异步通信:在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。

3.RabbitMQ的安装及使用

3.1在安装RabbitMQ之前需要安装Erlang,建议使用较新版本Erlang,这样可以获得较多更新和改进。官网下载地址:https://www.erlang.org/downloads

 [root@instance-5x tar.gz]# tar -zxvf otp_src_20.0.tar.gz -C /usr/local/src
 [root@instance-5x tar.gz]# cd /usr/local/src/otp_src_20.0/
 [root@instance-5x otp_src_20.0]# ./configure --prefix=/usr/local/erlang
 [root@instance-5x otp_src_20.0]# make
 [root@instance-5x otp_src_20.0]# make install

若:出现报错信息:No curses library functions found。需要安装ncurses。

 [root@instance-5x otp_src_20.0]# yum  install  ncurses-devel

配置环境变量

 [root@instance-5x otp_src_20.0]# vim /etc/profile
 export ERLANG_HOME=/usr/local/erlang
 PATH=$PATH:$JAVA_HOME/bin:$ERLANG_HOME/bin
 [root@instance-5x otp_src_20.0]# . /etc/profile

输入erl命令来验证Erlang是否安装成功

[root@instance-5x otp_src_20.0]# erl
Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V9.0  (abort with ^G)
1>

3.2RabbitMQ的安装

 官网下载地址:https://www.rabbitmq.com/releases/rabbitmq-server/ 安装包解压到相应的目下即可。

[root@instance-5x tar.gz]# tar -xvf rabbitmq-server-generic-unix-3.6.10.tar /usr/local/

配置环境变量

export RABBITMQ_HOME=/usr/local/rabbitmq_server-3.6.10
PATH=$PATH:$JAVA_HOME/bin:$ERLANG_HOME/bin:$RABBITMQ_HOME/sbin
[root@instance-5x otp_src_20.0]# . /etc/profile

运行RabbitMQ服务

[root@instance-5x otp_src_20.0]# rabbitmq-server  -detached

rabbitmq-server 命令后面添加一个“-detached” 参数是为了能够让RabbitMQ服务以守护进程的方式在后台运行,这样就不会因为当前Shell窗口的关闭而影响服务。

rabbitmqctl status 命令查看RabbitMQ是否正常启动:

[root@instance-5x tar.gz]# rabbitmqctl  status
Status of node 'rabbit@instance-och69p5x'
[{pid,25344},
{running_applications,
    [{rabbit,"RabbitMQ","3.6.10"},
     {mnesia,"MNESIA  CXC 138 12","4.15"},
     {rabbit_common,
         "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
         "3.6.10"},
     {ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
     {ssl,"Erlang/OTP SSL application","8.2"},
     {public_key,"Public key infrastructure","1.4.1"},
     {asn1,"The Erlang ASN1 compiler version 5.0","5.0"},
     {xmerl,"XML parser","1.3.15"},
     {os_mon,"CPO  CXC 138 46","2.4.2"},
     {syntax_tools,"Syntax tools","2.1.2"},
     {crypto,"CRYPTO","4.0"},
     {compiler,"ERTS  CXC 138 10","7.1"},
     {sasl,"SASL  CXC 138 11","3.0.4"},
     {stdlib,"ERTS  CXC 138 10","3.4"},
     {kernel,"ERTS  CXC 138 10","5.3"}]},
{os,{unix,linux}},
{erlang_version,
    "Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:64] [hipe] [kernel-poll:true]\n"},
{memory,
    [{total,51716936},
     {connection_readers,0},
     {connection_writers,0},
     {connection_channels,0},
     {connection_other,0},
     {queue_procs,30584},
     {queue_slave_procs,0},
     {plugins,0},
     {other_proc,18867000},
     {mnesia,66104},
     {metrics,184432},
     {mgmt_db,0},
     {msg_index,48984},
     {other_ets,1810296},
     {binary,448456},
     {code,21385216},
     {atom,891849},
     {other_system,8165535}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,415639142},
{disk_free_limit,50000000},
{disk_free,32308707328},
{file_descriptors,
    [{total_limit,65435},
     {total_used,3},
     {sockets_limit,58889},
     {sockets_used,0}]},
{processes,[{limit,1048576},{used,156}]},
{run_queue,0},
{uptime,24578},
{kernel,{net_ticktime,60}}]

4.生产和消费消息

Java代码展示
maven依赖:

<dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>4.2.1</version>
</dependency>

默认情况下,访问RabbitMQ服务的用户名和密码都是“guest”,这个账户有限制,默认只能通过本地网络(如 localhost)访问,远程网络访问受限,所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限。
添加新用户,用户名为“root”,密码为“root123”

[root@instance-5x tar.gz]# rabbitmqctl add_user root root123
[root@instance-5x tar.gz]# rabbitmqctl setpermissions -p / root ".*" ".*" ".*"
[root@instance-5x tar.gz]# rabbitmqctl set_user_tags root administrator

提供者客户端代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMqProducer {

    private static String EXCHANGE_NAME = "exchage_demo";

    private static String QUEUE_NAME = "queue_demo";

    private static String ROUTING_KEY = "routingkey_demo";


    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //创建一个type="direct",持久化,非自动删除得交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        //创建一个持久化,非排他的,非自动删除队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //将交换机与队列通过路由键(其实是绑定键,只不过direct类型下绑定键(bindingkey)和路由键(routingkey)一致才可以到达)绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        //发送一条持久化的消息
        String message = "hello world d!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        //关闭资源
        channel.close();
        connection.close();

        System.out.println("finished...");

    }
}

消费者客户端代码:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RabbitMqConsumer {

    private static String QUEUE_NAME = "queue_demo";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //创建连接
        Connection connection = ConnectionUtil.getConnection();

        //创建信道
        final Channel channel = connection.createChannel();

        //设置客户端最多接受未被ack的消息个数
        channel.basicQos(64);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);

                System.out.println("recv message:" + new String(body));

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME, consumer);
        //等待回调资源执行完毕后关掉资源
        TimeUnit.SECONDS.sleep(5);

        //关闭资源
        channel.close();
        connection.close();

        System.out.println("finished...");
    }
}

连接工具类代码:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {

    private static ConnectionFactory factory;

    static {
        factory = new ConnectionFactory();
        factory.setHost("a.x.y.z");
        factory.setPort(5672);
        factory.setUsername("username");
        factory.setPassword("password");

    }

    public static Connection getConnection() throws IOException, TimeoutException {
        return factory.newConnection();
    }
}

原文地址:https://www.cnblogs.com/everyingo/p/12881577.html