rocketMQ 本地环境搭建
最近在自己本地搭建rocketMQ,过程中遇到一些问题,现在总结一下,便于以后查看.
首先打开rocket官网:http://rocketmq.apache.org, 点击latest realease,有source版本和binary版本供下载,点击quikstart可以看到source版本的构建步骤:
source版本需要先安装了以下软件:
- 建议使用64位操作系统,Linux / Unix / Mac;
- 64位JDK 1.8+;
- Maven 3.2.x;
- Git;
- 4g +免费磁盘用于Broker服务器
我系统环境是OS 10.13.6,下载的是binary版本,省去了构建成为二进制文件的步骤.
将rocketmq-all-4.4.0-bin-release.zip文件解压后得到rocketmq-all-4.4.0-bin-release文件夹,
配置环境变量ROCKETMQ_HOME=xxx/rocketmq-all-4.4.0-bin-release,其中xxx表示你的文件夹父路径,
例如/Users/david/Downloads/packages/rocketmq-all-4.4.0-bin-release.
再将ROCKETMQ_HOME加入到PATH中如 $ROCKETMQ_HOME/bin:$PATH
打开teiminal,输入sh mqnamesrv启动名称服务器,如下图:
可以看到”Java HotSpot(TM) 64-Bit Server VM warning“警告信息,不要怕,
它是说CMS垃圾收集器已经过时,在未来的JVM版本中会被其他收集器替代,可以不用管他.
namesrv已经正常启动了
同样,再开一个terminal,输入sh mqbroker 启动broker.
如果在启动过程中遇到:
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
不要慌,这是说你的JAVA_HOME环境变量设置有问题,请检查环境变量.
如果检查后发现环境变量没问题,但还是报以上错误,这可能是系统问题吧,
记得OS系统有好几个文件可以配置环境变量,最常见的是用户目录下的.bash_profile,对当前用户生效,还有一个全局的,在etc目录下的profile文件
可以再确认一下,如果两个文件都配置了依然报错,那只好用最戳的方式解决:
打开mq的安装目录,找到bin文件夹下的runserver.sh和runbroker.sh,
在文件中主动加入你的JAVA_HOME,如下图:
一切顺利解决,至此namesrv和broker都已经成功启动,下面来写几个Javademo
首先加入maven依赖包:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
使用GRADLE构建的话加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
使用RocketMQ有以下3种发送消息的方式,分别适用不同场景:
1.同步发送消息:可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。
2.异步发送消息:异步传输通常用于响应时间敏感的业务场景。
3.以单向模式发送消息:单向传输用于需要中等可靠性的情况,例如日志收集。
我们选第一种发送方式写一个demo:
package com.example.demo.common.mq.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
通过循环发送100条消息,运行代码,结果抛出如下错误:
该错误表示找不到这个topic的路由,关于这个错误,网上也有一些对策,无外乎以下几个原因:
1.broker没有连上namesrv
2.producer没有连上namesrv
3.namesrv没有创建并维护该topic信息
4.netty包版本冲突
5.防火墙问题
前面3条可以通过查找日志确定问题,
1).broker.log里面如果查到register broker to name server localhost:9876 OK 这样的信息表示broker已经连上namesrv了.
如果没有,那么你可以重新启动broker: sh mqbroker -n localhost:9876
2).namesrv.log里面如果能够查到 new topic registered, TopicTest QueueData这样的信息,表示你的topic也已经被成功创建.
如果没有,那么你可以通过mq的admin工具主动生成该topic: sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t TopicTest
3).producer有没有连上namesrv,只需检查一下代码中namesrv的值对不对就行了
4).至于4和5那就需要你自己去跟踪源码发现问题了.
现在再来运行生产者代码,成功发送100条消息:
消费者也来写个demo:
package com.example.demo.common.mq.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
成功消费100条消息:
另外,关于rocketMQ的一些基本概念请参考另一篇:https://ex-jindawei001/p/11360431.html
原文地址:https://www.cnblogs.com/EX-JINDAWEI001/p/11426707.html
- tomcat源码解读四 tomcat中的processer
- tomcat源码解读三(2) tomcat中JMX的源码分析
- 程序的入口
- tomcat源码解读三(1) tomcat的jmx管理
- 利用xinetd实现简单web服务器(镜像站)
- tomcat源码解读二 tomcat的生命周期
- IOCP反射服务器
- 给PHP开发者讲讲PHP源码-第二部分
- 给PHP开发者讲讲PHP源码-第一部分
- tomcat源码解读一 Digester的解析方式
- Markdown 语法说明(简体中文版)
- C++中_onexit()用法简述
- tomcat请求处理分析(六)servlet的处理过程
- FFmpeg菜鸡互啄#第1篇#一些基本概念
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 3分钟短文:Laravel 从软删除说到模型作用域的概念
- Flutter 系统是如何实现ExpansionPanelList的
- Flutter 粘合剂CustomScrollView控件
- Flutter 1.17版本重磅发布
- Flutter 首页必用组件NestedScrollView
- 【Flutter实战】文本组件及五大案例
- 你真的会用Flutter日期类组件吗
- 【Flutter实战】图片组件及四大案例
- Flutter 标签类控件大全Chip
- 【Flutter实战】六大布局组件
- 【Flutter实战】定位装饰权重组件及柱状图案例
- Flutter为什么使用Dart?
- Flutter中如何使用WillPopScope
- 谈谈我对 Flutter 发展前景 和 “嵌套地狱” 的浅显看法
- 超过百万的StackOverflow Flutter 问题-第二期