25、RocketMQ源码分析:DefaultMQPushConsumer消费进度管理源码
分类:RocketMQ源码分析(1) 2024-03-27 阅读(160)
DefaultMQPushConsumer的消费进度由OffsetStore这个类提供统一的API来进行管理。集群模式使用RemoteBrokerOffsetStore实现类,广播模式使用LocalFileOffsetStore实现类。
文章目录
- 1 load启动加载消费偏移量
- 2 readOffset读取offset
- 3 updateOffset消费完成更新内存offset
- 4 persistAllConsumerOffset定时持久化offset
- 5 persistAll持久化所有offset
- 5.1 updateConsumeOffsetToBroker上报offset到Broker
- 5.2 broker处理更新offset请求
1 load启动加载消费偏移量
消费者启动时,DefaultMQPushConsumer构造器中的start方法内部会调用Offset#load方法初始化消费偏移量。
LocalFileOffsetStore会加载本地磁盘中的数据,RemoteBrokerOffsetStore则是一个空实现。
LocalFileOffsetStore的load方法,从本地文件恢复offset配置,地址为{user.home}/.rocketmq_offsets/{clientId}/{groupName}/offsets.json,配置在文件中以json形式存在。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
@Override public void load() throws MQClientException { OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) { AtomicLong offset = mqEntry.getValue(); log.info("load consumer's offset, {} {} {}", this.groupName, mqEntry.getKey(), offset.get()); } } }
|
RemoteBrokerOffsetStore的load方法则是一个空实现。
1 2 3 4 5 6 7 8
|
@Override public void load() { }
|
2 readOffset读取offset
负载均衡分配到新的消息队列时需要获取最新offset,以及集群模式拉取消息时都需要获取最新offset上报给broekr。
该方法获取当前消费者组的offset,有三种读取类型:
1、 READ_FROM_MEMORY:仅从本地内存offsetTable读取;
2、 READ_FROM_STORE:仅从存储服务中读取,可能是本地文件或者broker中读取;
3、 MEMORY_FIRST_THEN_STORE:先从本地内存offsetTable读取,读不到再从存储服务中读取;
当出现异常或者是在本地或者broker没有找到对于消费者组的offset记录,则算作第一次启动该消费者组,那么返回-1。
RemoteBrokerOffsetStore的offset存储服务是broker,因此READ_FROM_STORE就是从broker中读取,该方法的源码我们在DefaultMQPushConsumer负载均衡服务部分就讲过了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
@Override public long readOffset(final MessageQueue mq, final ReadOffsetType type) { if (mq != null) { switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: { AtomicLong offset = this.offsetTable.get(mq); if (offset != null) { return offset.get(); } else if (ReadOffsetType.READ_FROM_MEMORY == type) { return -1; } }
case READ_FROM_STORE: { try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); AtomicLong offset = new AtomicLong(brokerOffset); this.updateOffset(mq, offset.get(), false); return brokerOffset; } catch (MQBrokerException e) { return -1; } catch (Exception e) { log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e); return -2; } } default: break; } }
return -1; }
|
LocalFileOffsetStore的offset存储服务是本地文件,因此READ_FROM_STORE就是从本地文件中读取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
@Override public long readOffset(final MessageQueue mq, final ReadOffsetType type) { if (mq != null) { switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: { AtomicLong offset = this.offsetTable.get(mq); if (offset != null) { return offset.get(); } else if (ReadOffsetType.READ_FROM_MEMORY == type) { return -1; } }
case READ_FROM_STORE: { OffsetSerializeWrapper offsetSerializeWrapper; try { offsetSerializeWrapper = this.readLocalOffset(); } catch (MQClientException e) { return -1; } if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); if (offset != null) { this.updateOffset(mq, offset.get(), false); return offset.get(); } } } default: break; } }
return -1; }
|
3 updateOffset消费完成更新内存offset
消费者在成功之后将会调用该方法更新内存中的offsetTable的最新offset,RemoteBrokerOffsetStore和LocalFileOffsetStore方法的源码是一致的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
@Override public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { if (mq != null) { AtomicLong offsetOld = this.offsetTable.get(mq); if (null == offsetOld) { offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); } if (null != offsetOld) { if (increaseOnly) { MixAll.compareAndIncreaseOnly(offsetOld, offset); } else { offsetOld.set(offset); } } } }
|
4 persistAllConsumerOffset定时持久化offset
消费者除了在拉取消息的时候,会上报上一次的消费点位进行持久化(集群模式),同时在Consumer启动过程中也会启动一个定时任务,每5秒钟进行一次offset的持久化(广播模式和集群模式)。
Consumer的启动过程中,在MQClientInstance的startScheduledTask方法中会去启动各种定时延迟任务,其中一个定时任务,会每5秒钟进行一次offset的持久化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
|
persistAllConsumerOffset用于持久化所有consumer的offset。集群模式持久化到broker,广播模式持久化到本地。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
private void persistAllConsumerOffset() { Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); impl.persistConsumerOffset(); } }
|
persistConsumerOffset方法获取所有的mq集合,然后调用offsetStore#persistAll方法,持久化所有mq的offset到本地文件或者远程broker。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@Override public void persistConsumerOffset() { try { this.makeSureStateOK(); Set<MessageQueue> mqs = new HashSet<MessageQueue>(); Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); mqs.addAll(allocateMq); this.offsetStore.persistAll(mqs); } catch (Exception e) { log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); } }
|
5 persistAll持久化所有offset
该方法持久化所有mq的offset到本地文件或者远程broker。
RemoteBrokerOffsetStore方法,持久化所有mq的offset到远程broker。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
@Override public void persistAll(Set<MessageQueue> mqs) { if (null == mqs || mqs.isEmpty()) return; final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>(); for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { MessageQueue mq = entry.getKey(); AtomicLong offset = entry.getValue(); if (offset != null) { if (mqs.contains(mq)) { try {
this.updateConsumeOffsetToBroker(mq, offset.get()); log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", this.groupName, this.mQClientFactory.getClientId(), mq, offset.get()); } catch (Exception e) { log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } } else { unusedMQ.add(mq); } } } if (!unusedMQ.isEmpty()) { for (MessageQueue mq : unusedMQ) { this.offsetTable.remove(mq); log.info("remove unused mq, {}, {}", mq, this.groupName); } } }
|
5.1 updateConsumeOffsetToBroker上报offset到Broker
该方法向master的broker发送一个更新offset的请求,请求Code为UPDATE_CONSUMER_OFFSET。这是一个单向请求,即发送之后马上返回,不管broker是否真正的更新成功,可能导致重复消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { updateConsumeOffsetToBroker(mq, offset, true); }
@Override public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false); }
if (findBrokerResult != null) { UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); if (isOneway) { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } else { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } } else { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } }
public void updateConsumerOffsetOneway( final String addr, final UpdateConsumerOffsetRequestHeader requestHeader, final long timeoutMillis ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); }
|
5.2 broker处理更新offset请求
broker接收到请求Code为UPDATE_CONSUMER_OFFSET的请求之后,将会进行offset更新。该请求的处理器是ConsumerManageProcessor。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.GET_CONSUMER_LIST_BY_GROUP: return this.getConsumerListByGroup(ctx, request); case RequestCode.UPDATE_CONSUMER_OFFSET: return this.updateConsumerOffset(ctx, request); case RequestCode.QUERY_CONSUMER_OFFSET: return this.queryConsumerOffset(ctx, request); default: break; } return null; }
|
这里提交偏移量实际上就是将新的偏移量存入ConsumerOffsetManager内部的offsetTable中。该缓存对应着磁盘上的{user.home}/store/config/consumerOffset.json文件。这里实际上是存入到内存中的,并没有持久化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) { String key = topic + TOPIC_GROUP_SEPARATOR + group; this.commitOffset(clientHost, key, queueId, offset); }
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); if (null == map) { map = new ConcurrentHashMap<Integer, Long>(32); map.put(queueId, offset); this.offsetTable.put(key, map); } else { Long storeOffset = map.put(queueId, offset); if (storeOffset != null && offset < storeOffset) { log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); } } }
|
broker启动过程中,在BrokerController#initialize方法中会启动一些定时调度任务,其中有一个任务每隔5s将消费者offset进行持久化(offsetTable中的数据),存入consumerOffset.json文件中。

broker在shutdown的时候也会调用consumerOffsetManager#persist()持久化offset到consumerOffset.json文件中。
__END__