21、RocketMQ源码分析:ConsumeMessageConcurrentlyService并发消费消息源码
此前我们学习了consumer消息的拉取流程源码:
**1、
** RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码;
**2、
** RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】;
**3、
** RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码;
当前DefaultMQPushConsumer拉取到消息之后,会将消息提交到对应的processQueue处理队列内部的msgTreeMap中。然后通过consumeMessageService#submitConsumeRequest方法将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息。
consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现,下面我们来看看这两种实现如何消费消息,本次我们先学习ConsumeMessageConcurrentlyService并发消费的源码。
文章目录
- 1 start启动服务定时清理过期消息
-
- 1.2cleanExpiredMsg清理过期消息
- 2 submitConsumeRequest提交消费请求
- 2.2 submitConsumeRequestLater延迟提交
- 2.2 consumeMessageBatchMaxSize和pullBatchSize
- 3 ConsumeRequest执行消费任务
- 3.1 resetRetryAndNamespace重设重试topic
- 4 processConsumeResult处理消费结果
-
- 4.2 updateOffset更新offset
- 4.2.1 compareAndIncreaseOnly仅增加offset
1 start启动服务定时清理过期消息
consumeMessageService服务在DefaultMQPushConsumerImpl#start方法中被初始化并启动,即调用start方法。
- *ConsumeMessageConcurrentlyService#start方法将会通过cleanExpireMsgExecutors定时任务清理过期的消息,启动后15min开始执行,后每15min执行一次,这里的15min是RocketMQ大的默认超时时间,可通过defaultMQPushConsumer#consumeTimeout属性设置。
**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public void start() { this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override public void run() { try { cleanExpireMsg(); } catch (Throwable e) { log.error("scheduleAtFixedRate cleanExpireMsg exception", e); } }
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); }
|
1.1 cleanExpireMsg清理过期消息
该方法获取所有的消息队列和处理队列的键值对,循环遍历并且调用ProcessQueue#cleanExpiredMsg方法清理过期消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
private void cleanExpireMsg() { Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); while (it.hasNext()) { Map.Entry<MessageQueue, ProcessQueue> next = it.next(); ProcessQueue pq = next.getValue(); pq.cleanExpiredMsg(this.defaultMQPushConsumer); } }
|
1.2cleanExpiredMsg清理过期消息
循环清理msgTreeMap中的过期消息,每次最多循环清理16条消息。
1、 每次循环首先获取msgTreeMap中的第一次元素的起始消费时间,msgTreeMap是一个红黑树,第一个节点就是offset最小的节点;
2、 如果消费时间距离现在时间超过默认15min,那么获取这个msg,如果没有被消费,或者消费时间距离现在时间不超过默认15min,则结束循环;
3、 将获取到的消息通过sendMessageBack发回broker延迟topic,将在给定延迟时间(默认从level3,即10s开始)之后发回进行重试消费;
4、 加锁判断如果这个消息还没有被消费完,并且还是在第一位,那么调用removeMessage方法从msgTreeMap中移除消息,进行下一轮判断;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { return; } int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; for (int i = 0; i < loop; i++) { MessageExt msg = null; try { this.treeMapLock.readLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty()) { String consumeStartTimeStamp = MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue()); if (StringUtils.isNotEmpty(consumeStartTimeStamp) && System.currentTimeMillis() - Long.parseLong(consumeStartTimeStamp) > pushConsumer.getConsumeTimeout() * 60 * 1000) { msg = msgTreeMap.firstEntry().getValue(); } else { break; } } else { break; } } finally { this.treeMapLock.readLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); }
try { pushConsumer.sendMessageBack(msg, 3); log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); try { this.treeMapLock.writeLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { try { removeMessage(Collections.singletonList(msg)); } catch (Exception e) { log.error("send expired msg exception", e); } } } finally { this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); } } catch (Exception e) { log.error("send expired msg exception", e); } } }
|
2 submitConsumeRequest提交消费请求
- *该方法将消息批量的封装为ConsumeRequest提交到ConsumeMessageConcurrentlyService内部的consumeExecutor线程池中进行异步消费,如果提交失败,则调用submitConsumeRequestLater方法延迟5s进行提交,而不是丢弃。
**
1、 首先获取单次批量消费的数量,默认1,通过DefaultMQPushConsumer的consumeMessageBatchMaxSize属性配置;
2、 如果消息数量<=单次批量消费的数量,那么直接全量消费,构建一个ConsumeRequest并提交到consumeExecutor线程池;
**3、** 如果消息数量>单次批量消费的数量,那么需要分割消息进行分批提交;
**从该方法可以得知,对于并发消费模式,拉取到的一批消息被分批次提交到线程池之后,就由线程池里面的线程异步的消费,我们知道线程池里面的线程执行先后顺序时不可控制的,因此这些不同批次的消息会被并发、的无序的消费。
**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
@Override public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } }
else { for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }
|
consumeExecutor线程池用于消费消息,其定义如下:最小、最大线程数默认20,阻塞队列为无界阻塞队列LinkedBlockingQueue。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl(consumeThreadPrefix));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
|
2.2 submitConsumeRequestLater延迟提交
提交的任务被线程池拒绝,那么延迟5s进行提交,而不是丢弃。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest ) { this.scheduledExecutorService.schedule(new Runnable() {
@Override public void run() { ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); } }, 5000, TimeUnit.MILLISECONDS); }
|
2.2 consumeMessageBatchMaxSize和pullBatchSize
co*
nsumeMessageBatchMaxSize是什么意思呢?他的字面意思就是单次批量消费的数量,实际上它代表着每次发送给消息监听器MessageListenerOrderly或者MessageListenerConcurrently的consumeMessage方法中的参数List
msgs中的最多的消息数量。*
- *consumeMessageBatchMaxSize默认值为1,所以说,无论是并发消费还是顺序消费,每次的consumeMessage方法的执行,msgs集合默认都只有一条消息。同理,如果把它设置为其他值n,无论是并发消费还是顺序消费,每次的consumeMessage的执行,msgs集合默认都最多只有n条消息。
**
**另外,在此前拉取消息的源码中,我们还学习了另一个参数pullBatchSize,默认值为32,其代表的是每一次拉取请求最多批量拉取的消费数量。也就是说无论是并发消费还是顺序消费,每次最多拉取32条消息。
**
3 ConsumeRequest执行消费任务
ConsumeRequest本身是一个线程任务,当拉取到消息之后,会将一批消息构建为一个ConsumeRequest对象,提交给consumeExecutor,由线程池异步的执行,它的run方法就是并发消费的核心方法。大概逻辑为:
1、 如果处理队列被丢弃,即dropped=true,那么直接返回,不再消费,例如负载均衡时该队列被分配给了其他新上线的消费者,尽量避免重复消费;
2、 调用resetRetryAndNamespace方法,当消息是重试消息的时候,将msg的topic属性从重试topic还原为真实的topic;
- 如果有消费钩子,那么执行钩子函数的前置方法consumeMessageBefore。我们可以通过DefaultMQPushConsumerImpl#registerConsumeMessageHook方法注册消费钩子ConsumeMessageHook,在消费消息的前后调用。
- 调用listener#consumeMessage方法,进行消息消费,调用实际的业务逻辑,返回执行状态结果如status为null。
1、
正常情况下可返回两种状态:ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消费成功,ConsumeConcurrentlyStatus.RECONSUME_LATER表示消费失败,如果中途抛出异常,则status为null;
5、 对返回的执行状态结果进行判断处理从这里可得知消费超时时间为15min,另外如果返回的status为null,那么status将会被设置为RECONSUME_LATER,即消费失败;
1、 计算消费时间consumeRT如果status为null,如果业务的执行抛出了异常,设置returnType为EXCEPTION,否则设置returnType为RETURNNULL;
2、
如消费时间consumeRT大于等于consumeTimeout,默认15min设置returnType为TIME_OUT消费超时时间可通过DefaultMQPushConsumer.consumeTimeout属性配置,默认15,单位分钟;
3、 如status为RECONSUME_LATER,即消费失败,设置returnType为FAILED;
4、 如status为CONSUME_SUCCESS,即消费成功,设置returnType为SUCCESS;
6、 如果有消费钩子,那么执行钩子函数的后置方法consumeMessageAfter;
- 如果处理队列没有被丢弃,即dropped=false,*
那么调用ConsumeMessageConcurrentlyService#processConsumeResult方法处理消费结果,包含消费重试、提交offset等操作。*
需*
*要注意的是,如果在执行了listener#consumeMessage方法,即执行了业务逻辑之后,处理消费结果之前,该消息队列被丢弃了,例如负载均衡时该队列被分配给了其他新上线的消费者,那么由于dropped=false,导致不会进行最后的消费结果处理,将会导致消息的重复消费,因此必须做好业务层面的幂等性!
**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
| class ConsumeRequest implements Runnable { private final List<MessageExt> msgs; private final ProcessQueue processQueue; private final MessageQueue messageQueue;
public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) { this.msgs = msgs; this.processQueue = processQueue; this.messageQueue = messageQueue; }
public List<MessageExt> getMsgs() { return msgs; }
public ProcessQueue getProcessQueue() { return processQueue; }
@Override public void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; }
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setProps(new HashMap<String, String>()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } }
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue), e); hasException = true; }
long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; }
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageConcurrentlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
public MessageQueue getMessageQueue() { return messageQueue; }
}
|
3.1 resetRetryAndNamespace重设重试topic
当消息是重试消息的时候,将msg的topic属性从重试topic还原为真实的topic。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) { final String groupTopic = MixAll.getRetryTopic(consumerGroup); for (MessageExt msg : msgs) { String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); if (retryTopic != null && groupTopic.equals(msg.getTopic())) { msg.setTopic(retryTopic); }
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) { msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); } } }
|
4 processConsumeResult处理消费结果
对于并发消费的消费结果通过ConsumeMessageConcurrentlyService#processConsumeResult方法处理。
- *需要注意的是,如果在执行了listener#consumeMessage方法,即执行了业务逻辑之后,处理消费结果之前,该消息队列被丢弃了,例如负载均衡时该队列被分配给了其他新上线的消费者,那么由于dropped=false,导致不会进行最后的消费结果处理,将会导致消息的重复消费,因此必须做好业务层面的幂等性!
**
processConsumeResult方法的大概步骤为:
1、 获取ackIndex,默认初始值为Integer.MAX_VALUE,该值表示消费成功的消息在消息集合中的索引,用于辅助进行消息重试;
2、 判断消费状态,设置ackIndex的值:;
1、 CONSUME_SUCCESS消费成功:ackIndex=消息数量–1;
2、 RECONSUME_LATER消费失败:ackIndex=-1;
3、 判断消息模式,处理消费失败的情况:;
1、 BROADCASTING广播模式:对于没有消费成功的消息仅仅打印日志;
2、 CLUSTERING集群模式:;
1 2
| 1. 对于消费失败的消息,调用sendMessageBack方法向broker发送发回当前消息作为延迟消息到重试队列,等待重试消费。对于sendMessageBack发送失败的消息加入msgBackFailed失败集合,设置消息的重试次数属性reconsumeTimes+1 2. 对于sendMessageBack发送失败的消息,调用submitConsumeRequestLater方法,延迟5s将sendMessageBack执行失败的消息再次提交到consumeExecutor进行消费。
|
- 调用ProcessQueue#removeMessage方法从处理队列的msgTreeMap中将消费成功的消息,以及消费失败但是发回broker成功的这批消息移除,然后返回msgTreeMap中的最小的偏移量。
- 如果偏移量大于等于0并且处理队列没有被丢弃,调用OffsetStore#
updateOffset方法,尝试更新内存中的offsetTable中的最新偏移量信息,第三个参数是否仅单调增加offset为true,表示只会尝试更新offset为更大的值。这里仅仅是更新内存中的数据,而offset除了在拉取消息时上报broker进行持久化之外,还会定时每5s调用persistAllConsumerOffset定时持久化。我们在后面Consumer消费进度管理部分会学习相关源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return;
switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; }
switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } }
if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; }
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
|
4.1 removeMessage移除消息
**从处理队列的msgTreeMap中将消费成功,以及消费失败但是发回broker成功的这批消息移除,然后返回msgTreeMap中的最小的消息偏移量。如果移除后msgTreeMap为空了,那么直接返回该处理队列记录的最大的消息偏移量+1。
**
- *这个返回的offset将会尝试用于更新在内存中的offsetTable中的最新偏移量信息,而offset除了在拉取消息时持久化之外,还会定时每5s调用persistAllConsumerOffset定时持久化。我们在后面Consumer消费进度管理部分会学习源码。
**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
public long removeMessage(final List<MessageExt> msgs) { long result = -1; final long now = System.currentTimeMillis(); try { this.treeMapLock.writeLock().lockInterruptibly(); this.lastConsumeTimestamp = now; try { if (!msgTreeMap.isEmpty()) { result = this.queueOffsetMax + 1; int removedCnt = 0; for (MessageExt msg : msgs) { MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; msgSize.addAndGet(0 - msg.getBody().length); } } msgCount.addAndGet(removedCnt); if (!msgTreeMap.isEmpty()) { result = msgTreeMap.firstKey(); } } } finally { this.treeMapLock.writeLock().unlock(); } } catch (Throwable t) { log.error("removeMessage exception", t); }
return result; }
|
4.2 updateOffset更新offset
尝试更新内存中的offsetTable中的最新偏移量信息,第三个参数是否仅单调增加offset,顺序消费为false,并发消费为true。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
@Override public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { if (mq != null) { AtomicLong offsetOld = this.offsetTable.get(mq); if (null == offsetOld) { offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); } if (null != offsetOld) { if (increaseOnly) { MixAll.compareAndIncreaseOnly(offsetOld, offset); } else { offsetOld.set(offset); } } } }
|
4.2.1 compareAndIncreaseOnly仅增加offset
如果目标对象目前的值已经大于目标值,则返回false,否则在一个循环中尝试CAS的更新目标对象的值为目标值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) { long prev = target.get(); while (value > prev) { boolean updated = target.compareAndSet(prev, value); if (updated) return true; prev = target.get(); } return false; }
|
__END__