RocketMQ学习四-生产者producer

时间:2022-07-22
本文章向大家介绍RocketMQ学习四-生产者producer,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

前面我们已经知道RocketMQ的生产者和消费者依赖NameServer和broker,因此需要先启动nameServer和broker。nameServer启动时,首先会解析并填充nameServerConfig、NettyServerConfig的属性信息,然后实例化namesrvController、加载kv配置,并开启两个定时任务。此外,nameServer中存放了路由的基础信息,并且能够管理broker节点(路由的注册、删除和发现)。

路由注册是通过broker和nameServer之间的心跳功能实现的。broker每隔30秒向集群中所有的nameServer发送心跳包,nameServer收到broker发送的心跳包时,会更新brokerLiveTable缓存中对应BrokerLiveInfo的lastUpdateTimestamp;同时nameServer每隔10s扫描一次brokerLiveTable,如果连续120s没有收到某个broker的心跳包,nameServer将移除该broker的路由信息,并关闭对应的socket连接。

RocketMQ的生产者:

/**
 * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.
 * It is the entry point of the producer example.
 */
public class Producer {
    /**
     * Starts a producer, synchronously sends 1000 messages to {@code TopicTest} and then shuts the
     * producer down.
     *
     * @param args command-line arguments (unused)
     * @throws MQClientException if the producer cannot be started
     * @throws InterruptedException if the back-off sleep after a failed send is interrupted
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * 1. Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        producer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * 2. Launch the instance.
         */
        producer.start();

        // Everything after a successful start() runs inside try/finally so that the producer is
        // shut down even when the loop is aborted, e.g. by an InterruptedException from Thread.sleep.
        try {
            for (int i = 0; i < 1000; i++) {
                try {

                    /*
                     * 3. Create a message instance, specifying topic, tag and message body.
                     */
                    Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );

                    /*
                     * 4. Call send message to deliver message to one of brokers.
                     */
                    SendResult sendResult = producer.send(msg);

                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // Report the failed send and back off for a second before the next message.
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
        } finally {
            /*
             * 5. Shut down once the producer instance is no longer in use.
             */
            producer.shutdown();
        }
    }
}

1.DefaultMQProducer 创建producer对象的构造函数

包含的信息:命名空间、生产者group、rpc钩子

/**
 * Constructor specifying producer group.
 *
 * @param producerGroup Producer group, see the name-sake field.
 */
public DefaultMQProducer(final String producerGroup) {
    // Delegate to the three-argument constructor with a null namespace and a null RPC hook.
    this(null, producerGroup, null);
}

   /**
     * Constructor specifying namespace, producer group and RPC hook.
     *
     * @param namespace Namespace for this MQ Producer instance.
     * @param producerGroup Producer group, see the name-sake field.
     * @param rpcHook RPC hook to execute per each remoting command execution.
     */
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    // Create the producer implementation, handing it this producer and the RPC hook.
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}


/**
 * Creates the producer implementation behind a {@link DefaultMQProducer}.
 *
 * <p>Besides storing its two arguments, the constructor builds the default executor for asynchronous
 * sends: a bounded {@link LinkedBlockingQueue} (capacity 50000) served by a {@link ThreadPoolExecutor}
 * whose core and maximum sizes are both taken from the number of available processors.
 *
 * @param defaultMQProducer the producer facade this implementation belongs to
 * @param rpcHook RPC hook stored for later use
 */
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;

    // Work queue of the asynchronous sender pool: a bounded linked blocking queue.
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);

    final Runtime runtime = Runtime.getRuntime();
    final int corePoolSize = runtime.availableProcessors();
    final int maximumPoolSize = runtime.availableProcessors();
    final long keepAliveMillis = 1000 * 60;

    // Thread factory that names each worker "AsyncSenderExecutor_<n>" using an atomic counter.
    final ThreadFactory senderThreadFactory = new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            final int index = this.threadIndex.incrementAndGet();
            return new Thread(r, "AsyncSenderExecutor_" + index);
        }
    };

    // Default executor used for asynchronous sends.
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
        corePoolSize,
        maximumPoolSize,
        keepAliveMillis,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        senderThreadFactory);
}

2.启动生产者start

/**
 * Starts this producer instance.
 *
 * <p><strong>Much internal initialization is carried out here to make this instance ready, so this
 * method must be invoked before sending or querying messages.</strong>
 *
 * @throws MQClientException if there is any unexpected error.
 */
@Override
public void start() throws MQClientException {
    final String namespacedGroup = withNamespace(this.producerGroup);
    this.setProducerGroup(namespacedGroup);

    // Start the producer implementation first; the trace dispatcher is started only when one is set.
    this.defaultMQProducerImpl.start();

    if (traceDispatcher == null) {
        return;
    }

    try {
        traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
    } catch (MQClientException e) {
        // A trace dispatcher failure is only logged; it does not fail the producer start.
        log.warn("trace dispatcher start failed ", e);
    }
}

// Starts the producer implementation by delegating to start(true),
// i.e. the client factory is started as well.
public void start() throws MQClientException {
        this.start(true);
}

/**
 * Starts the producer implementation according to the current service state.
 *
 * <p>In {@code CREATE_JUST} the configuration is checked, the MQClientInstance is obtained and this
 * producer is registered with it; in {@code RUNNING}, {@code START_FAILED} and
 * {@code SHUTDOWN_ALREADY} an exception is thrown. When no exception is thrown, a heartbeat is sent
 * to all brokers and a timer task scanning expired requests is scheduled.
 *
 * @param startFactory whether {@code mQClientFactory.start()} is invoked as part of this start
 * @throws MQClientException if the producer group is already registered or the service state does
 *         not allow starting
 */
public void start(final boolean startFactory) throws MQClientException {
    // Possible service states handled below: CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED.
    switch (this.serviceState) {
        case CREATE_JUST:
            // Mark as START_FAILED up front; the state only becomes RUNNING at the end of this branch.
            this.serviceState = ServiceState.START_FAILED;
            // Validate the configuration before doing anything else.
            this.checkConfig();

            // Unless this is the client-inner producer group, switch the instance name to the PID.
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // Obtain (or create) the MQClientInstance for this producer's client config via MQClientManager.
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register this producer under its group; a false result is reported as a duplicate group.
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // Roll the state back to CREATE_JUST before reporting the duplicate group.
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                            null);
            }
            // Seed the publish-info table with an empty entry for the create-topic key.
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // Start the client factory only when the caller asked for it.
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                     this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
            // Already running, previously failed or already shut down: starting is rejected.
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                                        + this.serviceState
                                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                        null);
        default:
            break;
    }

    // Send a heartbeat to all brokers.
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Schedule the scan for expired requests: initial delay 1000 * 3, period 1000
    // (milliseconds, assuming this.timer is a java.util.Timer).
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                // Log and continue so that a failing scan does not stop the timer task.
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

getOrCreateMQClientInstance:

/**
 * Returns the MQClientInstance registered in {@code factoryTable} for the client id built from
 * {@code clientConfig}, creating and registering a new instance when none is present.
 *
 * @param clientConfig configuration used to build the client id; a clone of it is passed to a new instance
 * @param rpcHook RPC hook passed to a newly created instance
 * @return the instance stored in {@code factoryTable} for the client id
 */
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    final String clientId = clientConfig.buildMQClientId();

    // Fast path: an instance for this client id is already registered.
    final MQClientInstance cached = this.factoryTable.get(clientId);
    if (cached != null) {
        return cached;
    }

    final MQClientInstance created = new MQClientInstance(
        clientConfig.cloneClientConfig(),
        this.factoryIndexGenerator.getAndIncrement(),
        clientId,
        rpcHook);

    // putIfAbsent returns the instance that was registered in the meantime, if any.
    final MQClientInstance existing = this.factoryTable.putIfAbsent(clientId, created);
    if (existing == null) {
        log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        return created;
    }

    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    return existing;
}

/**
 * Builds the MQ client id in the form {@code clientIP@instanceName}, followed by
 * {@code @unitName} when the unit name is not blank.
 *
 * @return the assembled client id
 */
public String buildMQClientId() {
    final StringBuilder clientId = new StringBuilder();
    clientId.append(this.getClientIP())
        .append("@")
        .append(this.getInstanceName());

    // The unit name is an optional third segment.
    final boolean hasUnitName = !UtilAll.isBlank(this.unitName);
    if (hasUnitName) {
        clientId.append("@").append(this.unitName);
    }

    return clientId.toString();
}


/**
 * Registers a producer under its group name in {@code producerTable}.
 *
 * @param group producer group name
 * @param producer producer implementation to register
 * @return {@code true} when the producer was registered; {@code false} when either argument is
 *         {@code null} or the group already has a producer
 */
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    final boolean missingArgument = (group == null) || (producer == null);
    if (missingArgument) {
        return false;
    }

    // putIfAbsent keeps an existing registration and returns it.
    final MQProducerInner existing = this.producerTable.putIfAbsent(group, producer);
    if (existing == null) {
        return true;
    }

    log.warn("the producer group[{}] exist already.", group);
    return false;
}

进行启动操作:

/**
 * Starts this client instance.
 *
 * <p>In state {@code CREATE_JUST} the following are started in order: the request-response channel,
 * the scheduled tasks, the pull message service, the rebalance service and the implementation of
 * this instance's {@code defaultMQProducer}. In state {@code START_FAILED} an exception is thrown;
 * any other state is a no-op.
 *
 * @throws MQClientException if a previous start of this instance has failed
 */
public void start() throws MQClientException {

    synchronized (this) {
        // Behaviour depends on the current service state.
        switch (this.serviceState) {
            case CREATE_JUST:
                // Mark as START_FAILED up front; the state only becomes RUNNING at the end of this branch.
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start the implementation of this instance's defaultMQProducer; startFactory is
                // false, so that call does not start the client factory again.
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                // Everything started: switch to RUNNING.
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

启动操作this.mQClientAPIImpl.start():

// Starts the API implementation by starting the underlying remoting client.
public void start() {
    this.remotingClient.start();
}

/**
 * Starts the Netty-based remoting client.
 *
 * <p>Creates the {@code DefaultEventExecutorGroup} (constructor
 * {@code DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory)}), configures the
 * client bootstrap and its channel pipeline, schedules the periodic response-table scan and, when a
 * channel event listener is set, starts the Netty event executor.
 */
@Override
public void start() {
    // Create the DefaultEventExecutorGroup that is passed to the pipeline below for running its handlers.
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);
            // Name each worker thread "NettyClientWorkerThread_<n>".
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    // Configure the client bootstrap: event loop group, channel type, socket options and pipeline.
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            // Builds the pipeline of every newly created channel.
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // With TLS enabled, put the SSL handler first; without an SSLContext only a warning is logged.
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                // Append encoder, decoder, idle-state detection, connection management and the client handler.
                pipeline.addLast(
                    defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyClientHandler());
            }
        });

    // Schedule the response-table scan: initial delay 1000 * 3, period 1000
    // (milliseconds, assuming this.timer is a java.util.Timer).
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                // Log and continue so that a failing scan does not stop the timer task.
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    // Start the Netty event executor only when a channel event listener is registered.
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

启动定时任务:

/**
 * Schedules the periodic background tasks of this client instance on
 * {@code scheduledExecutorService}: name server address fetching (only when no address is
 * configured), topic route refresh, offline-broker cleanup plus heartbeat, consumer offset
 * persistence and thread pool adjustment. Every task catches and logs its own exceptions.
 */
private void startScheduledTask() {
    // Only when no name server address is configured: fetch it after 10 s, then every 2 minutes.
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    // Ask the API implementation to fetch the name server address.
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // Refresh topic route info from the name server: first run after 10 ms,
    // then at the configured poll interval.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // Clean offline brokers and send heartbeats to all brokers: first run after 1 s,
    // then at the configured heartbeat interval.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // Persist all consumer offsets: first run after 10 s, then at the configured persist interval.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // Adjust the thread pool once per minute, starting after one minute.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

启动pull操作:

/**
 * Starts the service thread once. The {@code started} flag is flipped with a compare-and-set, so a
 * call made while the service is already started only logs and returns.
 */
public void start() {
    log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);

    // Only the caller that flips started from false to true creates the thread.
    final boolean firstStart = started.compareAndSet(false, true);
    if (firstStart) {
        stopped = false;
        final Thread serviceThread = new Thread(this, getServiceName());
        this.thread = serviceThread;
        serviceThread.setDaemon(isDaemon);
        serviceThread.start();
    }
}

启动rebalance操作:

/**
 * Starts the service thread once. The {@code started} flag is flipped with a compare-and-set, so a
 * call made while the service is already started only logs and returns.
 */
public void start() {
    log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);

    // Only the caller that flips started from false to true creates the thread.
    final boolean firstStart = started.compareAndSet(false, true);
    if (firstStart) {
        stopped = false;
        final Thread serviceThread = new Thread(this, getServiceName());
        this.thread = serviceThread;
        serviceThread.setDaemon(isDaemon);
        serviceThread.start();
    }
}

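启动push操作(源码注释中称为push服务,实际调用的是内部生产者的DefaultMQProducerImpl.start(false),此时startFactory为false,不会再次启动MQClientInstance):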

/**
 * Starts the producer implementation according to the current service state.
 *
 * <p>In {@code CREATE_JUST} the configuration is checked, the MQClientInstance is obtained and this
 * producer is registered with it; in {@code RUNNING}, {@code START_FAILED} and
 * {@code SHUTDOWN_ALREADY} an exception is thrown. When no exception is thrown, a heartbeat is sent
 * to all brokers and a timer task scanning expired requests is scheduled.
 *
 * @param startFactory whether {@code mQClientFactory.start()} is invoked as part of this start
 * @throws MQClientException if the producer group is already registered or the service state does
 *         not allow starting
 */
public void start(final boolean startFactory) throws MQClientException {
    // Possible service states handled below: CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED.
    switch (this.serviceState) {
        case CREATE_JUST:
            // Mark as START_FAILED up front; the state only becomes RUNNING at the end of this branch.
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            // Unless this is the client-inner producer group, switch the instance name to the PID.
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
           // Obtain (or create) the MQClientInstance for this producer's client config.
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // Register this producer with the MQClientInstance; a false result is reported as a duplicate group.
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // Roll the state back to CREATE_JUST before reporting the duplicate group.
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // Seed the publish-info table with an empty entry for the create-topic key.
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // Start the client factory only when the caller asked for it.
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        // Already running, previously failed or already shut down: starting is rejected.
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    // Send a heartbeat to all brokers.
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Schedule the scan for expired requests: initial delay 1000 * 3, period 1000
    // (milliseconds, assuming this.timer is a java.util.Timer).
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                // Log and continue so that a failing scan does not stop the timer task.
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

同时还有一个分发的操作traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()):

/**
 * Initialize asynchronous transfer data module
 *
 * @param nameSrvAddr name server address handed to the dispatcher
 * @param accessChannel access channel handed to the dispatcher
 * @throws MQClientException if the dispatcher cannot be started
 */
void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

/**
 * Starts the asynchronous trace dispatcher.
 *
 * <p>The trace producer is configured and started only by the first call that flips
 * {@code isStarted}; the remaining steps (storing the access channel, creating and starting the
 * daemon worker thread, registering the shutdown hook) run on every call.
 *
 * @param nameSrvAddr name server address set on the trace producer and used in its instance name
 * @param accessChannel access channel stored on this dispatcher
 * @throws MQClientException declared by this method; presumably raised by {@code traceProducer.start()}
 */
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    // compareAndSet ensures the trace producer is configured and started only once.
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    // Daemon worker thread running AsyncRunnable.
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

/**
 * Worker loop of the asynchronous trace dispatcher.
 *
 * <p>Each iteration collects up to {@code batchSize} trace contexts from {@code traceContextQueue}
 * and submits them as one {@code AsyncAppenderRequest} to {@code traceExecutor}. The loop ends once
 * an iteration collects nothing while the enclosing dispatcher is marked as stopped.
 */
 class AsyncRunnable implements Runnable {
        // Loop-exit flag of this runnable; set only from within run().
        private boolean stopped;

        @Override
        public void run() {
            while (!stopped) {
                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        //get trace data element from blocking Queue — traceContextQueue
                        // Wait at most 5 ms per element.
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // The interrupt is swallowed: context stays null, which ends this batch below.
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        // Nothing available within the timeout: stop filling the current batch.
                        break;
                    }
                }
                if (contexts.size() > 0) {
                    // Hand the collected batch to the trace executor.
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    traceExecutor.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    // Empty batch and the dispatcher has been stopped: leave the loop.
                    this.stopped = true;
                }
            }

        }
    }

3.启动完成之后,准备message

主题、tags、keys、flag、body、waitStoreMsgOK

/**
 * Creates a message from topic, tags and body. Delegates to the six-argument constructor with empty
 * keys, a flag of 0 and {@code waitStoreMsgOK} set to {@code true}.
 *
 * @param topic topic of the message
 * @param tags tags of the message
 * @param body message body
 */
public Message(String topic, String tags, byte[] body) {
    this(topic, tags, "", 0, body, true);
}

 //Message 信息:主题、tags、keys、flag、body、waitStoreMsgOK
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
    this.topic = topic;
    this.flag = flag;
    this.body = body;

    if (tags != null && tags.length() > 0)
        this.setTags(tags);

    if (keys != null && keys.length() > 0)
        this.setKeys(keys);

    this.setWaitStoreMsgOK(waitStoreMsgOK);
}