rocketMQ 本地环境搭建

时间:2019-08-29
本文章向大家介绍rocketMQ 本地环境搭建,主要包括rocketMQ 本地环境搭建使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

  最近在自己本地搭建rocketMQ,过程中遇到一些问题,现在总结一下,便于以后查看.

  首先打开rocket官网:http://rocketmq.apache.org, 点击latest realease,有source版本和binary版本供下载,点击quikstart可以看到source版本的构建步骤:

      source版本需要先安装了以下软件:

  1. 建议使用64位操作系统,Linux / Unix / Mac;
  2. 64位JDK 1.8+;
  3. Maven 3.2.x;
  4. Git;
  5. 4g +免费磁盘用于Broker服务器

  

  我系统环境是OS 10.13.6,下载的是binary版本,省去了构建成为二进制文件的步骤.

将rocketmq-all-4.4.0-bin-release.zip文件解压后得到rocketmq-all-4.4.0-bin-release文件夹,

配置环境变量ROCKETMQ_HOME=xxx/rocketmq-all-4.4.0-bin-release,其中xxx表示你的文件夹父路径,

例如/Users/david/Downloads/packages/rocketmq-all-4.4.0-bin-release.

再将ROCKETMQ_HOME加入到PATH中如 $ROCKETMQ_HOME/bin:$PATH

打开teiminal,输入sh mqnamesrv启动名称服务器,如下图:

可以看到Java HotSpot(TM) 64-Bit Server VM warning“警告信息,不要怕,

它是说CMS垃圾收集器已经过时,在未来的JVM版本中会被其他收集器替代,可以不用管他.

namesrv已经正常启动了

同样,再开一个terminal,输入sh mqbroker 启动broker.

如果在启动过程中遇到:

ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

 

不要慌,这是说你的JAVA_HOME环境变量设置有问题,请检查环境变量.

如果检查后发现环境变量没问题,但还是报以上错误,这可能是系统问题吧,

记得OS系统有好几个文件可以配置环境变量,最常见的是用户目录下的.bash_profile,对当前用户生效,还有一个全局的,在etc目录下的profile文件

可以再确认一下,如果两个文件都配置了依然报错,那只好用最戳的方式解决:

打开mq的安装目录,找到bin文件夹下的runserver.sh和runbroker.sh,

在文件中主动加入你的JAVA_HOME,如下图:

一切顺利解决,至此namesrv和broker都已经成功启动,下面来写几个Javademo

首先加入maven依赖包:

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
</dependency>

使用GRADLE构建的话加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

使用RocketMQ有以下3种发送消息的方式,分别适用不同场景:

1.同步发送消息:可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。

2.异步发送消息:异步传输通常用于响应时间敏感的业务场景。

3.以单向模式发送消息:单向传输用于需要中等可靠性的情况,例如日志收集。

我们选第一种发送方式写一个demo:

package com.example.demo.common.mq.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

通过循环发送100条消息,运行代码,结果抛出如下错误:

该错误表示找不到这个topic的路由,关于这个错误,网上也有一些对策,无外乎以下几个原因:

1.broker没有连上namesrv

2.producer没有连上namesrv

3.namesrv没有创建并维护该topic信息

4.netty包版本冲突

5.防火墙问题

前面3条可以通过查找日志确定问题,

  1).broker.log里面如果查到register broker to name server localhost:9876 OK 这样的信息表示broker已经连上namesrv了.

如果没有,那么你可以重新启动broker: sh mqbroker -n localhost:9876

  2).namesrv.log里面如果能够查到 new topic registered, TopicTest QueueData这样的信息,表示你的topic也已经被成功创建.

如果没有,那么你可以通过mq的admin工具主动生成该topic: sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t TopicTest

  3).producer有没有连上namesrv,只需检查一下代码中namesrv的值对不对就行了

  4).至于4和5那就需要你自己去跟踪源码发现问题了.

现在再来运行生产者代码,成功发送100条消息:

 消费者也来写个demo:

package com.example.demo.common.mq.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
public static void main(String[] args) throws MQClientException {

// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");

// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

});

//Launch the consumer instance.
consumer.start();

System.out.printf("Consumer Started.%n");
}
}

 成功消费100条消息:

 另外,关于rocketMQ的一些基本概念请参考另一篇:https://ex-jindawei001/p/11360431.html




原文地址:https://www.cnblogs.com/EX-JINDAWEI001/p/11426707.html