20、RocketMQ源码分析:DefaultMQPushConsumer处理Broker的拉取消息响应源码
此前我们学习了Consumer如何发起的拉取消息请求
,以及Broker如何处理拉取消息请求
,现在我们来学习Consumer如何处理Broker的拉取消息响应的源码。
文章目录
- 1 客户端异步请求回调
- 1.1.1.1. processPullResponse解析响应
- 2 PullCallback回调
- 2.1 processPullResult处理拉取结果
- 2.2 executePullRequestImmediately再次拉取消息
- 2.3 putMessage存放消息
- 2.4 消息的两次过滤
- 3 总结
1 客户端异步请求回调
此前我们讲过在consumer发起拉取消息请求
的时候,通过ASYNC模式异步的进行拉取,并且InvokeCallback#operationComplete方法将会在得到结果之后进行回调,内部调用*
*pullCallback**的回调方法。
在回调方法中,如果解析到了响应结果,那么调用pullCallback#onSuccess方法处理,否则调用pullCallback#onException方法处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
|
private void pullMessageAsync( final String addr, final RemotingCommand request, final long timeoutMillis, final PullCallback pullCallback ) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr); assert pullResult != null; pullCallback.onSuccess(pullResult); } catch (Exception e) { pullCallback.onException(e); } } else { if (!responseFuture.isSendRequestOK()) { pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); } else if (responseFuture.isTimeout()) { pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause())); } else { pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); } } } }); }
|
1.1.1.1. processPullResponse解析响应
该方法处理response获取PullResult,根据响应的数据创建PullResultExt对象返回,注意此时拉取到的消息还是一个字节数组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
private PullResult processPullResponse( final RemotingCommand response, final String addr) throws MQBrokerException, RemotingCommandException { PullStatus pullStatus = PullStatus.NO_NEW_MSG; switch (response.getCode()) { case ResponseCode.SUCCESS: pullStatus = PullStatus.FOUND; break; case ResponseCode.PULL_NOT_FOUND: pullStatus = PullStatus.NO_NEW_MSG; break; case ResponseCode.PULL_RETRY_IMMEDIATELY: pullStatus = PullStatus.NO_MATCHED_MSG; break; case ResponseCode.PULL_OFFSET_MOVED: pullStatus = PullStatus.OFFSET_ILLEGAL; break;
default: throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); }
|
2 PullCallback回调
在processPullResponse处理response之后,会调用此前DefaultMQPushConsumerImpl#pullMessage方法中创建的PullCallback消息拉取的回调函数,执行onSuccess回调方法。如果解析过程中抛出异常,则调用onException方法。
onSuccess回调方法的大概逻辑为:
1、 调用processPullResult方法处理pullResult,进行消息解码、过滤以及设置其他属性的操作,返回pullResult;
2、 如果没有拉取到消息
,那么设置下一次拉取的起始offset到PullRequest中,调用executePullRequestImmediately方法立即将拉取请求再次放入PullMessageService的pullRequestQueue中,PullMessageService是一个线程服务,PullMessageService将会循环的获取pullRequestQueue中的pullRequest然后向broker发起新的拉取消息请求,
进行下次消息的拉取;
3、 如果拉取到了消息,将拉取到的所有消息,存入对应的processQueue处理队列内部的msgTreeMap中,等待被异步的消费;
4、 通过consumeMessageService将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor
线程池消费消息,consumeMessageService有**ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费
**两种实现;
5、
获取配置的消息拉取间隔,默认为0,如果大于0则调用executePullRequestLater方法,等待间隔时间后将拉取请求再次放入pullRequestQueue中,否则立即调用executePullRequestImmediately放入pullRequestQueue中,进行下次消息的拉取;
如果是onException方法,那么延迟3s将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } }
if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); }
break; case NO_NEW_MSG: case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } }
@Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); }
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } };
|
2.1 processPullResult处理拉取结果
处理pullResult,进行消息解码、过滤以及设置其他属性的操作。
1、 更新下次拉取建议的brokerId,下次拉取消息时从pullFromWhichNodeTable中直接取出;
2、 对消息二进制字节数组进行解码转换为java的List消息集合;
- 如果存在tag,并且不是classFilterMode,那么按照tag过滤消息,这就是客户端的消息过滤。这采用String#equals方法过滤,而broker端则是比较的tagHash值,即hashCode。
4、 如果有消息过滤钩子,那么执行钩子方法,这里可以扩展自定义的消息过滤的逻辑;
5、 遍历过滤通过的消息,设置属性例如事务id,最大、最小偏移量、brokerName;
6、 将过滤后的消息存入msgFoundList集合;
7、 因为消息已经被解析了,那么设置消息的字节数组为null,释放内存;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) { PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); for (MessageExt msg : msgList) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } }
if (this.hasHook()) { FilterMessageContext filterMessageContext = new FilterMessageContext(); filterMessageContext.setUnitMode(unitMode); filterMessageContext.setMsgList(msgListFilterAgain); this.executeHook(filterMessageContext); }
for (MessageExt msg : msgListFilterAgain) { String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(traFlag)) { msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); } MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, Long.toString(pullResult.getMinOffset())); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, Long.toString(pullResult.getMaxOffset())); msg.setBrokerName(mq.getBrokerName()); } pullResultExt.setMsgFoundList(msgListFilterAgain); } pullResultExt.setMessageBinary(null);
return pullResult; }
public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) { AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); if (null == suggest) { this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId)); } else { suggest.set(brokerId); } }
|
将拉取请求再次放入PullMessageService的pullRequestQueue中,PullMessageService是一个线程服务。PullMessageService将会循环的获取pullRequestQueue中的pullRequest然后向broker发起新的拉取消息请求,进行下次消息的拉取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
public void executePullRequestImmediately(final PullRequest pullRequest) { this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); }
public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
|
2.3 putMessage存放消息
该方法将拉取到的所有消息,存入对应的processQueue处理队列内部的msgTreeMap中。
返回是否需要分发消费dispatchToConsume,当当前processQueue的内部的msgTreeMap中有消息并且consuming=false,即还没有开始消费时,将会返回true。
dispatchToConsume对并发消费无影响,只对顺序消费有影响。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
public boolean putMessage(final List<MessageExt> msgs) { boolean dispatchToConsume = false; try { this.treeMapLock.writeLock().lockInterruptibly(); try { int validMsgCnt = 0; for (MessageExt msg : msgs) { MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg); if (null == old) { validMsgCnt++; this.queueOffsetMax = msg.getQueueOffset(); msgSize.addAndGet(msg.getBody().length); } } msgCount.addAndGet(validMsgCnt); if (!msgTreeMap.isEmpty() && !this.consuming) { dispatchToConsume = true; this.consuming = true; } if (!msgs.isEmpty()) { MessageExt messageExt = msgs.get(msgs.size() - 1); String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET); if (property != null) { long accTotal = Long.parseLong(property) - messageExt.getQueueOffset(); if (accTotal > 0) { this.msgAccCnt = accTotal; } } } } finally { this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("putMessage exception", e); }
return dispatchToConsume; }
|
2.4 消息的两次过滤
- *通过前面的源码可以看到,消息实际上经过了两次过滤,一次是在broekr中,一次是拉取到consumer之后,为什么经过两次过滤呢?因为broker中的过滤是比较的hashCode值,而hashCode存在哈希碰撞的可能,因此hashCode对比相等之后,还需要在consumer端进行equals的比较,再过滤一次。
**
为*
*什么服务端不直接进行equals过滤呢?因为tag的长度是不固定的,而通过hash算法可以生成固定长度的hashCode值,这样才能保证每个consumequeue索引条目的长度一致。而tag的真正值保存在commitLog的消息体中,虽然broker最终会回去到commitLog中的消息并返回,但是获取的获取一段消息字节数组,并没有进行反序列化为Message对象,因此无法获取真实值,而在consumer端一定会做反序列化操作的,因此tag的equals比较放在了consumer端。
**
3 总结
本次我们来学习Consumer如何处理Broker的拉取消息响应的源码。入口就是MQClientAPIImpl#pullMessageAsync方法内部的回调函数InvokeCallback#operationComplete方法。
1、 在这个方法中,首先进行消息的解码以及第二次过滤,然后将消息存入对应的processQueue处理队列内部的msgTreeMap中;
- 然后通过consumeMessageService#submitConsumeRequest方法将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池的消费消息。
1、 consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现;
3、 最后是再次发起消息拉取请求;
**下一篇文章,我们将会去看看ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现如何消费消息。
**
__END__