17、RocketMQ源码分析:RebalanceService消费者负载均衡过程源码 文章目录
1 doRebalance执行重平衡
2 RebalanceImpl#doRebalance执行重平衡
3 rebalanceByTopic根据topic执行重平衡
4 findConsumerIdList查找客户端id集合
4.1 findBrokerAddrByTopic随机查找broker
4.2 getConsumerIdListByGroup获取Group所有ConsumerId集合
4.2.1 broker处理getConsumerListByGroup请求
4.2.2 ConsumerManageProcessor#getConsumerListByGroup
5 allocate分配消息队列
5.1 AllocateMessageQueueAveragely平均分配
5.2 AllocateMessageQueueAveragelyByCircle环形平均分配
5.3AllocateMessageQueueByConfig根据配置分配
5.4 AllocateMessageQueueByMachineRoom机房平均分配
5.5 AllocateMachineRoomNearby机房就近分配
5.6 AllocateMessageQueueConsistentHash一致性哈希分配
6 updateProcessQueueTableInRebalance更新处理队列
6.1 removeUnnecessaryMessageQueue移除非必要的消息队列
6.2 lock获取分布式锁
6.3 computePullFromWhereWithException计算offset
6.3.1 readOffset获取offset
6.3.1.1 fetchConsumeOffsetFromBroker从broker获取offset
6.3.1.2 queryConsumerOffset查询消费偏移量
6.4 dispatchPullRequest分发拉取消息请求PullRequest
7 messageQueueChanged更新消息队列
8 总结
1 doRebalance执行重平衡
*负载均衡or重平衡的触发操作,最终都会执行MQClientInstance的doRebalance方法。该方法将会遍历consumerTable,获取每一个消费者MQConsumerInner,即DefaultMQPushConsumerImpl或者其他实例,然后通过消费者本身来执行重平衡操作。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void doRebalance () { for (Map.Entry<String, MQConsumerInner> entry : this .consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null ) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception" , e); } } } }
MQConsumerInner有三种实现,分别是DefaultLitePullConsumerImpl、DefaultMQPullConsumerImpl、DefaultMQPushConsumerImpl,前两个都用的很少,他们的doRebalance源码也都很简单,即调用各自内部的rebalanceImpl#doRebalance( false)方法即可。 *
1 2 3 4 5 6 7 8 9 10 @Override public void doRebalance () { if (this .rebalanceImpl != null ) { this .rebalanceImpl.doRebalance(false ); } }
**我们最常使用的是DefaultMQPushConsumerImpl,它的doRebalance方法也很简单,如果该消费者没有暂停,那么同样调用rebalanceImpl#doRebalance方法即可。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public void doRebalance () { if (!this .pause) { this .rebalanceImpl.doRebalance(this .isConsumeOrderly()); } }
2 RebalanceImpl#doRebalance执行重平衡 该方法将会获取当前消费者的订阅信息集合,然后遍历订阅信息集合,获取订阅的topic,调用rebalanceByTopic方法对该topic进行重平衡。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public void doRebalance (final boolean isOrder) { Map<String, SubscriptionData> subTable = this .getSubscriptionInner(); if (subTable != null ) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this .rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception" , e); } } } } this .truncateMessageQueueNotMyTopic(); }
3 rebalanceByTopic根据topic执行重平衡 该方法根据topic进行重平衡,将会根据不同的消息模式执行不同的处理策略。
1、 **如果是广播模式,广播模式下并没有负载均衡可言,每个consumer都会消费所有队列中的全部消息,仅仅是更新当前consumer的处理队列processQueueTable的信息 **;2、 **如果是集群模式,首先基于负载均衡策略确定分配给当前消费者的MessageQueue,然后更新当前consumer的处理队列processQueueTable的信息 **;
集群模式的大概步骤为:
1、 首先获取该topic的所有消息队列集合mqSet,随后从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合cidAll一个clientId代表一个消费者;2、 对topic的消息队列和clientId集合分别进行排序排序能够保证,不同的客户端消费者在进行负载均衡时,其mqAll和cidAll中的元素顺序是一致的;3、 获取分配消息队列的策略实现AllocateMessageQueueStrategy,即负载均衡的策略类,执行allocate方法,为当前clientId也就是当前消费者,分配消息队列,这一步就是执行负载均衡或者说重平衡的算法;4、 调用updateProcessQueueTableInRebalance方法,更新新分配的消息队列的处理队列processQueueTable的信息,为新分配的消息队列创建最初的pullRequest并分发给PullMessageService;5、 如果processQueueTable发生了改变,那么调用messageQueueChanged方法设置新的本地订阅关系版本,重设流控参数,立即给所有broker发送心跳,让Broker更新当前订阅关系;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 private void rebalanceByTopic (final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { Set<MessageQueue> mqSet = this .topicSubscribeInfoTable.get(topic); if (mqSet != null ) { boolean changed = this .updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this .messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}" , consumerGroup, topic, mqSet, mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist." , consumerGroup, topic); } break ; } case CLUSTERING: { Set<MessageQueue> mqSet = this .topicSubscribeInfoTable.get(topic); List<String> cidAll = this .mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist." , consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed" , consumerGroup, topic); } if (mqSet != null && cidAll != null ) { List<MessageQueue> mqAll = new ArrayList <MessageQueue>(); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this .allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null ; try { allocateResult = strategy.allocate( this .consumerGroup, this .mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}" , strategy.getName(), e); return ; } Set<MessageQueue> allocateResultSet = new HashSet <MessageQueue>(); if (allocateResult != null ) { allocateResultSet.addAll(allocateResult); } boolean changed = this .updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}" , strategy.getName(), consumerGroup, topic, this .mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this .messageQueueChanged(topic, mqSet, allocateResultSet); } } break ; } default : break ; } }
4 findConsumerIdList查找客户端id集合 **该方法从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合,用于后续负载均衡策略。一个cliendId代表着一个消费者。 **
首先通过findBrokerAddrByTopic方法随机选择一个当前topic所属的broker,如果broker地址为null则请求nameserver更新topic路由信息。然后调用getConsumerIdListByGroup方法根据brokerAddr和group 发起请求到broekr,得到消费者客户端id列表。 *
从这里的源码能够看出来,RocketMQ一个消费者组内的消费者订阅的topic都必须一致,否则就会出现订阅的topic被覆盖的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public List<String> findConsumerIdList (final String topic, final String group) { String brokerAddr = this .findBrokerAddrByTopic(topic); if (null == brokerAddr) { this .updateTopicRouteInfoFromNameServer(topic); brokerAddr = this .findBrokerAddrByTopic(topic); } if (null != brokerAddr) { try { return this .mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout()); } catch (Exception e) { log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e); } } return null ; }
4.1 findBrokerAddrByTopic随机查找broker
*从topicRouteTable中获取topic路由信息,然后随机选择一个broker返回。为社么随机返回就可以呢?因为consumer会向所有broker上报心跳信息,因此这些broker中的客户端id是一致的。并且,RocketMQ默认一个消费者组的所有消费的订阅信息都是一致的,因此随便哪个broker上关于此Group所有ConsumerId集合都是一样的。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public String findBrokerAddrByTopic (final String topic) { TopicRouteData topicRouteData = this .topicRouteTable.get(topic); if (topicRouteData != null ) { List<BrokerData> brokers = topicRouteData.getBrokerDatas(); if (!brokers.isEmpty()) { int index = random.nextInt(brokers.size()); BrokerData bd = brokers.get(index % brokers.size()); return bd.selectBrokerAddr(); } } return null ; }
4.2 getConsumerIdListByGroup获取Group所有ConsumerId集合 该方法向指定地址的broker发起网络请求,查找指定group的全部消费者客户端id列表并返回。请求Code为**GET_CONSUMER_LIST_BY_GROUP **。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public List<String> getConsumerIdListByGroup ( final String addr, final String consumerGroup, final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader (); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader); RemotingCommand response = this .remotingClient.invokeSync(MixAll.brokerVIPChannel(this .clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response != null ; switch (response.getCode()) { case ResponseCode.SUCCESS: { if (response.getBody() != null ) { GetConsumerListByGroupResponseBody body = GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class); return body.getConsumerIdList(); } } default : break ; } throw new MQBrokerException (response.getCode(), response.getRemark(), addr); }
4.2.1 broker处理getConsumerListByGroup请求 broker通过ConsumerManageProcessor对于处理GET_CONSUMER_LIST_BY_GROUP的请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public RemotingCommand processRequest (ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.GET_CONSUMER_LIST_BY_GROUP: return this .getConsumerListByGroup(ctx, request); case RequestCode.UPDATE_CONSUMER_OFFSET: return this .updateConsumerOffset(ctx, request); case RequestCode.QUERY_CONSUMER_OFFSET: return this .queryConsumerOffset(ctx, request); default : break ; } return null ; }
4.2.2 ConsumerManageProcessor#getConsumerListByGroup 返回指定group的所有客户端id集合。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public RemotingCommand getConsumerListByGroup (ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class); final GetConsumerListByGroupRequestHeader requestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); ConsumerGroupInfo consumerGroupInfo = this .brokerController.getConsumerManager().getConsumerGroupInfo( requestHeader.getConsumerGroup()); if (consumerGroupInfo != null ) { List<String> clientIds = consumerGroupInfo.getAllClientId(); if (!clientIds.isEmpty()) { GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody (); body.setConsumerIdList(clientIds); response.setBody(body.encode()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null ); return response; } else { log.warn("getAllClientId failed, {} {}" , requestHeader.getConsumerGroup(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); } } else { log.warn("getConsumerGroupInfo failed, {} {}" , requestHeader.getConsumerGroup(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); } response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup()); return response; }
5 allocate分配消息队列 **AllocateMessageQueueStrategy#allocate方法为当前clientId也就是当前消费者,分配消息队列,这一步实际上就是执行负载均衡或者说重平衡的算法。 **
*AllocateMessageQueueStrategy是RocketMQ消费者之间消息分配的策略算法接口,RocketMQ已经提供了非常多的算法策略实现类,同时我们自己也可以通过实现AllocateMessageQueueStrategy接口定义自己的负载均衡策略。 **
注意,在执行负载均衡策略之前,已经对消息队列和消费者进行了排序,因此不同的消费者客户端得到的顺序应该是一致的。
RocketMQ内置了六个负载均衡策略的实现类,我们来看看这些实现类的原理。
1、 AllocateMessageQueueAveragely :平均分配策略,这是默认策略尽量将消息队列平均分配给所有消费者,多余的队列分配至排在前面的消费者分配的时候,前一个消费者分配完了,才会给下一个消费者分配;2、 AllocateMessageQueueAveragelyByCircle :环形平均分配策略尽量将消息队列平均分配给所有消费者,多余的队列分配至排在前面的消费者与平均分配策略差不多,区别就是分配的时候,按照消费者的顺序进行一轮一轮的分配,直到分配完所有消息队列;3、 AllocateMessageQueueByConfig :根据用户配置的消息队列分配将会直接返回用户配置的消息队列集合;4、 AllocateMessageQueueByMachineRoom :机房平均分配策略消费者只消费绑定的机房中的broker,并对绑定机房中的MessageQueue进行负载均衡;5、 AllocateMachineRoomNearby :机房就近分配策略消费者对绑定机房中的MessageQueue进行负载均衡除此之外,对于某些拥有消息队列但却没有消费者的机房,其消息队列会被所欲消费者分配,具体的分配策略是,另外传入的一个AllocateMessageQueueStrategy的实现;6、 AllocateMessageQueueConsistentHash :一致性哈希分配策略基于一致性哈希算法分配;
5.1 AllocateMessageQueueAveragely平均分配 计算消息队列数量与消费者数量的商,这个商就是每个消费者都会分到的队列数,然后对于余数,则只有排在前面的消费者能够分配到。
**在进行分配的时候,只有当前一个消费者分配完了,才会分配下一个消费者。例如有消费者A、B,有5个消息队列1、2、3、4、5,计算得到A会分配3分队列,B会分配2个队列,那么将会先给消费者A分配1、2、3,再给消费者B分配到4、5。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { private final InternalLogger log = ClientLogger.getLog(); @Override public List<MessageQueue> allocate (String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1 ) { throw new IllegalArgumentException ("currentCID is empty" ); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException ("mqAll is null or mqAll empty" ); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException ("cidAll is null or cidAll empty" ); } List<MessageQueue> result = new ArrayList <MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}" , consumerGroup, currentCID, cidAll); return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0 ; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; } @Override public String getName () { return "AVG" ; } }
5.2 AllocateMessageQueueAveragelyByCircle环形平均分配 **按照消费者的顺序进行一轮一轮的分配,直到分配完所有消息队列。例如有消费者A、B,有5个消息队列1、2、3、4、5。第一轮A分配1,B分配2;第二轮A分配3,B分配4;第二轮A分配5。因此A分配到1、3、5,B分配到2、4。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy { private final InternalLogger log = ClientLogger.getLog(); @Override public List<MessageQueue> allocate (String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1 ) { throw new IllegalArgumentException ("currentCID is empty" ); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException ("mqAll is null or mqAll empty" ); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException ("cidAll is null or cidAll empty" ); } List<MessageQueue> result = new ArrayList <MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}" , consumerGroup, currentCID, cidAll); return result; } int index = cidAll.indexOf(currentCID); for (int i = index; i < mqAll.size(); i++) { if (i % cidAll.size() == index) { result.add(mqAll.get(i)); } } return result; } @Override public String getName () { return "AVG_BY_CIRCLE" ; } }
5.3AllocateMessageQueueByConfig根据配置分配 **很简单,如果要想使用该策略,那么应该调用setMessageQueueList方法传入自定义的需要消费的消息队列集合,而allocate方法将直接返回该集合。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy { private List<MessageQueue> messageQueueList; @Override public List<MessageQueue> allocate (String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { return this .messageQueueList; } @Override public String getName () { return "CONFIG" ; } public List<MessageQueue> getMessageQueueList () { return messageQueueList; } public void setMessageQueueList (List<MessageQueue> messageQueueList) { this .messageQueueList = messageQueueList; } }
5.4 AllocateMessageQueueByMachineRoom机房平均分配 **消费者只消费绑定的机房中的broker,并对绑定机房中的MessageQueue进行负载均衡。这种策略要求brokerName的命名必须要按“机房名@brokerName”的格式来设置。 **
消费者在分配队列的时候,首先会按照机房名称过滤出所有的 MessageQueue,然后再按照平均分配策略进行分配。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy { private Set<String> consumeridcs; @Override public List<MessageQueue> allocate (String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (StringUtils.isBlank(currentCID)) { throw new IllegalArgumentException ("currentCID is empty" ); } if (CollectionUtils.isEmpty(mqAll)) { throw new IllegalArgumentException ("mqAll is null or mqAll empty" ); } if (CollectionUtils.isEmpty(cidAll)) { throw new IllegalArgumentException ("cidAll is null or cidAll empty" ); } List<MessageQueue> result = new ArrayList <MessageQueue>(); int currentIndex = cidAll.indexOf(currentCID); if (currentIndex < 0 ) { return result; } List<MessageQueue> premqAll = new ArrayList <MessageQueue>(); for (MessageQueue mq : mqAll) { String[] temp = mq.getBrokerName().split("@" ); if (temp.length == 2 && consumeridcs.contains(temp[0 ])) { premqAll.add(mq); } } int mod = premqAll.size() / cidAll.size(); int rem = premqAll.size() % cidAll.size(); int startIndex = mod * currentIndex; int endIndex = startIndex + mod; for (int i = startIndex; i < endIndex; i++) { result.add(premqAll.get(i)); } if (rem > currentIndex) { result.add(premqAll.get(currentIndex + mod * cidAll.size())); } return result; } @Override public String getName () { return "MACHINE_ROOM" ; } public Set<String> getConsumeridcs () { return consumeridcs; } public void setConsumeridcs (Set<String> consumeridcs) { this .consumeridcs = consumeridcs; } }
5.5 AllocateMachineRoomNearby机房就近分配 使用该策略需要传递两个参数:
1、 allocateMessageQueueStrategy:用于真正分配消息队列的策略对象;2、 machineRoomResolver:机房解析器,从clientID和brokerName中解析出机房名称;
该策略的大概逻辑为:
1、 将消息队列根据机房分组,将消费者根据机房分组;2、 分配部署在与当前消费者相同的机房中的mq,即如果消息队列与消费者属于同一机房,则对他们进行分配具体的分配策略通过传入的allocateMessageQueueStrategy实现;3、 如果某个拥有消息队列的机房没有对应的消费者,那么它的消息队列由当前所有的消费者分配具体的分配策略通过传入的allocateMessageQueueStrategy实现;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy { private final InternalLogger log = ClientLogger.getLog(); private final AllocateMessageQueueStrategy allocateMessageQueueStrategy; private final MachineRoomResolver machineRoomResolver; public AllocateMachineRoomNearby (AllocateMessageQueueStrategy allocateMessageQueueStrategy, MachineRoomResolver machineRoomResolver) throws NullPointerException { if (allocateMessageQueueStrategy == null ) { throw new NullPointerException ("allocateMessageQueueStrategy is null" ); } if (machineRoomResolver == null ) { throw new NullPointerException ("machineRoomResolver is null" ); } this .allocateMessageQueueStrategy = allocateMessageQueueStrategy; this .machineRoomResolver = machineRoomResolver; } @Override public List<MessageQueue> allocate (String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1 ) { throw new IllegalArgumentException ("currentCID is empty" ); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException ("mqAll is null or mqAll empty" ); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException ("cidAll is null or cidAll empty" ); } List<MessageQueue> result = new ArrayList <MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}" , consumerGroup, currentCID, cidAll); return result; } Map<String, List<MessageQueue>> mr2Mq = new TreeMap <String, List<MessageQueue>>(); for (MessageQueue mq : mqAll) { String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq); if (StringUtils.isNoneEmpty(brokerMachineRoom)) { if (mr2Mq.get(brokerMachineRoom) == null ) { mr2Mq.put(brokerMachineRoom, new ArrayList <MessageQueue>()); } mr2Mq.get(brokerMachineRoom).add(mq); } else { throw new IllegalArgumentException ("Machine room is null for mq " + mq); } } Map<String, List<String>> mr2c = new TreeMap <String, List<String>>(); for (String cid : cidAll) { String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid); if (StringUtils.isNoneEmpty(consumerMachineRoom)) { if (mr2c.get(consumerMachineRoom) == null ) { mr2c.put(consumerMachineRoom, new ArrayList <String>()); } mr2c.get(consumerMachineRoom).add(cid); } else { throw new IllegalArgumentException ("Machine room is null for consumer id " + cid); } } List<MessageQueue> allocateResults = new ArrayList <MessageQueue>(); String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID); List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom); List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom); if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) { allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom)); } for (Entry<String, List<MessageQueue>> machineRoomEntry : mr2Mq.entrySet()) { if (!mr2c.containsKey(machineRoomEntry.getKey())) { allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, machineRoomEntry.getValue(), cidAll)); } } return allocateResults; } @Override public String getName () { return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName(); } public interface MachineRoomResolver { String brokerDeployIn (MessageQueue messageQueue) ; String consumerDeployIn (String clientID) ; } }
5.6 AllocateMessageQueueConsistentHash一致性哈希分配 使用该策略可以传递两个参数:
1、 virtualNodeCnt :物理节点的虚拟节点的数量,不可小于0,默认10;2、 customHashFunction :自定义的哈希函数,默认为MD5Hash;
大概步骤为:
1、 实例化ConsistentHashRouter对象,用于产生虚拟节点以及构建哈希环,如果没有指定哈希函数,则采用MD5Hash作为哈希函数 ;2、 遍历消息队列集合,对messageQueue进行hash计算,按顺时针找到最近的consumer节点如果是当前consumer,则加入结果集 ;
**总体上还是比较简单的,但是ConsistentHashRouter的源码还是值得一看的,因为其基于Java实现了一个一致性哈希算法。例如,这里的“哈西环”,实际上是采用TreeMap来实现的。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy { private final InternalLogger log = ClientLogger.getLog(); private final int virtualNodeCnt; private final HashFunction customHashFunction; public AllocateMessageQueueConsistentHash () { this (10 ); } public AllocateMessageQueueConsistentHash (int virtualNodeCnt) { this (virtualNodeCnt, null ); } public AllocateMessageQueueConsistentHash (int virtualNodeCnt, HashFunction customHashFunction) { if (virtualNodeCnt < 0 ) { throw new IllegalArgumentException ("illegal virtualNodeCnt :" + virtualNodeCnt); } this .virtualNodeCnt = virtualNodeCnt; this .customHashFunction = customHashFunction; } @Override public List<MessageQueue> allocate (String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1 ) { throw new IllegalArgumentException ("currentCID is empty" ); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException ("mqAll is null or mqAll empty" ); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException ("cidAll is null or cidAll empty" ); } List<MessageQueue> result = new ArrayList <MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}" , consumerGroup, currentCID, cidAll); return result; } Collection<ClientNode> cidNodes = new ArrayList <ClientNode>(); for (String cid : cidAll) { cidNodes.add(new ClientNode (cid)); } final ConsistentHashRouter<ClientNode> router; if (customHashFunction != null ) { router = new ConsistentHashRouter <ClientNode>(cidNodes, virtualNodeCnt, customHashFunction); } else { router = new ConsistentHashRouter <ClientNode>(cidNodes, virtualNodeCnt); } List<MessageQueue> results = new ArrayList <MessageQueue>(); for (MessageQueue mq : mqAll) { ClientNode clientNode = router.routeNode(mq.toString()); if (clientNode != null && currentCID.equals(clientNode.getKey())) { results.add(mq); } } return results; } @Override public String getName () { return "CONSISTENT_HASH" ; } private static class ClientNode implements Node { private final String clientID; public ClientNode (String clientID) { this .clientID = clientID; } @Override public String getKey () { return clientID; } } }
6 updateProcessQueueTableInRebalance更新处理队列
*在通过AllocateMessageQueueStrategy#allocate的负载均衡算法为当前消费者分配了新的消息队列之后,需要调用updateProcessQueueTableInRebalance方法,更新新分配的消息队列的处理队列processQueueTable的信息,创建最初的pullRequest并分发给PullMessageService。 **
该方法非常的重要,大概步骤为:
1、 遍历当前消费者已分配的所有处理队列processQueueTable,当消费者启动并且第一次执行该方法时,processQueueTable是一个空集合如果当前遍历到的消息队列和当前topic相等:;
1、 如果新分配的消息队列集合不包含当前遍历到的消息队列,说明这个队列被移除了 ;
1 2 1. 设置对应的处理队列dropped = true,该队列中的消息将不会被消费。 2. 调用removeUnnecessaryMessageQueue删除不必要的消息队列。删除成功后,processQueueTable移除该条目,changed置为true。
2、 **如果当前遍历到的处理队列最后一次拉取消息的时间距离现在超过120s,那么算作消费超时,可能是没有新消息或者网络通信失败 **;
1 1. 如果是push消费模式,设置对应的处理队列dropped = true,该队列中的消息将不会被消费。调用**removeUnnecessaryMessageQueue**删除不必要的消息队列。删除成功后,processQueueTable移除该条目,changed置为true。
2、 创建一个pullRequestList集合,用于存放新增的PullRequest* *遍历新分配的消息队列集合,如果当前消费者的处理队列集合processQueueTable中不包含该消息队列,那么表示这个消息队列是新分配的,需要进行一系列处理: **;
1、 如果是顺序消费,并且调用lock方法请求broker锁定该队列失败,即获取该队列的分布式锁失败表示新增消息队列失败,这个队列可能还再被其他消费者消费,那么本次重平衡就不再消费该队列,进入下次循环;2、 如果不是顺序消费或者顺序消费加锁成功,调用removeDirtyOffset方法从offsetTable中移除该消息队列的消费点位offset记录信息;3、 为该消息队列创建一个处理队列ProcessQueue ;4、 调用computePullFromWhereWithException 方法,获取该MessageQueue的下一个消息的消费偏移量nextOffset,pull模式返回0,push模式则根据consumeFromWhere计算得到;5、 如果nextOffset大于0,表示获取消费位点成功保存当前消息队列MessageQueue和处理队列ProcessQueue关系到processQueueTable;6、 新建一个PullRequest ,设置对应的offset、consumerGroup、mq、pq的信息,并且存入pullRequestList集合中* *这里就是最初产生拉取消息请求的地方**changed置为true;3、 调用dispatchPullRequest方法,分发本次创建的PullRequest请求;
1、 pull模式需要手动拉取消息,这些请求会作废,因此该方法是一个空实现 ;2、 * *push模式下自动拉取消息,而这里的PullRequest就是对应的消息队列的第一个拉取请求,因此这些请求会被PullMessageService依次处理,后续实现自动拉取消息这里就是push模式下最初的产生拉取消息请求的地方 **;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 private boolean updateProcessQueueTableInRebalance (final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false ; Iterator<Entry<MessageQueue, ProcessQueue>> it = this .processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); if (mq.getTopic().equals(topic)) { if (!mqSet.contains(mq)) { pq.setDropped(true ); if (this .removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true ; log.info("doRebalance, {}, remove unnecessary mq, {}" , consumerGroup, mq); } } else if (pq.isPullExpired()) { switch (this .consumeType()) { case CONSUME_ACTIVELY: break ; case CONSUME_PASSIVELY: pq.setDropped(true ); if (this .removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true ; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it" , consumerGroup, mq); } break ; default : break ; } } } } List<PullRequest> pullRequestList = new ArrayList <PullRequest>(); for (MessageQueue mq : mqSet) { if (!this .processQueueTable.containsKey(mq)) { if (isOrder && !this .lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed" , consumerGroup, mq); continue ; } this .removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue (); long nextOffset = -1L ; try { nextOffset = this .computePullFromWhereWithException(mq); } catch (Exception e) { log.info("doRebalance, {}, compute offset failed, {}" , consumerGroup, mq); continue ; } if (nextOffset >= 0 ) { ProcessQueue pre = this .processQueueTable.putIfAbsent(mq, pq); if (pre != null ) { log.info("doRebalance, {}, mq already exists, {}" , consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}" , consumerGroup, mq); PullRequest pullRequest = new PullRequest (); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true ; } } else { log.warn("doRebalance, {}, add new mq failed, {}" , consumerGroup, mq); } } } this .dispatchPullRequest(pullRequestList); return changed; }
6.1 removeUnnecessaryMessageQueue移除非必要的消息队列 该方法用于尝试移除不必要的消息队列,可能会移除失败。大概步骤为:
调用OffsetStore#persist方法,保存指定消息队列的偏移量,可能在本地存储或远程服务器,集群模式保存在远程broker服务器上。
调用OffsetStore#removeOffset方法,移除OffsetStore内部的offsetTable中的对应消息队列的k-v数据。3、 Push模式下,如果当前消费者是有序消费,且是集群消费,那么尝试从Broker端将该消息队列的分布式锁解锁如果是并发消费或者是广播消费,则不进入试解锁的逻辑:;
通过consumeLock#tryLock方法尝试获取处理队列的消费锁,最多等待1s。这是一个本地互斥锁,保证在获取到锁以及发起解锁的过程中,没有线程能消费该队列的消息,因为MessageListenerOrderly在消费消息时也需要获取该锁。 2、 获得锁之后,调用unlockDelay方法延迟的向Broker发送单向请求,Code为UNLOCK_BATCH_MQ,请求Broker释放当前消息队列的分布式锁, 最多延迟20s 该方法一定会返回true; 3、 在finally中,处理队列的本地消费锁解锁; 4、 **如果没有获得本地锁,那么表示当前消息队列正在消息,不能解锁,那么本次就放弃解锁了,移除消息队列失败,等待下次重新分配消费队列时,再进行移除返回false **;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 @Override public boolean removeUnnecessaryMessageQueue (MessageQueue mq, ProcessQueue pq) { this .defaultMQPushConsumerImpl.getOffsetStore().persist(mq); this .defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); if (this .defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals(this .defaultMQPushConsumerImpl.messageModel())) { try { if (pq.getConsumeLock().tryLock(1000 , TimeUnit.MILLISECONDS)) { try { return this .unlockDelay(mq, pq); } finally { pq.getConsumeLock().unlock(); } } else { log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}" , mq, pq.getTryUnlockTimes()); pq.incTryUnlockTimes(); } } catch (Exception e) { log.error("removeUnnecessaryMessageQueue Exception" , e); } return false ; } return true ; }
6.1.1 unlockDelay延迟解锁 **向Broker发送单向请求,Code为UNLOCK_BATCH_MQ,表示请求Broker释放当前消息队列的分布式锁。如果消费队列中还有剩余消息,则延迟20s发送解锁请求。 **
该方法似乎只会返回true,即只管发送不管结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private boolean unlockDelay (final MessageQueue mq, final ProcessQueue pq) { if (pq.hasTempMessage()) { log.info("[{}]unlockDelay, begin {} " , mq.hashCode(), mq); this .defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable () { @Override public void run () { log.info("[{}]unlockDelay, execute at once {}" , mq.hashCode(), mq); RebalancePushImpl.this .unlock(mq, true ); } }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS); } else { this .unlock(mq, true ); } return true ; }
6.2 lock获取分布式锁 **如果判断到某个消息队列是新分配给当前消费者的,并且如果是顺序消费,那么在当前消费者消费该消息队列之前,需要通过lock方法请求broker获取该队列的分布式锁。如果不是顺序消费,则此时不需要获取分布式锁。 **
**如果获取分布式锁失败,那么不会为当前消息队列创建ProcessQueue和PullRequest,因为此时表示该消息队列还不属于当前消费者,不能进行消费,这是RocketMQ保证顺序消费防止重复消费的一个措施。 **
1、 该方法首先调用findBrokerAddressInSubscribe获取指定brokerName的master地址;2、 然后将当前消费者组、当前客户端id、当前需要被锁定的消息队列等信息封装为一个LockBatchRequestBody,最后向broker发送同步请求,Code为LOCK_BATCH_MQ;3、 Broker返回一个set的MessageQueue集合,表示已经锁住的mq集合,然后编辑集合设置mq对应的processQueue属性,设置locked属性为true,设置加锁的时间属性为当前时间戳;4、 最后判断如果当前mq在集合中,那么返回true,表示当前mq锁定成功,否则返回false,表示锁定失败;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public boolean lock (final MessageQueue mq) { FindBrokerResult findBrokerResult = this .mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true ); if (findBrokerResult != null ) { LockBatchRequestBody requestBody = new LockBatchRequestBody (); requestBody.setConsumerGroup(this .consumerGroup); requestBody.setClientId(this .mQClientFactory.getClientId()); requestBody.getMqSet().add(mq); try { Set<MessageQueue> lockedMq = this .mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000 ); for (MessageQueue mmqq : lockedMq) { ProcessQueue processQueue = this .processQueueTable.get(mmqq); if (processQueue != null ) { processQueue.setLocked(true ); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } boolean lockOK = lockedMq.contains(mq); log.info("the message queue lock {}, {} {}" , lockOK ? "OK" : "Failed" , this .consumerGroup, mq); return lockOK; } catch (Exception e) { log.error("lockBatchMQ exception, " + mq, e); } } return false ; }
6.3 computePullFromWhereWithException计算offset **对于新分配的mq,需要知道从哪个点位开始消费,computePullFromWhereWithException方法就是用来获取该MessageQueue的下一个消息的消费偏移量offset的。 **
对于该方法,pull模式固定返回0,因为消费点位需要自己管理,而push模式则根据配置的consumeFromWhere计算得到:
1、 CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST、CONSUME_FROM_MIN_OFFSET、CONSUME_FROM_MAX_OFFSET这三种模式已经废弃,默认使用CONSUME_FROM_LAST_OFFSET的逻辑;2、 CONSUME_FROM_LAST_OFFSET:消费者组第一次启动时从最后的位置消费,后续再启动接着上次消费的进度开始消费;3、 CONSUME_FROM_FIRST_OFFSET:消费者组第一次启动时从最开始的位置消费,后续再启动接着上次消费的进度开始消费;4、 CONSUME_FROM_TIMESTAMP:消费者组第一次启动时消费在指定时间戳后产生的消息,后续再启动接着上次消费的进度开始消费;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 @Override public long computePullFromWhereWithException (MessageQueue mq) throws MQClientException { long result = -1 ; final ConsumeFromWhere consumeFromWhere = this .defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this .defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: case CONSUME_FROM_LAST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0 ) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L ; } else { try { result = this .mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}" , mq, e); throw e; } } } else { result = -1 ; } break ; } case CONSUME_FROM_FIRST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0 ) { result = lastOffset; } else if (-1 == lastOffset) { result = 0L ; } else { result = -1 ; } break ; } case CONSUME_FROM_TIMESTAMP: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0 ) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this .mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}" , mq, e); throw e; } } else { try { long timestamp = UtilAll.parseDate(this .defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); result = this .mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}" , mq, e); throw e; } } } else { result = -1 ; } break ; } default : break ; } return result; }
6.3.1 readOffset获取offset 该方法获取当前消费者组的offset,有三种读取类型:
1、 READ_FROM_MEMORY :仅从本地内存offsetTable读取;2、 READ_FROM_STORE :仅从broker中读取;3、 MEMORY_FIRST_THEN_STORE :先从本地内存offsetTable读取,读不到再从远程broker中读取; 当出现异常或者是在本地或者broker没有找到对于消费者组的offset记录,则算作第一次启动该消费者组,那么返回-1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 @Override public long readOffset (final MessageQueue mq, final ReadOffsetType type) { if (mq != null ) { switch (type) { case MEMORY_FIRST_THEN_STORE: case READ_FROM_MEMORY: { AtomicLong offset = this .offsetTable.get(mq); if (offset != null ) { return offset.get(); } else if (ReadOffsetType.READ_FROM_MEMORY == type) { return -1 ; } } case READ_FROM_STORE: { try { long brokerOffset = this .fetchConsumeOffsetFromBroker(mq); AtomicLong offset = new AtomicLong (brokerOffset); this .updateOffset(mq, offset.get(), false ); return brokerOffset; } catch (MQBrokerException e) { return -1 ; } catch (Exception e) { log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e); return -2 ; } } default : break ; } } return -1 ; }
6.3.1.1 fetchConsumeOffsetFromBroker从broker获取offset 该方法发起远程请求从broekr中获取只当topic的指定队列的指定消费者组的最新offset。请求Code为QUERY_CONSUMER_OFFSET。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private long fetchConsumeOffsetFromBroker (MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this .mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true ); if (null == findBrokerResult) { this .mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this .mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false ); } if (findBrokerResult != null ) { QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader (); requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this .groupName); requestHeader.setQueueId(mq.getQueueId()); return this .mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5 ); } else { throw new MQClientException ("The broker[" + mq.getBrokerName() + "] not exist" , null ); } }
在broker端处理请求Code为QUERY_CONSUMER_OFFSET的方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override public RemotingCommand processRequest (ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.GET_CONSUMER_LIST_BY_GROUP: return this .getConsumerListByGroup(ctx, request); case RequestCode.UPDATE_CONSUMER_OFFSET: return this .updateConsumerOffset(ctx, request); case RequestCode.QUERY_CONSUMER_OFFSET: return this .queryConsumerOffset(ctx, request); default : break ; } return null ; }
6.3.1.2 queryConsumerOffset查询消费偏移量 queryConsumerOffset方法用于broker查询消费者偏移量。
1、 首先根据ConsumerGroup、Topic、QueueId是从broker端的offsetTable这个map集合缓存属性中获取缓存的消费偏移offset;2、 如果offset大于等于0,则直接返回,否则表示缓存中没有对应的消费记录,那么集训判断如果消费队列最新偏移量小于等于0,并且该消费队列的0偏移量数据还在内存中,表示为新消息队列并且消息未清理过,并且数据量不是很大此时,将offset设置为0并返回否则,将会设置QUERY_NOT_FOUND,最后被解析为-1;
*从该方法源码可以看出来,消费的consumeFromWhere设置可能不准确,例如一个新的topic里面有少量消息,此时新启动一个consumerGroup去消费,即使consumeFromWhere设置为CONSUME_FROM_LAST_OFFSET,仍然会从0(从头)开始消费。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 private RemotingCommand queryConsumerOffset (ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); final QueryConsumerOffsetRequestHeader requestHeader = (QueryConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); long offset = this .brokerController.getConsumerOffsetManager().queryOffset( requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); if (offset >= 0 ) { responseHeader.setOffset(offset); response.setCode(ResponseCode.SUCCESS); response.setRemark(null ); } else { long minOffset = this .brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); if (minOffset <= 0 && !this .brokerController.getMessageStore().checkInDiskByConsumeOffset( requestHeader.getTopic(), requestHeader.getQueueId(), 0 )) { responseHeader.setOffset(0L ); response.setCode(ResponseCode.SUCCESS); response.setRemark(null ); } else { response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first" ); } } return response; }
6.4 dispatchPullRequest分发拉取消息请求PullRequest 该方法将因为本次新增的消息队列而创建的PullRequest请求进行分发处理。
1、 pull模式需要手动拉取消息,这些请求会作废,因此该方法是一个空实现 ;2、 **push模式下自动拉取消息,而这里的PullRequest就是对应的消息队列的第一个拉取请求,因此这些请求会被PullMessageService依次处理,后续实现自动拉取消息 **;
*这些PullRequest将会被存入PullMessageService服务内部的pullRequestQueue集合中,后续异步的消费,自动执行拉取消息的请求,这就是Push模式下最初的拉消息请求的来源。关于如何拉去消息以及如何消费,将是我们下一部分的内容。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public void dispatchPullRequest (List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this .defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}" , consumerGroup, pullRequest); } }
7 messageQueueChanged更新消息队列 **如果processQueueTable发生了改变,那么调用messageQueueChanged方法。设置新的本地订阅关系版本,重设流控参数,立即给所有broker发送心跳,让Broker更新当前订阅关系。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 @Override public void messageQueueChanged (String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { SubscriptionData subscriptionData = this .subscriptionInner.get(topic); long newVersion = System.currentTimeMillis(); log.info("{} Rebalance changed, also update version: {}, {}" , topic, subscriptionData.getSubVersion(), newVersion); subscriptionData.setSubVersion(newVersion); int currentQueueCount = this .processQueueTable.size(); if (currentQueueCount != 0 ) { int pullThresholdForTopic = this .defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic(); if (pullThresholdForTopic != -1 ) { int newVal = Math.max(1 , pullThresholdForTopic / currentQueueCount); log.info("The pullThresholdForQueue is changed from {} to {}" , this .defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal); this .defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal); } int pullThresholdSizeForTopic = this .defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic(); if (pullThresholdSizeForTopic != -1 ) { int newVal = Math.max(1 , pullThresholdSizeForTopic / currentQueueCount); log.info("The pullThresholdSizeForQueue is changed from {} to {}" , this .defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal); this .defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal); } } this .getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); }
8 总结 本次我们学习了DefaultMQPushConsumer负载均衡的具体步骤的源码。集群模式负载均衡的大概步骤为:
1、 首先获取该topic的所有消息队列集合mqSet,随后从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合cidAll一个clientId代表一个消费者;2、 对topic的消息队列和clientId集合分别进行排序排序能够保证,不同的客户端消费者在进行负载均衡时,其mqAll和cidAll中的元素顺序是一致的;3、 获取分配消息队列的策略实现AllocateMessageQueueStrategy,即负载均衡的策略类,执行allocate方法,为当前clientId也就是当前消费者,分配消息队列,这一步就是执行负载均衡或者说重平衡的算法;4、 调用updateProcessQueueTableInRebalance方法,更新新分配的消息队列的处理队列processQueueTable的信息,为新分配的消息队列创建最初的pullRequest并分发给PullMessageService这就是Push模式下最初的拉消息请求的来源 ;5、 如果processQueueTable发生了改变,那么调用messageQueueChanged方法设置新的本地订阅关系版本,重设流控参数,立即给所有broker发送心跳,让Broker更新当前订阅关系;
*同时我们也知道了,最初始的PullRequest,就是在负载均衡之时对于新分配到的消费队列创建的。然后通过dispatchPullRequest方法对这些PullRequest进行分发,Push模式下这些请求会被PullMessageService依次处理,后续实现自动拉取消息,以及消费。 **
*这些PullRequest将会被存入PullMessageService服务内部的pullRequestQueue集合中,后续异步的消费,自动执行拉取消息的请求,这就是Push模式下最初的拉消息请求的来源。关于如何拉去消息以及如何消费,将是我们下一部分的内容。 **
__END__