RocketMQ Source Code Walkthrough: Message Pulling

Date: 2019-06-12
I recently read through how RocketMQ implements pullMessage and am sharing my notes here.

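As is well known, a RocketMQ consumer obtains messages by pulling. The client maintains a pullRequestQueue, a blocking queue (LinkedBlockingQueue) whose elements are PullRequest objects. Each PullRequest represents one grouped unit of consumption.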

A PullRequest records the pull progress of one consumerGroup on a topic: the MessageQueue, the ProcessQueue, and the offset to pull next.

(Code snippet 1)
    public class PullRequest {
        private String consumerGroup;
        private MessageQueue messageQueue;
        private ProcessQueue processQueue;
        private long nextOffset;
        private boolean lockedFirst = false;
    }

MessageQueue holds the queue's metadata:

(Code snippet 2)
    public class MessageQueue implements Comparable<MessageQueue>, Serializable {
        private String topic;
        private String brokerName;
        private int queueId;
    }

ProcessQueue holds a snapshot of the message bodies fetched by a pull, together with bookkeeping for the related pull operations:

(Code snippet 3)
    public class ProcessQueue {
        private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
        private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
        private final AtomicLong msgCount = new AtomicLong();
        private final AtomicLong msgSize = new AtomicLong();
        private final Lock lockConsume = new ReentrantLock();
        private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
        private final AtomicLong tryUnlockTimes = new AtomicLong(0);
        private volatile long queueOffsetMax = 0L;
        private volatile boolean dropped = false;
        private volatile long lastPullTimestamp = System.currentTimeMillis();
        private volatile long lastConsumeTimestamp = System.currentTimeMillis();
        private volatile boolean locked = false;
        private volatile long lastLockTimestamp = System.currentTimeMillis();
        private volatile boolean consuming = false;
        private volatile long msgAccCnt = 0;
    }
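The core of ProcessQueue is a TreeMap keyed by queue offset and guarded by a read-write lock. Below is a minimal sketch of that idea, not RocketMQ's actual code: the class ProcessQueueSketch, its methods putMessage and removeMessages, and the use of String as the message type are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for ProcessQueue; messages are plain Strings.
class ProcessQueueSketch {
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private final AtomicLong msgCount = new AtomicLong();

    // Stores one pulled message under its queue offset; returns true if the offset was new.
    boolean putMessage(long queueOffset, String body) {
        lockTreeMap.writeLock().lock();
        try {
            boolean added = msgTreeMap.put(queueOffset, body) == null;
            if (added) {
                msgCount.incrementAndGet();
            }
            return added;
        } finally {
            lockTreeMap.writeLock().unlock();
        }
    }

    // Drops consumed offsets; returns the lowest offset still pending, or -1 if none is left.
    long removeMessages(List<Long> consumedOffsets) {
        lockTreeMap.writeLock().lock();
        try {
            for (Long offset : consumedOffsets) {
                if (msgTreeMap.remove(offset) != null) {
                    msgCount.decrementAndGet();
                }
            }
            return msgTreeMap.isEmpty() ? -1L : msgTreeMap.firstKey();
        } finally {
            lockTreeMap.writeLock().unlock();
        }
    }

    long getMsgCount() {
        return msgCount.get();
    }

    public static void main(String[] args) {
        ProcessQueueSketch pq = new ProcessQueueSketch();
        pq.putMessage(10L, "a");
        pq.putMessage(11L, "b");
        pq.putMessage(12L, "c");
        // Consuming offset 11 first still leaves offset 10 as the lowest pending one.
        System.out.println(pq.removeMessages(Arrays.asList(11L)));
        System.out.println(pq.getMsgCount());
    }
}
```

Because the map is sorted by offset, the lowest pending offset is always available through firstKey(), even when messages are consumed out of order.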

PullMessageService polls the pullRequestQueue and pulls messages for each request it takes:

(Code snippet 4)
    public class PullMessageService extends ServiceThread {
        private final InternalLogger log = ClientLogger.getLog();
        private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
        private final MQClientInstance mQClientFactory;

        // Excerpt: the constructor and pullMessage(PullRequest) are omitted here.

        @Override
        public void run() {
            while (!this.isStopped()) {
                try {
                    // Blocks until a PullRequest is available.
                    PullRequest pullRequest = this.pullRequestQueue.take();
                    this.pullMessage(pullRequest);
                } catch (InterruptedException ignored) {
                } catch (Exception e) {
                    log.error("Pull Message Service Run Method exception", e);
                }
            }
        }
    }
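The loop in run() is a plain blocking-queue consumer: take() parks the thread until a request arrives. A minimal sketch of the same pattern follows, assuming a hypothetical Request type that carries only a queue id and the next offset. PullLoopSketch and runOnce are illustrative names, not part of RocketMQ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class PullLoopSketch {
    // Hypothetical stand-in for PullRequest.
    static final class Request {
        final int queueId;
        final long nextOffset;

        Request(int queueId, long nextOffset) {
            this.queueId = queueId;
            this.nextOffset = nextOffset;
        }
    }

    // Feeds the requests to a single worker thread and returns what it handled, in order.
    static List<String> runOnce(List<Request> requests) {
        LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>(requests);
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(requests.size());

        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Request r = queue.take(); // blocks while the queue is empty
                    handled.add("q" + r.queueId + "@" + r.nextOffset);
                    done.countDown();
                } catch (InterruptedException e) {
                    return; // interrupted while idle: stop the loop
                }
            }
        });
        worker.start();

        try {
            done.await();      // wait until every request has been handled
            worker.interrupt(); // then stop the idle worker
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(runOnce(Arrays.asList(new Request(0, 0L), new Request(1, 100L))));
    }
}
```

With a single worker thread, requests are handled strictly in the order they were queued.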

Each pull request is sent together with a PullCallback, which handles the result once the pulled messages arrive:

(Code snippet 5)
    public interface PullCallback {
        void onSuccess(final PullResult pullResult);

        void onException(final Throwable e);
    }

I won't paste the implementation here; in essence it hands the messages over to the consumer thread pool for processing.
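To make "hands the messages to the consumer thread pool" concrete, here is a minimal sketch under stated assumptions: SimplePullCallback is a hypothetical interface that receives a List<String> instead of a PullResult, and the fixed pool of two threads is an arbitrary choice. It shows only the hand-off pattern, not RocketMQ's actual callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CallbackDispatchSketch {
    // Hypothetical simplification of PullCallback: a message list instead of PullResult.
    interface SimplePullCallback {
        void onSuccess(List<String> messages);

        void onException(Throwable e);
    }

    // Delivers the messages to the callback and returns the consumption results, sorted.
    static List<String> dispatch(List<String> pulledMessages) {
        ExecutorService consumeExecutor = Executors.newFixedThreadPool(2);
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());

        SimplePullCallback callback = new SimplePullCallback() {
            @Override
            public void onSuccess(List<String> messages) {
                // The callback itself does no business work; it only submits tasks.
                for (String m : messages) {
                    consumeExecutor.execute(() -> consumed.add("consumed:" + m));
                }
            }

            @Override
            public void onException(Throwable e) {
                consumed.add("error:" + e.getMessage());
            }
        };

        callback.onSuccess(pulledMessages);
        consumeExecutor.shutdown();
        try {
            if (!consumeExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consume tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<String> sorted = new ArrayList<>(consumed);
        Collections.sort(sorted); // pool threads may finish in any order
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(Arrays.asList("m1", "m2")));
    }
}
```

Keeping the callback this thin means the thread that delivers the response is freed quickly, while the actual consumption runs on the pool.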

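pullMessage supports two modes, synchronous and asynchronous. I'll walk through the asynchronous pull first, then the synchronous pull, and finally explain the difference between the two.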

(Code snippet 6)
    public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

        switch (communicationMode) {
            case ONEWAY:
                assert false;
                return null;
            case ASYNC:
                // The result is delivered later through pullCallback.
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            case SYNC:
                return this.pullMessageSync(addr, request, timeoutMillis);
            default:
                assert false;
                break;
        }

        return null;
    }

Asynchronous pull first.

As you can see, the PullCallback is passed in and wrapped in an InvokeCallback:

(Code snippet 7)
    private void pullMessageAsync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis,
        final PullCallback pullCallback
    ) throws RemotingException, InterruptedException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                        assert pullResult != null;
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        pullCallback.onException(e);
                    }
                } else {
                    // No response: tell a failed send, a timeout, and any other cause apart.
                    if (!responseFuture.isSendRequestOK()) {
                        pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                    } else if (responseFuture.isTimeout()) {
                        pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                            responseFuture.getCause()));
                    } else {
                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                    }
                }
            }
        });
    }

Next we move into the NettyRemotingAbstract class, which sends the request through a Netty Channel:

(Code snippet 8)
    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        // The send failed: complete the future with no response and run the callback.
                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }

                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }
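invokeAsyncImpl limits the number of in-flight async requests with semaphoreAsync and wraps it in a SemaphoreReleaseOnlyOnce, since release() can be reached from more than one path. A minimal sketch of a release-once wrapper follows, assuming an AtomicBoolean guard. ReleaseOnce and permitsAfterDoubleRelease are illustrative names, and this is not claimed to be RocketMQ's exact implementation.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class ReleaseOnceSketch {
    // Hypothetical wrapper: returns the permit at most once, however often release() is called.
    static final class ReleaseOnce {
        private final AtomicBoolean released = new AtomicBoolean(false);
        private final Semaphore semaphore;

        ReleaseOnce(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        void release() {
            // Only the first caller wins the compare-and-set and releases the permit.
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }

    // Acquires one permit, releases it twice through the wrapper, and reports the permits left.
    static int permitsAfterDoubleRelease(int permits) {
        Semaphore semaphore = new Semaphore(permits);
        if (!semaphore.tryAcquire()) {
            throw new IllegalStateException("no permit available");
        }
        ReleaseOnce once = new ReleaseOnce(semaphore);
        once.release();
        once.release(); // no effect: the permit was already returned
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterDoubleRelease(2));
    }
}
```

Without the guard, a second release() would push the semaphore above its initial permit count and silently loosen the flow control.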

This deserves a closer look:

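RemotingCommand is the carrier for both requests and responses. The method first reads opaque from the request; this is an auto-incrementing operation id. It then wraps opaque and the invokeCallback in a ResponseFuture and puts that into a map called responseTable. This is a central map: it associates each opaque with its ResponseFuture.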

    /**
     * This map caches all on-going requests.
     */
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

As the comment says, it caches the requests that are currently in flight.
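The pattern behind responseTable is request/response correlation by id: register a handler under a fresh opaque before sending, then look it up when a response carrying the same opaque comes back. Here is a minimal sketch, assuming a Consumer<String> in place of ResponseFuture. ResponseTableSketch, register and onResponse are illustrative names only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ResponseTableSketch {
    private final AtomicInteger nextOpaque = new AtomicInteger(0);
    // opaque -> handler of the matching response (stand-in for ResponseFuture)
    private final ConcurrentMap<Integer, Consumer<String>> responseTable =
        new ConcurrentHashMap<>(256);

    // Called before sending: allocates an opaque and remembers the handler under it.
    int register(Consumer<String> handler) {
        int opaque = nextOpaque.getAndIncrement();
        responseTable.put(opaque, handler);
        return opaque;
    }

    // Called when a response arrives; returns false if no request is waiting for this opaque.
    boolean onResponse(int opaque, String body) {
        Consumer<String> handler = responseTable.remove(opaque);
        if (handler == null) {
            return false;
        }
        handler.accept(body);
        return true;
    }

    int inFlight() {
        return responseTable.size();
    }

    public static void main(String[] args) {
        ResponseTableSketch table = new ResponseTableSketch();
        int first = table.register(body -> System.out.println("first got " + body));
        int second = table.register(body -> System.out.println("second got " + body));
        // Responses may arrive in any order; the opaque routes each one to its handler.
        table.onResponse(second, "B");
        table.onResponse(first, "A");
        System.out.println(table.inFlight());
    }
}
```

The sketch uses a single remove() so that a duplicate response cannot trigger the handler twice; that is a design choice of the sketch.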

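Back in (Code snippet 8): channel.writeAndFlush(request).addListener(new ChannelFutureListener(){...}). Once writeAndFlush completes, Netty calls the operationComplete method of our ChannelFutureListener. If the send succeeded, it calls responseFuture.setSendRequestOK(true) and returns. If the send failed, the ResponseFuture is removed from responseTable and its InvokeCallback is run asynchronously through executeInvokeCallback. In (Code snippet 7) you can see that when responseFuture.isSendRequestOK() is false, onException is invoked, so I won't go into more detail here.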

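At this point the sending logic is complete. Apart from the bounded wait in semaphoreAsync.tryAcquire, nothing in this path blocks. When the Broker receives the pull request, it assembles the result according to queueOffset and related information and returns it to the consumer, where processResponseCommand handles it: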

    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }
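processResponseCommand branches on whether an InvokeCallback was registered: with a callback it calls executeInvokeCallback, without one it calls putResponse. The following is a minimal sketch of how a putResponse-style branch can wake a blocked caller. It assumes the waiting side parks on a CountDownLatch; BlockingFutureSketch and its String payload are assumptions for illustration, not RocketMQ's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical future for the no-callback case: the caller blocks until a response is put.
class BlockingFutureSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    // Called by the I/O side when the response arrives; wakes the waiting caller.
    void putResponse(String response) {
        this.response = response;
        latch.countDown();
    }

    // Called by the requesting thread; returns null if nothing arrived within the timeout.
    String waitResponse(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return response;
    }

    // Simulates a response delivered from another thread while the caller waits.
    static String demo() {
        BlockingFutureSketch future = new BlockingFutureSketch();
        Thread ioThread = new Thread(() -> future.putResponse("FOUND"));
        ioThread.start();
        return future.waitResponse(3000);
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // No one ever puts a response here, so the wait times out and yields null.
        System.out.println(new BlockingFutureSketch().waitResponse(50));
    }
}
```

In the callback branch no thread waits at all. In a latch-based branch like this one, the requesting thread is parked until the response or the timeout arrives.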

Original article: https://www.cnblogs.com/cishengchongyan/p/11009775.html