RocketMQ学习四-生产者producer
时间:2022-07-22
本文章向大家介绍RocketMQ学习四-生产者producer,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
前面我们已经知道RocketMQ的生产者和消费者依赖NameServer和broker,因此需要先启动nameServer和broker。同时nameServer首先会解析并填充nameServerConfig、NettyServerConfig的属性信息。然后实例化namesrvController,加载kv配置,开启两个定时任务。同时nameServer中存放了路由的基础信息,同时能够管理broker节点(路由的注册和删除、发现)。
路由注册是通过broker和nameServer的心跳功能实现的。broker每个30秒向集群中所有的nameServer发送心跳包,nameServer收到broker发送的心跳包时会更新brokerLiveTable缓存中BrokerLiveTable中的lastUpdateTimeStamp,然后nameServer每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,nameServer将移除该broker的路由信息同时关闭socket连接。
RocketMQ的生产者:
/**
* This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.
* 生产者启动类
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/*
* Instantiate with a producer group name.
* 1.创建生产者对象,采用DefaultMQProducer创建
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Launch the instance.
*/
producer.setNamesrvAddr("127.0.0.1:9876");
//2.启动生产者
producer.start();
for (int i = 0; i < 1000; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
* 3.创建一个消息实例,特定的topic、tag和消息体
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers
* 4.发送消息到其中的一个broker中
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
* 5.不再使用时关闭生产者实例
*/
producer.shutdown();
}
}
1.DefaultMQProducer 创建producer对象的构造函数
包含的信息:命名空间、生产者group、rpc钩子
/**
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String producerGroup) {
//调用命名空间为null,传入生产者group,rpcHook为null
this(null, producerGroup, null);
}
/**
* Constructor specifying namespace, producer group and RPC hook.
* 构造函数:构造特定的命名空间、生产者组、rpc钩子
* @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
//默认MQ生产者实现,创建默认生产者实现
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
//构造函数 默认MQ生产者实现
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
//异步发送线程池队列使用的是链表阻塞队列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
//默认异步发送Executor
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
//创建线程工厂:采用原子类创建线程索引,重写线程方法
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
2.启动生产者start
/**
* Start this producer instance. </p>
* 启动生产实例
*
* <strong> Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must
* to invoke this method before sending or querying messages. </strong> </p>
*
* @throws MQClientException if there is any unexpected error.
*/
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
//默认生产者实现启动、trace转发启动
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
//启动生产者
public void start() throws MQClientException {
this.start(true);
}
//mq启动:主要做的事:判断服务状态,如果是创建状态,则进行检查、同时进行更新
public void start(final boolean startFactory) throws MQClientException {
//对服务状态进行判断:CREATE_JUST,RUNNING,SHUTDOWN_ALREADY,START_FAILED;
//如果是创建时,则先设置状态,检查配置,同时对其进行修改实例名称到PID
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//检查配置 是否符合要求,如果符合,则改变实例化名称为进程id,也即pid
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//使用MQClientManager拿到实例MQClientInstance,整个jvm实例中只存在一个MQClientManager实例维护一个MQClientInstance缓存表
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//注册生产者
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//将主题发布信息放入
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
//启动MQ 使用netty 重要
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
//否者是启动、启动失败、已经关闭状态,此时都会抛异常
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//发送心跳到所有的broker中
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//定时任务 扫描过期请求
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
getOrCreateMQClientInstance:
//获取或者创建MQClient实例
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
//构建ClientId
String clientId = clientConfig.buildMQClientId();
//拿到MQClient实例
MQClientInstance instance = this.factoryTable.get(clientId);
//如果不存在,则创建
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
//构建MQClientId,clientId为客户端IP+instance+(unitname可选)
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
//注册生产者
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
进行启动操作:
//进行启动 重点
//做的启动: 启动请求响应通道、启动定时任务、启动pull服务、启动rebalance服务、启动push服务
//否者抛异常
public void start() throws MQClientException {
synchronized (this) {
//根据服务状态进行判断
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
//如果没有指定,则从nameServer中寻找地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
//启动请求响应通道 关注
this.mQClientAPIImpl.start();
// Start various schedule tasks
//启动定时任务
this.startScheduledTask();
// Start pull service
//启动pull服务 关注
this.pullMessageService.start();
// Start rebalance service
//启动rebalance服务 关注
this.rebalanceService.start();
// Start push service
//启动push服务 关注
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
//将服务状态变成运行状态
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
启动操作this.mQClientAPIImpl.start():
//使用远程客户端启动服务
public void start() {
this.remotingClient.start();
}
//reomotingCelint,使用Netty,可以看到DefaultEventExecutorGroup继承MultithreadEventExecutorGroup
//而在Netty中,我们知道MultithreadEventExecutorGroup的构造方法是NioEventLoopGroup的构造方法
//构造方法:DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory)
@Override
public void start() {
//创建NioEventLoopGroup
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
//重写线程方法
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
//创建引导 客户端 填充信息
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
//重写initChannel方法
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
//pipeline添加信息,以及handler
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
//扫描响应表启动 定时任务
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
//如果通道事件监听不为空,则启动
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
启动定时任务:
//启动定时任务 进行心跳包发送操作
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//如果没有指定,则从nameServer中寻找地址
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
启动pull操作:
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
启动reblance操作:
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
启动push操作:
//mq启动:主要做的事:判断服务状态,如果是创建状态,则进行检查、同时进行更新、
public void start(final boolean startFactory) throws MQClientException {
//对服务状态进行判断:CREATE_JUST,RUNNING,SHUTDOWN_ALREADY,START_FAILED;
//如果是创建时,则先设置状态,检查配置,同时对其进行修改实例名称到PID
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//获取实例或者创建MQClient实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//向MQClientInstance注册生产者,如果不ok,则抛异常
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//将主题发布信息放入
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
//启动MQ 使用netty
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
//否者是启动、启动失败、已经关闭状态,此时都会抛异常
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//发送心跳到所有的broker中
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//定时任务 扫描过期请求
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
同时还有一个分发的操作traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()):
/**
* Initialize asynchronous transfer data module
* 初始化异步传输数据
*/
void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
//启动相关异步traceDispatcher
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
//可以看到异步创建线程的方式
class AsyncRunnable implements Runnable {
private boolean stopped;
@Override
public void run() {
while (!stopped) {
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
//get trace data element from blocking Queue — traceContextQueue
//从阻塞队列中获取trace数据元素— traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
3.启动完成之后,准备message
主题、tags、keys、flag、body、waitStoreMsgOK
public Message(String topic, String tags, byte[] body) {
this(topic, tags, "", 0, body, true);
}
//Message 信息:主题、tags、keys、flag、body、waitStoreMsgOK
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
this.topic = topic;
this.flag = flag;
this.body = body;
if (tags != null && tags.length() > 0)
this.setTags(tags);
if (keys != null && keys.length() > 0)
this.setKeys(keys);
this.setWaitStoreMsgOK(waitStoreMsgOK);
}
- 让XP支持4G内存
- Consul微服务的配置中心体验篇
- 如何使用Sentry实现Hive/Impala的数据脱敏
- 如何使用Oozie API接口向Kerberos环境的CDH集群提交Shell作业
- Docker下redis的主从、持久化配置
- vuejs、eggjs、mqtt全栈式开发设备管理系统
- Xss和Csrf介绍
- GraphQL介绍&使用nestjs构建GraphQL查询服务
- 使用auth_request模块实现nginx端鉴权控制
- Docker学习之Docker镜像基本使用
- Docker学习之Centos7下安装
- Impala的Short-Circuit Reads
- js各种继承方式汇总
- Cloudera Navigator异常分析
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 3 CPU缓存一致性协议MESi
- 4. synchronized详解
- 4.2 synchronized补充
- 解决 Maven 使用Tomcat 部署报错 Tomcat return http status error: 405, Reason Phrase: Method Not Allowed:
- spring5源码 -- IOC容器设计理念和核心注解的作用
- 1. spring5源码 -- Spring整体脉络 IOC加载过程 Bean的生命周期
- Cookie详解
- 2.1 Spring5源码--源码编译
- WebLogic 10 容器通过JNDI切换数据源
- 2.2 spring5源码 -- ioc加载的整体流程
- Js 删除 指定Domin 指定 path 下的 cookie中指定的内容
- 适配器模式与装饰器模式的区别
- java堆内存详解
- springBoot 入门(一)—— 使用idea创建第一个springBoot项目
- “dddb超级”工具包——高效、快速开发JavaWeb项目后端结构