19. RocketMQ Source Code Analysis: How the Broker Handles Pull Requests from DefaultMQPushConsumer [10,000 words]

In the previous article, RocketMQ Source Code (18) — How DefaultMQPushConsumer Initiates a Pull Request, we saw that when the consumer sends a pull request, the request code is PULL_MESSAGE. When the broker receives a request, it dispatches it to a different processor according to its code; PULL_MESSAGE requests are handled by PullMessageProcessor.
Let's now walk through the broker-side source for handling the pull request initiated by DefaultMQPushConsumer.
1 PullMessageProcessor Handles the Pull Request

PullMessageProcessor is the broker's processor for pull-message requests. It is instantiated along with BrokerController, and the registerProcessor method then binds it to requests whose code is PULL_MESSAGE.
Later, processRequestCommand picks the netty processor that matches the request code and invokes it via asyncProcessRequest.

asyncProcessRequest in turn calls processRequest, which every processor implements; for PullMessageProcessor this is the entry point for handling PULL_MESSAGE pull requests.
```java
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    return this.processRequest(ctx.channel(), request, true);
}
```
2 processRequest Handles the Pull Request

This method handles the pull request end to end: it builds the filter information, fetches the messages, processes the pull result (deciding whether to return a response immediately or to suspend the request), and commits the consume offset. The source is long and not easy to follow. The rough steps are (see the comments in the source for details):
1. Build the filter information. For TAG filtering a subscriptionData is obtained; for class filtering a consumerFilterData is obtained as well; a MessageFilter is then built from them.
   1. If the subscription flag is set (hasSubscriptionFlag = true), subscriptionData and consumerFilterData are rebuilt on every pull instead of taken from the broker's cache.
   2. hasSubscriptionFlag is usually false; it is true only when the consumer sets postSubscriptionWhenPull = true and the subscription is not in classFilter mode. (The sysFlag bits involved here are sketched right after this step list.)
2. Fetch the messages via DefaultMessageStore#getMessage and filter them. This is the core pull method; it looks up ConsumeQueue and CommitLog file data and returns a GetMessageResult.

3. Process the GetMessageResult and populate the response:
   1. Decide whether to suggest the MASTER or a SLAVE broker for the next pull:
      1. If the GetMessageResult returned by getMessage has suggestPullingFromSlave = true, set responseHeader.suggestWhichBrokerId to 1, suggesting the next pull go to a SLAVE; otherwise set it to 0, suggesting the MASTER.
      2. Check the broker role: if this broker is a SLAVE and slaveReadEnable = false, set suggestWhichBrokerId to 0, i.e. pull from the MASTER next time.
      3. If slaveReadEnable = true and consumption is lagging badly, redirect the next pull to the broker whose id is subscriptionGroupConfig.whichBrokerWhenConsumeSlowly (default 1, a SLAVE); otherwise use subscriptionGroupConfig.brokerId (default 0, the MASTER). If slaveReadEnable = false, suggest the MASTER.
   2. Map the GetMessageResult status to the corresponding response code.
   3. If consume-message hooks are registered, execute their consumeMessageBefore methods.
   4. Depending on the response code, return the data directly or fall back to short/long polling:
      1. If the pull succeeded, update some statistics, read the messages out of the buffers as a byte array, and put it into the response body.
      2. If no message was read, and the broker allows suspending requests and the client supports suspension, the broker holds the request for a while; if a message arrives in the meantime, the request is woken, pulls again, and returns.
         1. Compute the maximum hold time: with long polling enabled the default maximum is 15s; otherwise short polling is used and the maximum is 1s.
         2. Build a PullRequest and submit it via pullRequestHoldService#suspendPullRequest; the request is held and handled asynchronously.
      3. If the requested offset is out of range (too large or too small), publish an offset-moved event.

4. After the pull completes, whether or not any message was fetched, as long as the broker allows suspending requests (true for a fresh pull request, false for one that has already been suspended), the consumer supports committing its consume offset, and this broker is not a SLAVE, the consume offset is committed via ConsumerOffsetManager#commitOffset.
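The three sysFlag bits referenced above (suspend, commit-offset, subscription) are plain bit flags on the request header. Below is a minimal sketch modeled on org.apache.rocketmq.common.sysflag.PullSysFlag; the bit positions are taken from the 4.x source, so treat them as assumptions rather than a contract:

```java
// Sketch of the pull-request sysFlag bits (positions assumed from RocketMQ 4.x).
public final class PullSysFlagSketch {
    private static final int FLAG_COMMIT_OFFSET = 0x1;      // consumer piggybacks its consume offset
    private static final int FLAG_SUSPEND       = 0x1 << 1; // broker may hold the request (long polling)
    private static final int FLAG_SUBSCRIPTION  = 0x1 << 2; // subscription is carried in the request itself

    public static boolean hasCommitOffsetFlag(int sysFlag) { return (sysFlag & FLAG_COMMIT_OFFSET) != 0; }
    public static boolean hasSuspendFlag(int sysFlag)      { return (sysFlag & FLAG_SUSPEND) != 0; }
    public static boolean hasSubscriptionFlag(int sysFlag) { return (sysFlag & FLAG_SUBSCRIPTION) != 0; }
}
```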
The full source follows; comments mark the major steps described above:

```java
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    final long beginTimeMills = this.brokerController.getMessageStore().now();
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    response.setOpaque(request.getOpaque());

    log.debug("receive PullMessage request command, {}", request);

    // broker readable-permission check
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] pulling message is forbidden",
            this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }

    // subscription group config check
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s",
            requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    }

    if (!subscriptionGroupConfig.isConsumeEnable()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    }

    // parse the sysFlag bits
    final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
    final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());

    final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

    // topic config check
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(),
            RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s",
            requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    }

    if (!PermName.isReadable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
        return response;
    }

    // queueId range check
    if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
            requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    }

    // build the filter data: from the request itself if hasSubscriptionFlag,
    // otherwise from the broker-side cached consumer group info
    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    if (hasSubscriptionFlag) {
        try {
            subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(),
                requestHeader.getExpressionType());
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
                    requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion());
                assert consumerFilterData != null;
            }
        } catch (Exception e) {
            log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        }
    } else {
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (null == consumerGroupInfo) {
            log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }

        if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
            && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
            return response;
        }

        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
        if (null == subscriptionData) {
            log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(),
                requestHeader.getTopic());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        }

        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
            log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                subscriptionData.getSubString());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
            response.setRemark("the consumer's subscription not latest");
            return response;
        }

        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                requestHeader.getConsumerGroup());
            if (consumerFilterData == null) {
                response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                return response;
            }
            if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
                log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(),
                    requestHeader.getSubVersion());
                response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                response.setRemark("the consumer's consumer filter data not latest");
                return response;
            }
        }
    }

    if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
        && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
        return response;
    }

    // build the MessageFilter
    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }

    // the core pull: look up ConsumeQueue and CommitLog data
    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    if (getMessageResult != null) {
        response.setRemark(getMessageResult.getStatus().name());
        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
        responseHeader.setMinOffset(getMessageResult.getMinOffset());
        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

        // suggest which broker to pull from next time
        if (getMessageResult.isSuggestPullingFromSlave()) {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                break;
            case SLAVE:
                if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
                break;
        }

        if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
            }
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        // map the store status to a response code
        switch (getMessageResult.getStatus()) {
            case FOUND:
                response.setCode(ResponseCode.SUCCESS);
                break;
            case MESSAGE_WAS_REMOVING:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case NO_MATCHED_LOGIC_QUEUE:
            case NO_MESSAGE_IN_QUEUE:
                if (0 != requestHeader.getQueueOffset()) {
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                        requestHeader.getQueueOffset(), getMessageResult.getNextBeginOffset(), requestHeader.getTopic(),
                        requestHeader.getQueueId(), requestHeader.getConsumerGroup());
                } else {
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                }
                break;
            case NO_MATCHED_MESSAGE:
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                break;
            case OFFSET_FOUND_NULL:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_OVERFLOW_BADLY:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
                    requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
                break;
            case OFFSET_OVERFLOW_ONE:
                response.setCode(ResponseCode.PULL_NOT_FOUND);
                break;
            case OFFSET_TOO_SMALL:
                response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                    getMessageResult.getMinOffset(), channel.remoteAddress());
                break;
            default:
                assert false;
                break;
        }

        // execute consumeMessageBefore hooks
        if (this.hasConsumeMessageHook()) {
            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setConsumerGroup(requestHeader.getConsumerGroup());
            context.setTopic(requestHeader.getTopic());
            context.setQueueId(requestHeader.getQueueId());

            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                    int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;

                    context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                    context.setCommercialRcvTimes(incValue);
                    context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                    context.setCommercialOwner(owner);
                    break;
                case ResponseCode.PULL_NOT_FOUND:
                    if (!brokerAllowSuspend) {
                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        context.setCommercialRcvTimes(1);
                        context.setCommercialOwner(owner);
                    }
                    break;
                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                case ResponseCode.PULL_OFFSET_MOVED:
                    context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                    context.setCommercialRcvTimes(1);
                    context.setCommercialOwner(owner);
                    break;
                default:
                    assert false;
                    break;
            }

            this.executeConsumeMessageHookBefore(context);
        }

        // return the data directly, or suspend the request, depending on the response code
        switch (response.getCode()) {
            case ResponseCode.SUCCESS:
                this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(), getMessageResult.getMessageCount());
                this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(), getMessageResult.getBufferTotalSize());
                this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId());
                    this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId(),
                        (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                    response.setBody(r);
                } else {
                    // zero-copy transfer straight from the page cache
                    try {
                        FileRegion fileRegion =
                            new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                getMessageResult.release();
                                if (!future.isSuccess()) {
                                    log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                                }
                            }
                        });
                    } catch (Throwable e) {
                        log.error("transfer many message by pagecache exception", e);
                        getMessageResult.release();
                    }

                    response = null;
                }
                break;
            case ResponseCode.PULL_NOT_FOUND:
                // no message: hold the request (long/short polling) if allowed
                if (brokerAllowSuspend && hasSuspendFlag) {
                    long pollingTimeMills = suspendTimeoutMillisLong;
                    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                    }

                    String topic = requestHeader.getTopic();
                    long offset = requestHeader.getQueueOffset();
                    int queueId = requestHeader.getQueueId();
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    response = null;
                    break;
                }
                // deliberate fall-through when the request is not suspended
            case ResponseCode.PULL_RETRY_IMMEDIATELY:
                break;
            case ResponseCode.PULL_OFFSET_MOVED:
                if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                    || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                    MessageQueue mq = new MessageQueue();
                    mq.setTopic(requestHeader.getTopic());
                    mq.setQueueId(requestHeader.getQueueId());
                    mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

                    OffsetMovedEvent event = new OffsetMovedEvent();
                    event.setConsumerGroup(requestHeader.getConsumerGroup());
                    event.setMessageQueue(mq);
                    event.setOffsetRequest(requestHeader.getQueueOffset());
                    event.setOffsetNew(getMessageResult.getNextBeginOffset());
                    this.generateOffsetMovedEvent(event);
                    log.warn("PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(),
                        event.getOffsetNew(), responseHeader.getSuggestWhichBrokerId());
                } else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                        responseHeader.getSuggestWhichBrokerId());
                }
                break;
            default:
                assert false;
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store getMessage return null");
    }

    // commit the consume offset piggybacked on the pull request
    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
            requestHeader.getCommitOffset());
    }
    return response;
}
```
2.1 getMessage: Fetching a Batch of Messages

Starting from the given offset, this method queries at most maxMsgNums messages (32 by default) belonging to topic from the queue queueId. The fetched messages are then further screened with the supplied messageFilter.
It returns a GetMessageResult holding the raw serialized byte buffers of the fetched messages rather than deserialized Message objects. When the consumer receives the response, it deserializes the bytes into Message objects itself, following the CommitLog serialization format. The broker skips deserialization because it would be pointless: the data would have to be re-serialized for transport anyway, so returning raw bytes is more efficient.
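For reference, this is roughly what the consumer does with the body bytes. A minimal sketch using MessageDecoder from rocketmq-common; the class and variable names here are illustrative, and the response-handling plumbing is omitted:

```java
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;

public class PullBodyDecodeSketch {
    // body: the raw byte[] taken from the pull response
    public static List<MessageExt> decodeBody(byte[] body) {
        ByteBuffer buffer = ByteBuffer.wrap(body);
        // MessageDecoder walks the buffer message by message, following the
        // CommitLog serialization layout (totalSize, magic code, body, properties, ...)
        return MessageDecoder.decodes(buffer);
    }
}
```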
This is the core broker-side method where messages are actually fetched. The rough steps are as follows; the flow is long, so the comments in the source below are worth reading:
1. Initialize the variables used later: the pull start time beginTime, the pull status status, the starting logical ConsumeQueue offset of the next pull nextBeginOffset, the current maximum physical offset of the CommitLog maxOffsetPy, and so on.

2. Call findConsumeQueue to locate the ConsumeQueue to read for this topic and queue id (we covered this method in the part on building ConsumeQueue and IndexFile).

3. Validate the offset. First obtain the ConsumeQueue's minimum and maximum logical offsets, minOffset and maxOffset; for an abnormal offset, correct the starting offset of the next pull:
   1. If maxOffset == 0, the queue has no messages: set NO_MESSAGE_IN_QUEUE. The corrected next-pull offset is 0 if the broker is not a SLAVE, or is a SLAVE with offset checking enabled; otherwise the consumer's offset is kept.
   2. If the consumer's offset is less than minOffset, the requested position is too small: set OFFSET_TOO_SMALL; under the same role condition the corrected offset is minOffset.
   3. If the offset equals maxOffset, the position overflows by one: set OFFSET_OVERFLOW_ONE; the next-pull offset remains offset.
   4. If the offset is greater than maxOffset, the position overflows badly: set OFFSET_OVERFLOW_BADLY. If minOffset == 0, the corrected offset (under the same role condition) is minOffset; otherwise it is maxOffset.
   5. In all of the above cases, no messages are pulled.

4. If minOffset <= offset < maxOffset, the offset is in the normal range and the pull proceeds.

5. Call consumeQueue#getIndexBuffer: it maps the logical offset to a physical offset and slices out the buffer from that position onwards, containing the index entry of the message to pull plus all the index data after it in the ConsumeQueue file. The slice may cover multiple index entries, since multiple messages are pulled per batch and filtering may skip some.

6. If a buffer slice was obtained, walk its index entries and look up the message data in the CommitLog. Otherwise we have probably reached the end of the current ConsumeQueue file: nextBeginOffset is set to the starting offset of the next ConsumeQueue file and nothing is pulled this time.
   1. Before the actual pull, initialize a few more variables: nextPhyFileStartOffset, the starting physical offset of the next CommitLog file (Long.MIN_VALUE initially), and maxPhyOffsetPulling, the largest physical offset pulled in this round.
   2. Compute the maximum number of index bytes to filter per pull: normally 16000 / 20 = 800 entries, i.e. one pull request examines at most 800 message index entries.
   3. Loop over the sliced buffer, handling one ConsumeQueue index entry per iteration; each entry is a fixed 20 bytes, so the cursor advances 20 B per loop (a short parsing sketch follows this list):
      1. Read the entry fields: offsetPy, the message's physical offset in the CommitLog; sizePy, the message size; and tagsCode, the hashCode of the tags the producer set when sending (for delayed messages it is the delivery timestamp, covered later).
      2. Update maxPhyOffsetPulling to the current message's physical offset.
      3. If nextPhyFileStartOffset is not Long.MIN_VALUE and offsetPy < nextPhyFileStartOffset, we have rolled to the next CommitLog file and the current offset is below that file's minimum offset, so skip this entry.
      4. Call checkInDiskByCommitOffset to check whether the message to pull is already on disk.
      5. Call isTheBatchFull to decide whether this pull has reached its limits; if so, break out of the loop and stop pulling.
      6. Call messageFilter#isMatchedByConsumeQueue to run tagsCode filtering. tagsCode is stored in the ConsumeQueue entry, so broker-side TAG filtering can run on the entry alone. If the entry does not match, skip it and move on to the next one.
      7. If the TAG check passes, call commitLog#getMessage to fetch the actual message buffer selectResult by physical offset and size. If nothing is found, the offset probably reached the end of the file and the message lives in the next CommitLog file: set nextPhyFileStartOffset to that file's starting physical offset and skip this entry.
      8. With the message in hand, run SQL92 filtering via messageFilter#isMatchedByCommitLog. SQL92 filtering depends on message properties, and the message body lives in the CommitLog, so the message must be fetched before this filter can run. If it does not match, release the buffer segment and skip this entry.
      9. If both the TAG and SQL92 checks pass, store the message into getResult via getResult#addMessage; note that what is stored is a buffer segment, i.e. raw bytes.
      10. Set status to GetMessageStatus.FOUND, reset nextPhyFileStartOffset to Long.MIN_VALUE, and continue the loop.

7. After the loop, compute the starting ConsumeQueue offset of the next pull: nextBeginOffset = the offset at the start of this pull + the offset distance moved in this pull.

8. Decide whether to suggest pulling from a SLAVE broker next time: if the number of CommitLog bytes still left to pull exceeds the broker's maximum usable physical memory (40% of physical memory by default), set suggestPullingFromSlave = true, since the backlog is too large.

9. Update some counters in storeStatsService for console display.

10. Populate getResult and return it: set the pull status, the next pull's starting logical offset nextBeginOffset, and the ConsumeQueue's minimum and maximum logical offsets minOffset and maxOffset.
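As referenced in step 6.3 above, each ConsumeQueue store unit is a fixed 20-byte record: an 8-byte CommitLog physical offset, a 4-byte message size, and an 8-byte tagsCode. A self-contained sketch of parsing one unit (the record layout matches the ConsumeQueue format; the class name and sample values are made up):

```java
import java.nio.ByteBuffer;

public class ConsumeQueueUnitSketch {
    // same value as ConsumeQueue.CQ_STORE_UNIT_SIZE
    public static final int CQ_STORE_UNIT_SIZE = 20;

    public static void main(String[] args) {
        // fabricate one entry: offsetPy = 4096, sizePy = 180, tagsCode = "TagA".hashCode()
        ByteBuffer unit = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        unit.putLong(4096L).putInt(180).putLong("TagA".hashCode());
        unit.flip();

        long offsetPy = unit.getLong(); // physical offset in the CommitLog (8 bytes)
        int sizePy = unit.getInt();     // total message size (4 bytes)
        long tagsCode = unit.getLong(); // tags hash code (8 bytes)
        System.out.printf("offsetPy=%d sizePy=%d tagsCode=%d%n", offsetPy, sizePy, tagsCode);
    }
}
```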
The full source:

```java
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums, final MessageFilter messageFilter) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so getMessage is forbidden");
        return null;
    }

    if (!this.runningFlags.isReadable()) {
        log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
        return null;
    }

    if (MixAll.isLmq(topic) && this.isLmqConsumeQueueNumExceeded()) {
        log.warn("message store is not available, broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num");
        return null;
    }

    long beginTime = this.getSystemClock().now();

    GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    long nextBeginOffset = offset;
    long minOffset = 0;
    long maxOffset = 0;

    GetMessageResult getResult = null;

    // current maximum physical offset of the CommitLog
    final long maxOffsetPy = this.commitLog.getMaxOffset();

    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();

        // offset validation and correction
        if (maxOffset == 0) {
            status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        } else if (offset < minOffset) {
            status = GetMessageStatus.OFFSET_TOO_SMALL;
            nextBeginOffset = nextOffsetCorrection(offset, minOffset);
        } else if (offset == maxOffset) {
            status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
            nextBeginOffset = nextOffsetCorrection(offset, offset);
        } else if (offset > maxOffset) {
            status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
            if (0 == minOffset) {
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else {
                nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
            }
        } else {
            // offset is in the normal range: slice the index buffer starting at offset
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
            if (bufferConsumeQueue != null) {
                try {
                    status = GetMessageStatus.NO_MATCHED_MESSAGE;

                    long nextPhyFileStartOffset = Long.MIN_VALUE;
                    long maxPhyOffsetPulling = 0;

                    int i = 0;
                    // at most 16000 / 20 = 800 index entries are examined per pull
                    final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();

                    getResult = new GetMessageResult(maxMsgNums);

                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        // one 20-byte ConsumeQueue entry: offsetPy(8) + sizePy(4) + tagsCode(8)
                        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                        long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                        maxPhyOffsetPulling = offsetPy;

                        // skip entries that belong to a CommitLog file we have rolled past
                        if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                            if (offsetPy < nextPhyFileStartOffset)
                                continue;
                        }

                        boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                        if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(),
                            getResult.getMessageCount(), isInDisk)) {
                            break;
                        }

                        boolean extRet = false, isTagsCodeLegal = true;
                        if (consumeQueue.isExtAddr(tagsCode)) {
                            extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                            if (extRet) {
                                tagsCode = cqExtUnit.getTagsCode();
                            } else {
                                log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                    tagsCode, offsetPy, sizePy, topic, group);
                                isTagsCodeLegal = false;
                            }
                        }

                        // broker-side TAG filtering against the ConsumeQueue entry
                        if (messageFilter != null
                            && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                            }
                            continue;
                        }

                        // fetch the raw message bytes from the CommitLog
                        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                        if (null == selectResult) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                            }

                            nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                            continue;
                        }

                        // broker-side SQL92 filtering against the message properties
                        if (messageFilter != null
                            && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                            }
                            // release the segment of an unmatched message
                            selectResult.release();
                            continue;
                        }

                        this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
                        getResult.addMessage(selectResult);
                        status = GetMessageStatus.FOUND;
                        nextPhyFileStartOffset = Long.MIN_VALUE;
                    }

                    if (diskFallRecorded) {
                        long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                        brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                    }

                    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    // suggest pulling from a SLAVE when the backlog exceeds 40% of physical memory
                    long diff = maxOffsetPy - maxPhyOffsetPulling;
                    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                    getResult.setSuggestPullingFromSlave(diff > memory);
                } finally {
                    bufferConsumeQueue.release();
                }
            } else {
                status = GetMessageStatus.OFFSET_FOUND_NULL;
                nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset
                    + " maxOffset: " + maxOffset + ", but access logic queue failed.");
            }
        }
    } else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }

    if (GetMessageStatus.FOUND == status) {
        this.storeStatsService.getGetMessageTimesTotalFound().add(1);
    } else {
        this.storeStatsService.getGetMessageTimesTotalMiss().add(1);
    }
    long elapsedTime = this.getSystemClock().now() - beginTime;
    this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);

    if (getResult == null) {
        getResult = new GetMessageResult(0);
    }

    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    return getResult;
}
```
2.1.1 nextOffsetCorrection: Correcting the Next Pull Offset

If the broker is not a SLAVE, or it is a SLAVE but offset checking on the slave is enabled (offsetCheckInSlave), return the offset chosen by the broker; otherwise keep returning the offset passed by the consumer.
```java
private long nextOffsetCorrection(long oldOffset, long newOffset) {
    long nextOffset = oldOffset;
    // only correct the offset on a master, or on a slave with offsetCheckInSlave enabled
    if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
        || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
        nextOffset = newOffset;
    }
    return nextOffset;
}
```
2.1.2 getIndexBuffer: Slicing the Index Buffer

The consumer's offset is greater than or equal to minOffset and less than maxOffset, i.e. within the normal range.

The logical offset is mapped to a physical offset, which locates the MappedFile of the owning ConsumeQueue file; from that MappedFile a buffer is sliced that covers everything from the index entry of the message to pull through the end of the mapped data.

One ConsumeQueue index entry is a fixed 20 B, so the slice may contain multiple entries, but it is guaranteed to contain the next entry to pull. (A worked example of the offset arithmetic follows the source below.)
```java
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    int mappedFileSize = this.mappedFileSize;
    // logical index -> physical offset within the ConsumeQueue
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            // slice from the position inside the mapped file through the end of its valid data
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}
```
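A quick sanity check of that arithmetic, assuming the default ConsumeQueue file size of 300,000 entries × 20 B = 6,000,000 bytes (the index value is made up):

```java
public class ConsumeQueueOffsetMathSketch {
    public static void main(String[] args) {
        final int CQ_STORE_UNIT_SIZE = 20;
        final int mappedFileSize = 300_000 * CQ_STORE_UNIT_SIZE; // 6,000,000 bytes per file (default)

        long startIndex = 450_000;                       // logical index requested by the consumer
        long offset = startIndex * CQ_STORE_UNIT_SIZE;   // physical offset = 9,000,000
        long fileIndex = offset / mappedFileSize;        // 1 -> the second ConsumeQueue file
        int posInFile = (int) (offset % mappedFileSize); // 3,000,000 -> position inside that file

        System.out.printf("offset=%d file=%d pos=%d%n", offset, fileIndex, posInFile);
    }
}
```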
2.1.3 checkInDiskByCommitOffset: Is the Message on Disk?

First obtain the broker's maximum usable memory, by default physical memory × 40 / 100, i.e. 40% of the machine's physical memory.

If the difference between the CommitLog's maximum physical offset and the physical offset of the message being pulled is greater than this limit, the data is considered to be already on disk (no longer in the page cache); otherwise it is assumed to still be in memory.
```java
private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
    // 40% of physical memory by default (accessMessageInMemoryMaxRatio = 40)
    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    return (maxOffsetPy - offsetPy) > memory;
}
```
Note the constant StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE here: it holds the machine's total physical memory in bytes.

The ManagementFactory and getTotalPhysicalMemorySize calls inside it are plain JDK facilities, and worth knowing about: Java can read the machine's total physical memory directly. (A worked example of the resulting threshold follows the StoreUtil source below.)
```java
public class StoreUtil {
    public static final long TOTAL_PHYSICAL_MEMORY_SIZE = getTotalPhysicalMemorySize();

    @SuppressWarnings("restriction")
    public static long getTotalPhysicalMemorySize() {
        // fall back to 24 GB if the platform bean is unavailable
        long physicalTotal = 1024 * 1024 * 1024 * 24L;
        OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();
        if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {
            physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();
        }
        return physicalTotal;
    }
}
```
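To make the threshold concrete, here is a small worked example; the 16 GiB machine size is assumed, and 40 is the default accessMessageInMemoryMaxRatio:

```java
public class InDiskThresholdSketch {
    public static void main(String[] args) {
        long totalPhysicalMemory = 16L * 1024 * 1024 * 1024; // assume a 16 GiB machine
        int accessMessageInMemoryMaxRatio = 40;              // default ratio

        long memory = (long) (totalPhysicalMemory * (accessMessageInMemoryMaxRatio / 100.0));
        System.out.println(memory); // 6871947673 bytes, about 6.4 GiB

        // a message counts as "on disk" once it lags the CommitLog tail by more than ~6.4 GiB
        long maxOffsetPy = 10L * 1024 * 1024 * 1024;
        long offsetPy = 2L * 1024 * 1024 * 1024;
        System.out.println((maxOffsetPy - offsetPy) > memory); // true -> read from disk
    }
}
```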
2.1.4 isTheBatchFull: Has the Pull Hit Its Limits?

This method decides whether the pull has reached its limits; if it has, the caller breaks out of the loop and the pull ends. (The sketch after the source below exercises these limits.)

1. If the total bytes pulled so far or the total message count is still 0, return false: nothing has been pulled yet.
2. If the requested maximum message count (32 by default) is less than or equal to the count already pulled, return true: the count threshold is reached.
3. If the current message is on disk:
   1. If the bytes pulled so far plus the current message's size exceed maxTransferBytesOnMessageInDisk = 64 KB, return true: the on-disk byte threshold is exceeded.
   2. If the count pulled so far > maxTransferCountOnMessageInDisk - 1 = 8 - 1, return true: the on-disk count threshold of 8 is exceeded.
4. If the current message is in memory:
   1. If the bytes pulled so far plus the current message's size exceed maxTransferBytesOnMessageInMemory = 256 KB, return true: the in-memory byte threshold is exceeded.
   2. If the count pulled so far > maxTransferCountOnMessageInMemory - 1 = 32 - 1, return true: the in-memory count threshold of 32 is exceeded.
```java
private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
    if (0 == bufferTotal || 0 == messageTotal) {
        return false;
    }

    if (maxMsgNums <= messageTotal) {
        return true;
    }

    if (isInDisk) {
        // on-disk thresholds: 64 KB / 8 messages by default
        if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
            return true;
        }
        if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
            return true;
        }
    } else {
        // in-memory thresholds: 256 KB / 32 messages by default
        if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
            return true;
        }
        if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
            return true;
        }
    }
    return false;
}
```
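A standalone restatement of the same checks, just to show the behavior under the default limits; the numbers fed into main are hypothetical:

```java
public class BatchFullSketch {
    // default thresholds restated for illustration (MessageStoreConfig defaults)
    static final int MAX_COUNT_IN_MEMORY = 32, MAX_BYTES_IN_MEMORY = 1024 * 256;
    static final int MAX_COUNT_IN_DISK = 8, MAX_BYTES_IN_DISK = 1024 * 64;

    static boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
        if (bufferTotal == 0 || messageTotal == 0) return false;
        if (maxMsgNums <= messageTotal) return true;
        if (isInDisk) {
            return (bufferTotal + sizePy) > MAX_BYTES_IN_DISK || messageTotal > MAX_COUNT_IN_DISK - 1;
        }
        return (bufferTotal + sizePy) > MAX_BYTES_IN_MEMORY || messageTotal > MAX_COUNT_IN_MEMORY - 1;
    }

    public static void main(String[] args) {
        // 8 messages of 1 KB already pulled from disk: the disk count limit (8) is hit
        System.out.println(isTheBatchFull(1024, 32, 8 * 1024, 8, true));  // true
        // the same totals from memory are still well under the 32-message / 256 KB limits
        System.out.println(isTheBatchFull(1024, 32, 8 * 1024, 8, false)); // false
    }
}
```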
2.1.5 isMatchedByConsumeQueue: Broker-Side Message Filtering

For a ConsumeQueue entry, matching is done on tagsCode, directly or via a bitmap filter.

For an ordinary message, tagsCode is the hashCode of the tags the producer set when sending (for a delayed message it is the delivery timestamp, covered later).

When the consumer starts and subscribes to a topic, it passes the subExpression parameter. Only OR expressions are supported, e.g. "tag1 || tag2 || tag3" for multiple tags; null or "*" subscribes to everything. The client parses subExpression, computes each tag's hash code, and stores them in the codeSet of the subscriptionData, which is reported to the broker with the heartbeat and cached there.

When the consumer pulls messages, the broker checks whether the message's tagsCode is present in the subscriptionData's codeSet. If it is, the tag the consumer subscribed to matches the tag the producer set on the message, so the consumer may consume it.

That is broker-side message filtering: it compares tagsCode values, i.e. hash codes. (A client-side sketch follows the source below.)
```java
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    if (null == subscriptionData) {
        return true;
    }

    if (subscriptionData.isClassFilterMode()) {
        return true;
    }

    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        if (tagsCode == null) {
            return true;
        }

        // "*" subscribes to everything
        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
            return true;
        }

        // match the message's tagsCode against the hash codes of the subscribed tags
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    } else {
        // non-TAG expressions (e.g. SQL92) cannot be decided from the ConsumeQueue
        // entry alone; the real filtering happens later in isMatchedByCommitLog
    }
    return true;
}
```
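For intuition, this is roughly how the client builds the codeSet from subExpression; a simplified sketch of what FilterAPI.buildSubscriptionData does, with error handling omitted:

```java
import java.util.HashSet;
import java.util.Set;

public class TagCodeSetSketch {
    public static void main(String[] args) {
        String subExpression = "tag1 || tag2 || tag3";

        // the client splits the OR expression and stores each tag's hashCode
        Set<Integer> codeSet = new HashSet<>();
        for (String tag : subExpression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                codeSet.add(trimmed.hashCode());
            }
        }

        // broker side: the ConsumeQueue entry stores tags.hashCode() as tagsCode
        long tagsCode = "tag2".hashCode();
        System.out.println(codeSet.contains((int) tagsCode)); // true -> the entry matches
    }
}
```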
2.1.6 getMessage: Fetching the Message Buffer

commitLog#getMessage returns the memory segment that holds the actual message corresponding to an index entry, located by physical offset and message size.

It first calls findMapedFileByOffset to find the MappedFile of the CommitLog file owning the offset (covered in the part on broker flushing), then computes the position pos within that mappedFile and slices a buffer of size bytes starting at pos.
```java
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        // position inside the owning CommitLog file
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}
```
2.1.7 addMessage: Appending a Message

Appends a message buffer to the GetMessageResult.

Note again that only the raw byte buffer segment is appended and later returned to the consumer as-is, without deserializing into a Message object; the consumer deserializes it itself following the CommitLog serialization format.
```java
public void addMessage(final SelectMappedBufferResult mapedBuffer) {
    this.messageMapedList.add(mapedBuffer);
    this.messageBufferList.add(mapedBuffer.getByteBuffer());
    this.bufferTotalSize += mapedBuffer.getSize();
    this.msgCount4Commercial += (int) Math.ceil(
        mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
}
```
2.2 suspendPullRequest: Suspending a PullRequest

When no message was read, if the broker allows suspending requests and the client supports suspension, the broker holds the request for a period of time; if a message arrives in the meantime, or the hold interval expires, the pull is attempted again.

This requires brokerAllowSuspend = true, which holds for a fresh pull request but is false for a request that has already been suspended: a retried pull that still finds nothing is not suspended a second time.

The broker's maximum hold time defaults to 15s and is passed by the consumer; if the broker does not support long polling (it does by default), short polling is used instead and the maximum hold time becomes 1s.

suspendPullRequest itself merely stores the pullRequest and its key into the internal pullRequestTable of PullRequestHoldService and performs no further logic. So how are the suspension and the follow-up work implemented? PullRequestHoldService extends ServiceThread, so it is clearly a single-threaded task, and all the deferred processing happens on that thread.
```java
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    // key format: topic@queueId
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}
```
2.2.1 run: Processing Held Requests

The run loop alternates between blocking and checking.

Each iteration first blocks the thread; it is woken either on a timer or when new messages arrive on the broker. With long polling the wait is at most 5s; otherwise it waits shortPollingTimeMills, 1s by default.

Once awake, it runs the core method checkHoldRequest, which scans the held requests in pullRequestTable and performs the pull for those whose messages have arrived.
```java
@Override
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            // block until woken: at most 5s with long polling, otherwise shortPollingTimeMills (1s)
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}
```
2.2.2 checkHoldRequest: Scanning Held Requests

This method walks every held request in pullRequestTable and calls notifyMessageArriving for each, attempting the pull.
```java
protected void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        // key format: topic@queueId
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}
```
2.2.3 notifyMessageArriving: Notifying Message Arrival

This method attempts to act on a message-arrival notification, though messages may not actually have arrived: the call may simply come from the timed wake-up.

1. Build the key topic@queueId and look it up in the cache to get its ManyPullRequest, an object wrapping the list of PullRequests for the same topic and queue.
2. Iterate over all held requests in that list:
   1. If the passed-in max offset is less than or equal to the offset to pull from, re-read the ConsumeQueue's current maximum logical offset.
   2. If the max offset is greater than the offset to pull from, a pull can be attempted:
      1. Run tagsCode filtering via MessageFilter#isMatchedByConsumeQueue; on a timed wake-up the tagsCode parameter is null, so it always returns true.
      2. If the message matches the filter, re-execute the pull via PullMessageProcessor#executeRequestWhenWakeup and move to the next request.
      3. If the request has waited past its timeout, also re-execute the pull once via PullMessageProcessor#executeRequestWhenWakeup and move to the next request.
   3. Requests that neither match nor have timed out are put into the replayList and stay suspended.
3. Finally, the requests in replayList are put back into pullRequestTable.
```java
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
    notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            // requests that stay suspended are collected here and put back afterwards
            List<PullRequest> replayList = new ArrayList<PullRequest>();

            for (PullRequest request : requestList) {
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                }

                if (newestOffset > request.getPullFromThisOffset()) {
                    // tagsCode is null on a timed wake-up, so the match defaults to true
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    if (match && properties != null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }

                    if (match) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }

                // request timed out: execute the pull once more and return whatever is found
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }

                replayList.add(request);
            }

            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}
```
2.2.4 executeRequestWhenWakeup: Re-executing the Pull on Wake-up

This method creates a Runnable whose job is to call processRequest to pull messages again. Note that brokerAllowSuspend is false here: if this attempt still finds no message, the request will not be suspended again and the response is written straight back to the client.

The task is submitted to the pullMessageExecutor thread pool and runs asynchronously.
```java
public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request)
    throws RemotingCommandException {
    Runnable run = new Runnable() {
        @Override
        public void run() {
            try {
                // brokerAllowSuspend = false: a retried pull is never suspended again
                final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);

                if (response != null) {
                    response.setOpaque(request.getOpaque());
                    response.markResponseType();
                    try {
                        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (!future.isSuccess()) {
                                    log.error("processRequestWrapper response to {} failed",
                                        future.channel().remoteAddress(), future.cause());
                                    log.error(request.toString());
                                    log.error(response.toString());
                                }
                            }
                        });
                    } catch (Throwable e) {
                        log.error("processRequestWrapper process request over, but response failed", e);
                        log.error(request.toString());
                        log.error(response.toString());
                    }
                }
            } catch (RemotingCommandException e1) {
                log.error("excuteRequestWhenWakeup run", e1);
            }
        }
    };
    this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}
```
2.2.5 ReputMessageService: Message-Arrival Notification

Besides being called on the timed wake-ups of the PullRequestHoldService thread, notifyMessageArriving may also be called by the reputMessageService when it encounters new messages.

In the part we studied earlier on ReputMessageService building ConsumeQueue and IndexFile asynchronously, DefaultMessageStore#doReput, which replays each new message, contains this snippet:
```java
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
    && DefaultMessageStore.this.messageArrivingListener != null) {
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
    notifyMessageArrive4MultiQueue(dispatchRequest);
}
```
This snippet runs as part of replaying each new message, alongside the dispatching that builds the ConsumeQueue and IndexFile entries: if the broker's role is not SLAVE, long polling is enabled, and the message-arriving listener is not null, messageArrivingListener#arriving is called:
```java
public class NotifyMessageArrivingListener implements MessageArrivingListener {
    private final PullRequestHoldService pullRequestHoldService;

    public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
        this.pullRequestHoldService = pullRequestHoldService;
    }

    @Override
    public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        // delegate straight to the hold service: the PullRequests waiting on
        // topic@queueId can now attempt to pull
        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
            msgStoreTime, filterBitMap, properties);
    }
}
```
arriving simply delegates to pullRequestHoldService#notifyMessageArriving, signalling that a new message has reached that message queue and the PullRequests waiting on it may now pull.

So notifyMessageArriving is invoked in the following situations:
1. Timed calls from the PullRequestHoldService thread:
   1. Long polling: a request is held for at most 15s, and notifyMessageArriving runs against all PullRequests every 5s.
   2. Short polling: a request is held for at most 1s, and notifyMessageArriving runs against all PullRequests every 1s.
2. Calls from the ReputMessageService thread:
   1. When a new message arrives, DefaultMessageStore#doReput replays it and calls notifyMessageArriving for all PullRequests waiting on the corresponding topic@queueId. doReput runs every 1 ms.
2.3 commitOffset: Committing the Consume Offset
At the end of processRequest, as long as the broker allows suspending the request (true for a fresh pull, false for an already-suspended one, i.e. this must be the first attempt), the consumer supports committing its consume offset (it does under the clustering consumption model), and this broker is not a SLAVE, the consume offset is committed via ConsumerOffsetManager#commitOffset.

Committing the offset really just stores the new value into ConsumerOffsetManager's offsetTable, the in-memory cache backed by the {user.home}/store/config/consumerOffset.json file on disk. At this point the update lives only in memory; nothing is persisted yet.
```java
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
    final long offset) {
    // key format: topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    this.commitOffset(clientHost, key, queueId, offset);
}

private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null == map) {
        map = new ConcurrentHashMap<Integer, Long>(32);
        map.put(queueId, offset);
        this.offsetTable.put(key, map);
    } else {
        Long storeOffset = map.put(queueId, offset);
        if (storeOffset != null && offset < storeOffset) {
            log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}",
                clientHost, key, queueId, offset, storeOffset);
        }
    }
}
```
During broker startup, BrokerController#initialize schedules a number of periodic tasks; one of them persists the consumer offsets (the data in offsetTable) into consumerOffset.json every 5s.
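For reference, a minimal sketch of what such a periodic persist task looks like; the 5s period matches the default flushConsumerOffsetInterval, while the class and method names here are illustrative and the JSON writing is stubbed out:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OffsetPersistSketch {
    // stands in for ConsumerOffsetManager.offsetTable: "topic@group" -> (queueId -> offset)
    private final Map<String, ConcurrentHashMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // mirrors the BrokerController#initialize schedule: flush every 5s by default
        scheduler.scheduleAtFixedRate(this::persist, 1000 * 10, 1000 * 5, TimeUnit.MILLISECONDS);
    }

    private void persist() {
        // the real implementation serializes offsetTable to JSON and writes it to
        // {user.home}/store/config/consumerOffset.json (keeping a .bak backup)
        System.out.println("persisting " + offsetTable);
    }
}
```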
3 Summary

In this article we walked through the broker-side source for handling pull requests: building the filter information, fetching the messages, processing the pull result (returning a response directly or suspending the request), and committing the consume offset.

Messages are fetched and filtered by DefaultMessageStore#getMessage, which returns a GetMessageResult. It is the core pull method, looking up ConsumeQueue and CommitLog file data, and it pulls at most 32 messages by default.

1. Fetched messages go through broker-side filtering, matched by tagsCode. For an ordinary message, tagsCode is the hashCode of its tags; when the consumer starts and subscribes to a topic it passes subExpression, and the broker-side match compares the hash codes of the subExpression tags against the hashCode of the message's tags. A match here is only broker-side; the consumer compares the tags again on its side.
2. What is fetched is really a sequence of byte buffers, returned to the consumer as-is; the consumer deserializes them into Message objects following the CommitLog serialization format.

If no message is found, and the broker allows suspending the request and the client supports it, the PullRequest is submitted via pullRequestHoldService#suspendPullRequest and the broker holds it for a while; when a message arrives or the hold interval expires, notifyMessageArriving attempts the pull again. notifyMessageArriving is invoked in the following situations:
1. Timed calls from the PullRequestHoldService thread:
   1. Long polling: held for at most 15s, notifyMessageArriving runs for all PullRequests every 5s.
   2. Short polling: held for at most 1s, notifyMessageArriving runs for all PullRequests every 1s.
2. Calls from the ReputMessageService thread:
   1. When a new message arrives, DefaultMessageStore#doReput replays it while building the ConsumeQueue and IndexFile entries, and calls notifyMessageArriving for the PullRequests waiting on the corresponding topic@queueId.