15、RocketMQ源码分析:消费者DefaultMQPushConsumer启动主要流程源码 此前我们学习了Producer和Broker的启动源码,以及Producer发送消息源码和Broker接收存储消息的源码,现在,我们来学习Consumer的启动以及消费消息的源码。Consumer的启动源码和Producer的启动源码还是有很多相似的地方的。
文章目录
1 创建DefaultMQPushConsumer实例
2 subscribe订阅
3 start启动消费者
3.1 copySubscription拷贝订阅关系
4 小结
客户端常用的消费者类是DefaultMQPushConsumer,此类的简单消费者案例如下,在RocketMQ源码的example模块下的找到更多快速案例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class Consumer { public static void main (String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("ConsumerStart" ); consumer.setNamesrvAddr("127.0.0.1:9876" ); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("ConsumerStart" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n" ); } }
我们本次分析RocketMQ消费者启动的源码。实际上就是分析DefaultMQPushConsumer的构造器以及start方法的源码。
1 创建DefaultMQPushConsumer实例 DefaultMQPushConsumer的构造器有很多,但最终都是调用下面四个参数的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public DefaultMQPushConsumer (final String namespace, final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this .consumerGroup = consumerGroup; this .namespace = namespace; this .allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl (this , rpcHook); }
这个构造器是指定了命名空间、生产者组、RPC钩子和消费者之间消息分配的策略算法的构造器,其内部创建了一个DefaultMQPushConsumerImpl实例,DefaultMQPushConsumer可以看作是DefaultMQPushConsumerImpl的包装类,开放给开发人员使用,DefaultMQPushConsumer中的几乎所有的方法内部都是由DefaultMQPushConsumerImpl实现的。这是门面模式设计模式。
下面是DefaultMQPushConsumerImpl的构造器,也很简单。
1 2 3 4 5 6 7 8 9 public DefaultMQPushConsumerImpl (DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { this .defaultMQPushConsumer = defaultMQPushConsumer; this .rpcHook = rpcHook; this .pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException(); }
创建了DefaultMQPushConsumer实例之后,会设置一些属性,包括namesrvAddr、consumeFromWhere、注册messageListener消息监听器等等,这些都见简单的属性赋值操作,除了subscribe方法。
2 subscribe订阅 subscribe方法表示Consumer订阅的自己感兴趣的Topic,并且支持对消息进行过滤,,过滤表达式支持TAG和SQL92两种类型,他们都会被解析成SubscriptionData对象,最终将topic与SubscriptionData的关系维护到RebalanceImpl内部的subscriptionInner这个map集合中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void subscribe (String topic, String subExpression) throws MQClientException { this .defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression); }
下面是DefaultMQPushConsumerImpl的方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public void subscribe (String topic, String subExpression) throws MQClientException { try { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression); this .rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this .mQClientFactory != null ) { this .mQClientFactory.sendHeartbeatToAllBrokerWithLock(); } } catch (Exception e) { throw new MQClientException ("subscription exception" , e); } }
3 start启动消费者 DefaultMQPushConsumer的构造器实际上没做什么太多的操作,主要是start方法内部会执行很多初始化操作,因此使用时,我们需要在消费或者查询消息之前调用该方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void start () throws MQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this .getNamespace(), this .consumerGroup)); this .defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this .getNamesrvAddr(), this .getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed " , e); } } }
主要是defaultMQPushConsumerImpl#start方法,该方法实现生产者的启动。主要步骤有如下几步:
1、 调用checkConfig方法检查消费者的配置信息,如果consumerGroup为空,或者长度大于255个字符,或者包含非法字符(正常的匹配模式为^[%|a-zA-Z0-9_-]+$
),或者消费者组名为默认组名DEFAULT_CONSUMER,或者messageModel为空,或者consumeFromWhere为空,或者consumeTimestamp为空,或者allocateMessageQueueStrategy为空……等等属性的空校验,满足以上任意条件都校验不通过抛出异常;2、 调用copySubscription 方法,拷贝拷贝订阅关系,然后为集群消费模式的消费者,配置其对应的重试主题retryTopic=%RETRY%+consumerGroup并且设置当前消费者自动订阅该消费者组对应的重试topic,用于实现消费重试;3、 调用getOrCreateMQClientInstance方法,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量该方法我们在生产者启动源码部分已经讲过了;4、 设置负载均衡服务rebalanceImpl 的相关属性;5、 创建消息拉取核心对象PullAPIWrapper ,封装了消息拉取及结果解析逻辑的API;6、 根据消息模式设置不同的OffsetStore ,用于实现消费者的消息消费偏移量offset的管理如果是广播消费模式,则是LocalFileOffsetStore,消息消费进度即offset存储在本地磁盘中如果是集群消费模式,则是RemoteBrokerOffsetStore,消息消费进度即offset存储在远程broker中;7、 调用offsetStore.load 加载消费偏移量,LocalFileOffsetStore会加载本地磁盘中的数据,RemoteBrokerOffsetStore则是一个空实现;8、 根据消息监听器MessageListener 的类型创建不同的消息消费服务ConsumeMessageService如果是MessageListenerOrderly类型,则表示顺序消费,创建ConsumeMessageOrderlyService如果是MessageListenerConcurrently类型,则表示并发消费,创建ConsumeMessageOrderlyService;9、 调用consumeMessageService.start 启动消息消费服务消息拉取服务PullMessageService拉取到消息后,会构建ConsumeRequest对象交给consumeMessageService去消费;10、 注册消费者组和消费者到MQClientInstance 中的consumerTable 中,如果没注册成功,那么可能是因为同一个程序中存在同名消费者组的不同消费者,抛出异常;
调用mQClientFactory#start 方法启动CreateMQClientInstance客户端通信实例,初始化netty服务、各种定时任务、拉取消息服务、rebalanceService服务等等。CreateMQClientInstance仅会被初始化一次,其源码我们在生产者启动源码部分已经讲过了。
12、 进行后续处理:;
1、 调用updateTopicSubscribeInfoWhenSubscriptionChanged 方法,向NameServer拉取并更新当前消费者订阅的topic路由信息;2、 调用checkClientInBroker 方法,随机选择一个Broker,发送检查客户端tag配置的请求,主要是检测Broker是否支持SQL92类型的tag过滤以及SQL92的tag语法是否正确;3、 调用sendHeartbeatToAllBrokerWithLock 方法,主动发送心跳信息给所有brokerBroker接收到心跳后,会发送Code为NOTIFY_CONSUMER_IDS_CHANGED的请求给Group下其它消费者,要求它们重新进行负载均衡;4、 调用rebalanceImmediately 方法,唤醒负载均衡服务rebalanceService,主动进行一次MessageQueue的重平衡;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 public synchronized void start () throws MQClientException { switch (this .serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}" , this .defaultMQPushConsumer.getConsumerGroup(), this .defaultMQPushConsumer.getMessageModel(), this .defaultMQPushConsumer.isUnitMode()); this .serviceState = ServiceState.START_FAILED; this .checkConfig(); this .copySubscription(); if (this .defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this .defaultMQPushConsumer.changeInstanceNameToPID(); } this .mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this .defaultMQPushConsumer, this .rpcHook); this .rebalanceImpl.setConsumerGroup(this .defaultMQPushConsumer.getConsumerGroup()); this .rebalanceImpl.setMessageModel(this .defaultMQPushConsumer.getMessageModel()); this .rebalanceImpl.setAllocateMessageQueueStrategy(this .defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this .rebalanceImpl.setmQClientFactory(this .mQClientFactory); this .pullAPIWrapper = new PullAPIWrapper ( mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this .pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this .defaultMQPushConsumer.getOffsetStore() != null ) { this .offsetStore = this .defaultMQPushConsumer.getOffsetStore(); } else { switch (this .defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this .offsetStore = new LocalFileOffsetStore (this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; case CLUSTERING: this .offsetStore = new RemoteBrokerOffsetStore (this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; default : break ; } this .defaultMQPushConsumer.setOffsetStore(this .offsetStore); } this .offsetStore.load(); if (this .getMessageListenerInner() instanceof MessageListenerOrderly) { this .consumeOrderly = true ; this .consumeMessageService = new ConsumeMessageOrderlyService (this , (MessageListenerOrderly) this .getMessageListenerInner()); } else if (this .getMessageListenerInner() instanceof MessageListenerConcurrently) { this .consumeOrderly = false ; this .consumeMessageService = new ConsumeMessageConcurrentlyService (this , (MessageListenerConcurrently) this .getMessageListenerInner()); } this .consumeMessageService.start(); boolean registerOK = mQClientFactory.registerConsumer(this .defaultMQPushConsumer.getConsumerGroup(), this ); if (!registerOK) { this .serviceState = ServiceState.CREATE_JUST; this .consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException ("The consumer group[" + this .defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null ); } mQClientFactory.start(); log.info("the consumer [{}] start OK." , this .defaultMQPushConsumer.getConsumerGroup()); this .serviceState = ServiceState.RUNNING; break ; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException ("The PushConsumer service state not OK, maybe started once, " + this .serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null ); default : break ; } this .updateTopicSubscribeInfoWhenSubscriptionChanged(); this .mQClientFactory.checkClientInBroker(); this .mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this .mQClientFactory.rebalanceImmediately(); }
3.1 copySubscription拷贝订阅关系 该方法将defaultMQPushConsumer中的订阅关系Map集合subscription中的数据拷贝到RebalanceImpl的subscriptionInner中。
**然后还有很重要的一步,就是为集群消费模式的消费者,配置其对应的重试主题 retryTopic = %RETRY% + consumerGroup,并且设置当前消费者自动订阅该消费者组对应的重试topic,用于实现消费重试。而如果是广播消费模式,那么不订阅重试topic,所以说,从Consumer启动的时候开始,就注定了广播消费模式的消费者,消费失败消息会丢弃,无法重试。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 private void copySubscription () throws MQClientException { try { Map<String, String> sub = this .defaultMQPushConsumer.getSubscription(); if (sub != null ) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString); this .rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this .messageListenerInner) { this .messageListenerInner = this .defaultMQPushConsumer.getMessageListener(); } switch (this .defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break ; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this .defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); this .rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break ; default : break ; } } catch (Exception e) { throw new MQClientException ("subscription exception" , e); } }
4 小结 本次我们仅仅介绍了Consumer消费者启动的主要流程,后面我们单独分析这些服务的工作原理。其中几个关键服务如下:
1、 rebalanceService :消费者负载均衡服务,用于确定消费者的消息队列以及负载均衡,同时也是触发pullMessageService拉取消息的入口由MQClientInstance启动,同一个服务器的所有Consumer使用同一个实例;2、 pullMessageService :消息拉取服务,用于拉取消息由MQClientInstance启动,同一个服务器的所有Consumer使用同一个实例;3、 consumeMessageService :消息消费服务,消息拉取服务拉取到消息后,交给此服务消费消息由DefaultMQPushConsumerImpl启动,每个Consumer持有一个实例;4、 OffsetStore :用于管理消费点位的上报持久化由DefaultMQPushConsumerImpl启动,每个Consumer持有一个实例;
__END__