15、RocketMQ源码分析:消费者DefaultMQPushConsumer启动主要流程源码 此前我们学习了Producer和Broker的启动源码,以及Producer发送消息源码和Broker接收存储消息的源码,现在,我们来学习Consumer的启动以及消费消息的源码。Consumer的启动源码和Producer的启动源码还是有很多相似的地方的。
文章目录
1 创建DefaultMQPushConsumer实例
2 subscribe订阅
3 start启动消费者
3.1 copySubscription拷贝订阅关系
4 小结
客户端常用的消费者类是DefaultMQPushConsumer,此类的简单消费者案例如下,在RocketMQ源码的example模块下的找到更多快速案例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class Consumer { public static void main (String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("ConsumerStart" ); consumer.setNamesrvAddr("127.0.0.1:9876" ); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("ConsumerStart" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n" ); } }
我们本次分析RocketMQ消费者启动的源码。实际上就是分析DefaultMQPushConsumer的构造器以及start方法的源码。
1 创建DefaultMQPushConsumer实例 DefaultMQPushConsumer的构造器有很多,但最终都是调用下面四个参数的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public DefaultMQPushConsumer (final String namespace, final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this .consumerGroup = consumerGroup; this .namespace = namespace; this .allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl (this , rpcHook); }
这个构造器是指定了命名空间、生产者组、RPC钩子和消费者之间消息分配的策略算法的构造器,其内部创建了一个DefaultMQPushConsumerImpl实例,DefaultMQPushConsumer可以看作是DefaultMQPushConsumerImpl的包装类,开放给开发人员使用,DefaultMQPushConsumer中的几乎所有的方法内部都是由DefaultMQPushConsumerImpl实现的。这是门面模式设计模式。
下面是DefaultMQPushConsumerImpl的构造器,也很简单。
1 2 3 4 5 6 7 8 9 public DefaultMQPushConsumerImpl (DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { this .defaultMQPushConsumer = defaultMQPushConsumer; this .rpcHook = rpcHook; this .pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException(); }
创建了DefaultMQPushConsumer实例之后,会设置一些属性,包括namesrvAddr、consumeFromWhere、注册messageListener消息监听器等等,这些都见简单的属性赋值操作,除了subscribe方法。
2 subscribe订阅 subscribe方法表示Consumer订阅的自己感兴趣的Topic,并且支持对消息进行过滤,,过滤表达式支持TAG和SQL92两种类型,他们都会被解析成SubscriptionData对象,最终将topic与SubscriptionData的关系维护到RebalanceImpl内部的subscriptionInner这个map集合中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void subscribe (String topic, String subExpression) throws MQClientException { this .defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression); }
下面是DefaultMQPushConsumerImpl的方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public void subscribe (String topic, String subExpression) throws MQClientException { try { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression); this .rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this .mQClientFactory != null ) { this .mQClientFactory.sendHeartbeatToAllBrokerWithLock(); } } catch (Exception e) { throw new MQClientException ("subscription exception" , e); } }
3 start启动消费者 DefaultMQPushConsumer的构造器实际上没做什么太多的操作,主要是start方法内部会执行很多初始化操作,因此使用时,我们需要在消费或者查询消息之前调用该方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void start () throws MQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this .getNamespace(), this .consumerGroup)); this .defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this .getNamesrvAddr(), this .getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed " , e); } } }
主要是defaultMQPushConsumerImpl#start方法,该方法实现生产者的启动。主要步骤有如下几步:
1、 调用checkConfig方法检查消费者的配置信息,如果consumerGroup为空,或者长度大于255个字符,或者包含非法字符(正常的匹配模式为^[%|a-zA-Z0-9_-]+$
),或者消费者组名为默认组名DEFAULT_CONSUMER,或者messageModel为空,或者consumeFromWhere为空,或者consumeTimestamp为空,或者allocateMessageQueueStrategy为空……等等属性的空校验,满足以上任意条件都校验不通过抛出异常;2、 调用copySubscription 方法,拷贝拷贝订阅关系,然后为集群消费模式的消费者,配置其对应的重试主题retryTopic=%RETRY%+consumerGroup并且设置当前消费者自动订阅该消费者组对应的重试topic,用于实现消费重试;3、 调用getOrCreateMQClientInstance方法,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量该方法我们在生产者启动源码部分已经讲过了;4、 设置负载均衡服务rebalanceImpl 的相关属性;5、 创建消息拉取核心对象PullAPIWrapper ,封装了消息拉取及结果解析逻辑的API;6、 根据消息模式设置不同的OffsetStore ,用于实现消费者的消息消费偏移量offset的管理如果是广播消费模式,则是LocalFileOffsetStore,消息消费进度即offset存储在本地磁盘中如果是集群消费模式,则是RemoteBrokerOffsetStore,消息消费进度即offset存储在远程broker中;7、 调用offsetStore.load 加载消费偏移量,LocalFileOffsetStore会加载本地磁盘中的数据,RemoteBrokerOffsetStore则是一个空实现;8、 根据消息监听器MessageListener 的类型创建不同的消息消费服务ConsumeMessageService如果是MessageListenerOrderly类型,则表示顺序消费,创建ConsumeMessageOrderlyService如果是MessageListenerConcurrently类型,则表示并发消费,创建ConsumeMessageOrderlyService;9、 调用consumeMessageService.start 启动消息消费服务消息拉取服务PullMessageService拉取到消息后,会构建ConsumeRequest对象交给consumeMessageService去消费;10、 注册消费者组和消费者到MQClientInstance 中的consumerTable 中,如果没注册成功,那么可能是因为同一个程序中存在同名消费者组的不同消费者,抛出异常;
调用mQClientFactory#start 方法启动CreateMQClientInstance客户端通信实例,初始化netty服务、各种定时任务、拉取消息服务、rebalanceService服务等等。CreateMQClientInstance仅会被初始化一次,其源码我们在生产者启动源码部分已经讲过了。
12、 进行后续处理:;
1、 调用updateTopicSubscribeInfoWhenSubscriptionChanged 方法,向NameServer拉取并更新当前消费者订阅的topic路由信息;2、 调用checkClientInBroker 方法,随机选择一个Broker,发送检查客户端tag配置的请求,主要是检测Broker是否支持SQL92类型的tag过滤以及SQL92的tag语法是否正确;3、 调用sendHeartbeatToAllBrokerWithLock 方法,主动发送心跳信息给所有brokerBroker接收到心跳后,会发送Code为NOTIFY_CONSUMER_IDS_CHANGED的请求给Group下其它消费者,要求它们重新进行负载均衡;4、 调用rebalanceImmediately 方法,唤醒负载均衡服务rebalanceService,主动进行一次MessageQueue的重平衡;
public synchronized void start () throws MQClientException { switch (this .serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}" , this .defaultMQPushConsumer.getConsumerGroup(), this .defaultMQPushConsumer.getMessageModel(), this .defaultMQPushConsumer.isUnitMode()); this .serviceState = ServiceState.START_FAILED; this .checkConfig(); this .copySubscription(); if (this .defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this .defaultMQPushConsumer.changeInstanceNameToPID(); } this .mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this .defaultMQPushConsumer, this .rpcHook); this .rebalanceImpl.setConsumerGroup(this .defaultMQPushConsumer.getConsumerGroup()); this .rebalanceImpl.setMessageModel(this .defaultMQPushConsumer.getMessageModel()); this .rebalanceImpl.setAllocateMessageQueueStrategy(this .defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this .rebalanceImpl.setmQClientFactory(this .mQClientFactory); this .pullAPIWrapper = new PullAPIWrapper ( mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this .pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this .defaultMQPushConsumer.getOffsetStore() != null ) { this .offsetStore = this .defaultMQPushConsumer.getOffsetStore(); } else { switch (this .defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this .offsetStore = new LocalFileOffsetStore (this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; case CLUSTERING: this .offsetStore = new RemoteBrokerOffsetStore (this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; default : break ; } this .defaultMQPushConsumer.setOffsetStore(this .offsetStore); } this .offsetStore.load(); if (this .getMessageListenerInner() instanceof MessageListenerOrderly) { this .consumeOrderly = true ; this .consumeMessageService = new ConsumeMessageOrderlyService (this , (MessageListenerOrderly) this .getMessageListenerInner()); } else if (this .getMessageListenerInner() instanceof MessageListenerConcurrently) { this .consumeOrderly = false ; this .consumeMessageService = new ConsumeMessageConcurrentlyService (this , (MessageListenerConcurrently) this .getMessageListenerInner()); } this .consumeMessageService.start(); boolean registerOK = mQClientFactory.registerConsumer(this .defaultMQPushConsumer.getConsumerGroup(), this ); if (!registerOK) { this .serviceState = ServiceState.CREATE_JUST; this .consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException ("The consumer group[" + this .defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null ); } mQClientFactory.start(); log.info("the consumer [{}] start OK." , this .defaultMQPushConsumer.getConsumerGroup()); this .serviceState = ServiceState.RUNNING; break ; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException ("The PushConsumer service state not OK, maybe started once, " + this .serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null ); default : break ; } this .updateTopicSubscribeInfoWhenSubscriptionChanged(); this .mQClientFactory.checkClientInBroker(); this .mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this .mQClientFactory.rebalanceImmediately(); }
3.1 copySubscription拷贝订阅关系 该方法将defaultMQPushConsumer中的订阅关系Map集合subscription中的数据拷贝到RebalanceImpl的subscriptionInner中。
**然后还有很重要的一步,就是为集群消费模式的消费者,配置其对应的重试主题 retryTopic = %RETRY% + consumerGroup,并且设置当前消费者自动订阅该消费者组对应的重试topic,用于实现消费重试。而如果是广播消费模式,那么不订阅重试topic,所以说,从Consumer启动的时候开始,就注定了广播消费模式的消费者,消费失败消息会丢弃,无法重试。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 private void copySubscription () throws MQClientException { try { Map<String, String> sub = this .defaultMQPushConsumer.getSubscription(); if (sub != null ) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString); this .rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this .messageListenerInner) { this .messageListenerInner = this .defaultMQPushConsumer.getMessageListener(); } switch (this .defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break ; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this .defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); this .rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break ; default : break ; } } catch (Exception e) { throw new MQClientException ("subscription exception" , e); } }
4 小结 本次我们仅仅介绍了Consumer消费者启动的主要流程,后面我们单独分析这些服务的工作原理。其中几个关键服务如下:
1、 rebalanceService :消费者负载均衡服务,用于确定消费者的消息队列以及负载均衡,同时也是触发pullMessageService拉取消息的入口由MQClientInstance启动,同一个服务器的所有Consumer使用同一个实例;2、 pullMessageService :消息拉取服务,用于拉取消息由MQClientInstance启动,同一个服务器的所有Consumer使用同一个实例;3、 consumeMessageService :消息消费服务,消息拉取服务拉取到消息后,交给此服务消费消息由DefaultMQPushConsumerImpl启动,每个Consumer持有一个实例;4、 OffsetStore :用于管理消费点位的上报持久化由DefaultMQPushConsumerImpl启动,每个Consumer持有一个实例;
__END__