22、RocketMQ源码分析:ConsumeMessageOrderlyService顺序消费消息源码
分类:RocketMQ源码分析(1) 2024-03-27 阅读(124)
基于RocketMQ release-4.9.3,深入的介绍了ConsumeMessageOrderlyService顺序消费消息源码。
此前我们学习了consumer消息的拉取流程源码:
**1、
** RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码;
**2、
** RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】;
**3、
** RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码;
当前DefaultMQPushConsumer拉取到消息之后,会将消息提交到对应的processQueue处理队列内部的msgTreeMap中。然后通过consumeMessageService#submitConsumeRequest方法将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息。
consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现,下面我们来看看这两种实现如何消费消息,
此前我们已经学习了并发消费的源码:https://blog.csdn.net/weixin_43767015/article/details/129017181,本次我们先学习顺序消费的源码。
文章目录
- 1 start启动服务定时锁定消息队列
-
- 1.1.1 lockBatchMQ批量锁定消息队列
- 1.1.2 broker处理批量锁定请求lockBatchMQ
-
- 2 submitConsumeRequest提交消费请求
- 3 ConsumeRequest执行消费任务
- 3.1 tryLockLaterAndReconsume尝试延迟加锁并重新消费
- 3.1.1 submitConsumeRequestLater延迟提交消费请求
- 3.2 takeMessages拉取消息
- 4 processConsumeResult处理消费结果
-
- 4.2 checkReconsumeTimes检查重试次数
- 4.3 makeMessageToConsumeAgain标记消息重新消费
- 4.4 updateOffset更新offset
- 4.4.1 compareAndIncreaseOnly仅增加offset
- 5 顺序消费和并发消费的总结
1 start启动服务定时锁定消息队列
consumeMessageService服务在DefaultMQPushConsumerImpl#start方法中被初始化并启动,即调用start方法。*
该方法将会通过scheduledExecutorService定时任务锁定所有分配的mq,保证同时只有一个消费端可以消费。*
这个定时任务在启动1s后开始执行,后每20s执行一次,这里的20s可通过**-D rocketmq.client.rebalance.lockInterval**
属性设置。因此它的频率与负载均衡的默认频率一致,都是20s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } catch (Throwable e) { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }
|
内部调用RebalanceImpl#lockAll方法锁定该客户端分配的所有消费队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
public synchronized void lockMQPeriodically() { if (!this.stopped) { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } }
|
1.2 lockAll锁定所有消息队列
该方法尝试锁定所有消息队列。
1、
根据processQueueTable的数据,构建brokerName到其所有mq的map集合brokerMqs在负载均衡并为当前消费者新分配消息队列的时候,也会对新分配的消息队列申请broker加锁,加锁成功后才会创建对应的processQueue存入processQueueTable也就是说,如果是顺序消息,那么processQueueTable中的数据一定是曾经加锁成功了的;
- 遍历brokerMqs,调用MQClientAPIImpl#lockBatchMQ的方法,向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合。
3、 遍历锁住的mq集合,获取对应的processQueue,设置processQueue的状态,设置locked为true,重新设置加锁的时间遍历没有锁住的mq,设置locked为false;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
public void lockAll() {
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator(); while (it.hasNext()) { Entry<String, Set<MessageQueue>> entry = it.next(); final String brokerName = entry.getKey(); final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty()) continue; FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); if (findBrokerResult != null) { LockBatchRequestBody requestBody = new LockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); requestBody.setClientId(this.mQClientFactory.getClientId()); requestBody.setMqSet(mqs);
try {
Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); for (MessageQueue mq : lockOKMQSet) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { if (!processQueue.isLocked()) { log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); } processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } for (MessageQueue mq : mqs) { if (!lockOKMQSet.contains(mq)) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { processQueue.setLocked(false); log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq); } } } } catch (Exception e) { log.error("lockBatchMQ exception, " + mqs, e); } } } }
|
1.1.1 lockBatchMQ批量锁定消息队列
该方法向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
public Set<MessageQueue> lockBatchMQ( final String addr, final LockBatchRequestBody requestBody, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode()); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class); Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet(); return messageQueues; } default: break; }
throw new MQBrokerException(response.getCode(), response.getRemark(), addr); }
|
1.1.2 broker处理批量锁定请求lockBatchMQ
broker端通过AdminBrokerProcessor处理LOCK_BATCH_MQ请求。

lockBatchMQ方法就是broker处理批量锁定请求的方法。broker内部使用一个ConcurrentMap<String/* group */, ConcurrentHashMap<
MessageQueue, LockEntry>> 类型的mqLockTable存储不同的consumerGroup的下面的mq到其锁定的clientId的对应关系。大概逻辑为:
1、 首先将已被当前clientId锁定和未锁定的mq分别存储到不同集合然后对于未锁定的mq尝试加锁;
2、 获取本地锁,防止并发,这是一个ReentrantLock,所有的group的分配都获得同一个锁;
3、 获取该消费者组对于消息队列的加锁的情况map,存放着下面的消息队列mq到其获取了锁的消费者客户端id的映射关系*
*一个消费者组下面的一个mq只能被一个clientId锁定**;
4、 遍历未锁定的mq集合;
1、 如果mq未被任务客户端锁定,那么设置新建一个LockEntry,设置为当前clientId,表示已被当前请求的客户端锁定,内部设置锁定时间戳lastUpdateTimestamp为当前毫秒时间戳;
2、
如果已被被当前客户端锁定,并且没有过期那么重新设置锁定时间为当前时间戳,加入到已锁定的mq中,进行下一次循环每次锁定过期时间为REBALANCE_LOCK_MAX_LIVE_TIME,默认60s,可通过-Drocketmq.broker.rebalance.lockMaxLiveTime的broker参数设置;
3、 如果锁已过期,那么设置当前clientId获得了锁,进行下一次循环;
4、 否则,表示所没有过期并且也不是当前clientId获得的锁,仅仅输出日志,进行下一次循环;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) {
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size()); Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size()); for (MessageQueue mq : mqs) { if (this.isLocked(group, mq, clientId)) { lockedMqs.add(mq); } else { notLockedMqs.add(mq); } } if (!notLockedMqs.isEmpty()) { try { this.lock.lockInterruptibly(); try { ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); if (null == groupValue) { groupValue = new ConcurrentHashMap<>(32); this.mqLockTable.put(group, groupValue); } for (MessageQueue mq : notLockedMqs) { LockEntry lockEntry = groupValue.get(mq); if (null == lockEntry) { lockEntry = new LockEntry(); lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); log.info( "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", group, clientId, mq); } if (lockEntry.isLocked(clientId)) { lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); lockedMqs.add(mq); continue; } String oldClientId = lockEntry.getClientId(); if (lockEntry.isExpired()) { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", group, oldClientId, clientId, mq); lockedMqs.add(mq); continue; } log.warn( "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", group, oldClientId, clientId, mq); } } finally { this.lock.unlock(); } } catch (InterruptedException e) { log.error("putMessage exception", e); } } return lockedMqs; }
|
1.1.1.1.1. 分布式mq锁小结
*consumeMessageService服务在DefaultMQPushConsumerImpl#start方法中被初始化并启动,即调用start方法。该方法将会通过scheduledExecutorService定时任务锁定所有分配的mq,保证同时只有一个消费端可以消费。
**
*实际上,在之前学习的DefaultMQPushConsumer负载均衡服务的部分就知道,在集群模式加上顺序消费的情况下,一定是要向broker申请messageQueue锁成功之后,才能构建processQueue并且加入到processQueueTable,才能在随后发起拉取消息的请求,所以说,这里的定时任务,仅仅是遍历processQueueTable的所有mq并且申请锁定,起作用更多的是向broker进行分布式mq锁的续期操作。
**
*对于从broker锁定的mq,在客户端的过期时间默认为30s,可以通过客户端启动参数-Drocketmq.client.rebalance.lockMaxLiveTime参数设置。但是在broker端看来,这个锁的过期时间默认60s,可以通过broekr启动参数-Drocketmq.broker.rebalance.lockMaxLiveTime设置。
**
2 submitConsumeRequest提交消费请求
该方法首先会判断是否分发消费,即dispathToConsume是否为true,如果允许,那么将创建一个ConsumeRequest提交到ConsumeMessageOrderlyService内部的consumeExecutor线程池中进行异步消费,否则什么也不做。
注意并没有将消息没有放进ConsumeRequest,因为消费线程会自动拉取treeMap中的消息。
- *什么时候dispathToConsume为true呢?当当前processQueue的内部的msgTreeMap中有消息,并且consuming=false,即还没有开始消费时,将会返回true,即新提交一个消费任务进去激活消费。如果已经在消费了,那么不会提交新的消费任务,老的消费任务会自动去msgTreeMap拉取消息。
**
注意这里的dispathToConsume应该是dispatchToConsume,这是RocketMQ中的语法错误。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
@Override public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }
|
3 ConsumeRequest执行消费任务
ConsumeRequest作为线程任务被ConsumeMessageOrderlyService内部的consumeExecutor线程池异步的执行。
1 2 3 4 5 6 7 8 9 10 11 12 13
|
this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl(consumeThreadPrefix));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
|
- *首先,我们要记住,在之前学习的DefaultMQPushConsumer负载均衡服务的部分就知道,在集群模式加上顺序消费的情况下,一定是要向broker申请messageQueue锁成功之后,才能构建processQueue并且加入到processQueueTable,才能在随后发起拉取消息的请求,所以说,能够进入到ConsumeRequest执行消息消费情况,一定是此前获得过broker的messageQueue锁。
**
ConsumeRequest的run方法,就是顺序消费的核心方法。下面我们来看看它的大概逻辑:
1、 如果处理队列被丢弃,那么直接返回,不再消费,例如负载均衡时该队列被分配给了其他新上线的消费者,尽量避免重复消费;
2、 消费消息之前先获取当前messageQueue的本地锁,锁对象是一个Object对象,每一个mq对应一个不同的Object,采用原生的synchronized阻塞式的获取同步锁
这将导致ConsumeMessageOrderlyService的线程池中的线程将不会同时并发的消费同一个队列;
3、 如果是广播模式,或者是集群模式,并且锁定了processQueue处理队列,并且processQueue处理队列锁没有过期,那么可以消费消息p
**rocessQueue处理队列锁定实际上就是在负载均衡的时候向broker申请的消息队列分布式锁,申请成功之后将processQueue.locked属性置为true
**内部一个循环中不断的消费,直到消费超时或者条件不满足退出循环;
1、 如果处理队列被丢弃,那么直接返回,不再消费,例如负载均衡时该队列被分配给了其他新上线的消费者,尽量避免重复消费;
2、
如果是集群模式,并且没有锁定了processQueue处理队列,或者processQueue处理队列锁已经过期,那么调用tryLockLaterAndReconsume尝试延迟10ms请求broekr加锁并重新延迟提交新的消费请求;
3、
计算消费时间如果单次消费任务的消费时间大于默认60s,那么延迟10ms提交新的消费请求,并且结束循环,本次消费任务结束单次最大消费时间可以通过-Drocketmq.client.maxTimeConsumeContinuously配置启动参数来设置时间;
4、
调用getConsumeMessageBatchMaxSize方法,获取单次批量消费的数量consumeBatchSize,默认1,可以通过DefaultMQPushConsumer.consumeMessageBatchMaxSize的属性配置;
5、 调用takeMessages方法,从processQueue内部的msgTreeMap有序map集合中获取offset最小的consumeBatchSize条消息,按顺序从最小的offset返回,保证有序性;
6、 调用resetRetryAndNamespace方法,重置重试topic,当消息是重试消息的时候,将msg的topic属性从重试topic还原为真实的topic;
7、 如果takeMessages方法拉取到了消息,那么进行消费;
1 2 3 4 5 6 7 8 9 10 11 12
| 1. 如果有钩子,那么执行consumeMessageBefore前置方法。我们可以通过DefaultMQPushConsumerImpl\#registerConsumeMessageHook方法注册消费钩子ConsumeMessageHook,在消费消息的前后调用。 2. **真正消费消息之前再获取processQueue的本地消费锁,保证消息消费时,一个处理队列不会被并发消费。从这里可知,顺序消费需要获取三把锁,broker的messageQueue锁,本地的messageQueue锁,本地的processQueue锁。** 3. 调用listener\#consumeMessage方法,进行消息消费,调用实际的业务逻辑,返回执行状态结果,有四种状态,ConsumeOrderlyStatus.SUCCESS 和 ConsumeOrderlyStatus.SUSPEND\_CURRENT\_QUEUE\_A\_MOMENT推荐使用,ConsumeOrderlyStatus.ROLLBACK和ConsumeOrderlyStatus.COMMIT已被废弃。 4. 解锁,然后对返回的执行状态结果进行判断处理。 1. 如status为null,或返回了ROLLBACK或者SUSPEND\_CURRENT\_QUEUE\_A\_MOMENT状态,那么输出日志。 2. 计算消费时间consumeRT。如果status为null,如果业务的执行抛出了异常,设置returnType为EXCEPTION,否则设置returnType为RETURNNULL。 3. 如消费时间consumeRT大于等于consumeTimeout,**默认15min**。设置returnType为TIME\_OUT。消费超时时间可通过DefaultMQPushConsumer. consumeTimeout属性配置,默认15,单位分钟。 4. 如status为SUSPEND\_CURRENT\_QUEUE\_A\_MOMENT,即消费失败,设置returnType为FAILED。 5. 如status为SUCCESS,即消费成功,设置returnType为SUCCESS。 5. 如果有消费钩子,那么执行钩子函数的后置方法consumeMessageAfter。 6. 调用**ConsumeMessageOrderlyService\#processConsumeResult**方法处理消费结果,包含重试等逻辑。
|
8、 如果没有拉取到消息,那么设置continueConsume为false,将会跳出循环;
4、
如果集群模式,但是没有锁定了processQueue处理队列,或者processQueue处理队列锁已经过期,判断如果processQueue被丢弃,则直接结束本次消费请求,否则调用tryLockLaterAndReconsume尝试延迟100ms请求borker加锁并重新延迟提交新的消费请求;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
| class ConsumeRequest implements Runnable { private final ProcessQueue processQueue; private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) { this.processQueue = processQueue; this.messageQueue = messageQueue; }
public ProcessQueue getProcessQueue() { return processQueue; }
public MessageQueue getMessageQueue() { return messageQueue; }
@Override public void run() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; }
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && this.processQueue.isLockExpired()) { log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; } final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize); defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); consumeMessageContext.setProps(new HashMap<String, String>()); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try {
this.processQueue.getConsumeLock().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; }
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue), e); hasException = true; } finally { this.processQueue.getConsumeLock().unlock(); }
if (null == status || ConsumeOrderlyStatus.ROLLBACK == status || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue); } long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeOrderlyStatus.SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; }
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageOrderlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
}
|
3.1 tryLockLaterAndReconsume尝试延迟加锁并重新消费
集群模式下,尝试延迟加锁并重新消费。大概的逻辑为:
1、 构建一个延迟线程任务,通过延迟线程池服务在给定的延迟时间之后执行延迟时间,以及该方法的触发有两种情况:;
1、 如果已经在循环中,处理队列没有被丢弃,但是发现没有锁定或者锁过期,那么延迟10ms;
2、 如果在最开始判断的时候,处理队列没有被丢弃,但是发现没有锁定或者锁过期,那么延迟100ms;
2、 内部的延迟任务如下:;
1、 首先尝试请求broker锁定该mq;
2、 如果锁定成功,那么调用submitConsumeRequestLater方法延迟提交消费请求,延迟10ms;
3、 如果锁定失败,那么同样调用submitConsumeRequestLater方法延迟提交消费请求,但是延迟3000ms,即3s;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, final long delayMills) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq); if (lockOK) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 10); } else { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 3000); } } }, delayMills, TimeUnit.MILLISECONDS);
|
3.1.1 submitConsumeRequestLater延迟提交消费请求
- *在一次消费任务过程中,如果发现除了处理队列被丢弃之外的不满足继续消费的条件时,例如:如果是集群模式但没有锁定了processQueue处理队列或者processQueue处理队列锁已经过期,或者本次任务消费时间超过了默认的最大时间60s,都将会调用submitConsumeRequestLater延迟一定时间提交新的消费请求,并且本次消费请求结束。
**
**另外,如果消费失败,返回status =
null或者SUSPEND_CURRENT_QUEUE_A_MOMENT,那么当校验没有达到最大重试次数时,也会调用submitConsumeRequestLater延迟1s提交新的消费请求,并且本次消费请求结束。
**
*submitConsumeRequestLater方法内部构建一个延迟线程任务,通过延迟线程池服务在给定的延迟时间之后执行submitConsumeRequest方法,其中dispathToConsume参数为true,也就是说一定会构建一个新的ConsumeRequest并且将请求提交到consumeExecutor线程池中进行消费。
**
*此时,执行该新提交的线程任务的线程将和执行此前的任务的线程可能不是同一个线程,但是,却一定能保证同时只有一个线程能对同一个消息队列执行消费。所以说,顺序消费同样是通过线程池消费的,他不能保证每次都是同一个线程去消费同一个消息队列,但是他能保证同一时刻同一个队列只有一个线程去消费。
**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
|
private void submitConsumeRequestLater( final ProcessQueue processQueue, final MessageQueue messageQueue, final long suspendTimeMillis ) { long timeMillis = suspendTimeMillis; if (timeMillis == -1) { timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(); } if (timeMillis < 10) { timeMillis = 10; } else if (timeMillis > 30000) { timeMillis = 30000; } this.scheduledExecutorService.schedule(new Runnable() {
@Override public void run() { ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true); } }, timeMillis, TimeUnit.MILLISECONDS); }
|
3.2 takeMessages拉取消息
顺序消息使用的方法。从processQueue内部的msgTreeMap有序map集合中获取offset最小的consumeBatchSize条消息,按顺序从最小的offset返回,保证有序性。
拉取操作需要获取到当前treeMapLock锁,每次都拉取移除msgTreeMap中的第一条消息,也就是offset最小的一条消息。然后将拉取到的消息存入consumingMsgOrderlyTreeMap中,表示正在消费的消息。
最后会判断如果没有拉取到任何一条消息,那么设置consuming = false,表示没有消息了,处于非消费状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
public List<MessageExt> takeMessages(final int batchSize) { List<MessageExt> result = new ArrayList<MessageExt>(batchSize); final long now = System.currentTimeMillis(); try { this.treeMapLock.writeLock().lockInterruptibly(); this.lastConsumeTimestamp = now; try { if (!this.msgTreeMap.isEmpty()) { for (int i = 0; i < batchSize; i++) { Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry(); if (entry != null) { result.add(entry.getValue()); consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue()); } else { break; } } } if (result.isEmpty()) { consuming = false; } } finally { this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("take Messages exception", e); }
return result; }
|
4 processConsumeResult处理消费结果
该方法处理顺序消费的消费结果,包含提交以及重试的逻辑。
1、 首先判断context.autoCommit属性是否为true,即是否设置为自动提交,这个context是在每次消费时都会创建的一个对象,并且默认都是true,除非在业务中手动改为false,所以一般都是自动提交;
2、 然后对于各种返回的状态进行判断和处理:;
1、 如果返回COMMIT和ROLLBACK这两种废弃的状态,那么仅仅打印日志,并且默认算作SUCCESS状态;
2、 如果返回SUCCESS,表示消费成功那么调用commit方法通过处理队列提交offset,这里仅仅是更新本地内存的消息缓存信息,返回待更新的offset然后增加成功的统计信息;
3、 如果返回SUSPEND_CURRENT_QUEUE_A_MOMENT,表示返回失败那么增加失败的统计信息;
1 2 3
| 1. 调用checkReconsumeTimes方法,校验是否达到最大重试次数,可以通过DefaultMQPushConsumer\#maxReconsumeTimes属性配置,默认无上限,即Integer.MAX\_VALUE。 2. 如果没有到达最大重试次数,那么调用makeMessageToConsumeAgain方法标记消息等待再次消费,随后调用延迟提交新的消费请求,默认suspendTimeMillis为-1,即延迟1s后重新消费。设置continueConsume = false,本消费请求消费结束不会继续消费。 3. 达到了最大重试次数,那么调用commit提交消息,返回待更新的offset,算作成功。
|
- 如果待更新的偏移量大于等于0并且处理队列没有被丢弃,调用OffsetStore#
updateOffset方法,尝试更新内存中的offsetTable中的最新偏移量信息,第三个参数是否仅单调增加offset为false,表示可能会将offset更新为较小的值。这里仅仅是更新内存中的数据,而offset除了在拉取消息时上报broker进行持久化之外,还会定时每5s调用persistAllConsumerOffset定时持久化。我们在后面Consumer消费进度管理部分会学习相关源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
public boolean processConsumeResult( final List<MessageExt> msgs, final ConsumeOrderlyStatus status, final ConsumeOrderlyContext context, final ConsumeRequest consumeRequest ) { boolean continueConsume = true; long commitOffset = -1L; if (context.isAutoCommit()) { switch (status) { case COMMIT: case ROLLBACK: log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue()); case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit(); this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); break; case SUSPEND_CURRENT_QUEUE_A_MOMENT: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs); this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } else { commitOffset = consumeRequest.getProcessQueue().commit(); } break; default: break; } } else { switch (status) { case SUCCESS: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); break; case COMMIT: commitOffset = consumeRequest.getProcessQueue().commit(); break; case ROLLBACK: consumeRequest.getProcessQueue().rollback(); this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; break; case SUSPEND_CURRENT_QUEUE_A_MOMENT: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs); this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } break; default: break; } }
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); }
return continueConsume; }
|
4.1 commit提交消息
- *一次消费成功后,或者消费失败但是重试次数已经达到最大重试次数(可以通过DefaultMQPushConsumer#maxReconsumeTimes属性配置,默认无上限,即Integer.MAX_VALUE)。那么会执行ProcessQueue#commit方法。
**
ProcessQueue#commit方法将会删除processQueue中消费完毕的消息,返回待更新的offset,或者说待提交的offset。大概步骤为:
1、 此前我们知道takeMessages方法会将拉取到的消息放入consumingMsgOrderlyTreeMap中,那么这里会先从该map中获取最大消息offset;
2、 然后msgCount减去已经消费完成的消息数量,msgSize减去已经消费完成的消息大小,清空正在消费的消息consumingMsgOrderlyTreeMap;
3、 最后返回下一个offset,也就是提交的待提交的offset,为已消费完毕的最大offset+1;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
public long commit() { try { this.treeMapLock.writeLock().lockInterruptibly(); try { Long offset = this.consumingMsgOrderlyTreeMap.lastKey(); msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size()); for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) { msgSize.addAndGet(0 - msg.getBody().length); } this.consumingMsgOrderlyTreeMap.clear(); if (offset != null) { return offset + 1; } } finally { this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("commit exception", e); }
return -1; }
|
4.2 checkReconsumeTimes检查重试次数
检查重试次数判断,是否挂起消费。大概逻辑为:
- 遍历所有的消息,校验是否达到最大重试次数,可以通过DefaultMQPushConsumer#maxReconsumeTimes属性配置,默认无上限,即Integer.MAX_VALUE。
2、
如果达到最大重试次数,设置RECONSUME_TIME属性,通过sendMessageBack发回broker延迟topic如果sendMessageBack发送失败,则suspend=true,表示挂起消费,然后设置消息的重试次数属性reconsumeTimes+1;
3、 如果没有达到最大重试次数则suspend=true,表示挂起消费,然后设置消息的重试次数属性reconsumeTimes+1;
可以看到,达到了最大重试次数但是sendMessageBack失败,或者没有达到最大重试次数,那么都会在随后暂时挂起消费,随后重试消费,否则的话,算作消费成功,随后将会commit。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
private boolean checkReconsumeTimes(List<MessageExt> msgs) { boolean suspend = false; if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) { MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes())); if (!sendMessageBack(msg)) { suspend = true; msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); } } else { suspend = true; msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); } } } return suspend; }
|
4.3 makeMessageToConsumeAgain标记消息重新消费
对于checkReconsumeTimes返回false的情况,即达到了最大重试次数但是sendMessageBack失败,或者没有达到最大重试次数,将会调用该方法,标记消息重新消费。
所谓标记,实际上很简单,就是将需要重复消费消息从正在消费的consumingMsgOrderlyTreeMap中移除,然后重新存入待消费的msgTreeMap中,那么将会在随后的消费中被拉取,进而实现重复消费。
- *所以说,并发消费的重复消费,需要将消息发往broker的重试topic中,等待再次拉取并重新消费,而顺序消费的重复消费就更加简单了,通过consumingMsgOrderlyTreeMap和msgTreeMap这两个map,实现直接在本地重试,不需要经过broker,直到达到了最大重试次数,才会通过sendMessageBack方法将消息发往broker,但是不会再被消费到了。
**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
public void makeMessageToConsumeAgain(List<MessageExt> msgs) { try { this.treeMapLock.writeLock().lockInterruptibly(); try { for (MessageExt msg : msgs) { this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset()); this.msgTreeMap.put(msg.getQueueOffset(), msg); } } finally { this.treeMapLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("makeMessageToCosumeAgain exception", e); } }
|
4.4 updateOffset更新offset
尝试更新内存中的offsetTable中的最新偏移量信息,第三个参数是否仅单调增加offset,顺序消费为false,并发消费为true。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
@Override public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { if (mq != null) { AtomicLong offsetOld = this.offsetTable.get(mq); if (null == offsetOld) { offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); } if (null != offsetOld) { if (increaseOnly) { MixAll.compareAndIncreaseOnly(offsetOld, offset); } else { offsetOld.set(offset); } } } }
|
4.4.1 compareAndIncreaseOnly仅增加offset
如果目标对象目前的值已经大于目标值,则返回false,否则在一个循环中尝试CAS的更新目标对象的值为目标值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) { long prev = target.get(); while (value > prev) { boolean updated = target.compareAndSet(prev, value); if (updated) return true; prev = target.get(); } return false; }
|
5 顺序消费和并发消费的总结
我们学习了并发消费和顺序消费的源码,现在我们来总结一下他们之间的关键知识点!
多线程和顺序性:顺序消费和并发消费实际上都是使用线程池消费,但是不同的是,对于同一个消息队列的消息,并发消费可能有多条线程并发的消费消息,提升了消息速度,但是没有顺序性。
而顺序消费则通过一系列锁,保证同一时刻对于同一个队列只有一个线程去消费它,注意是“只有一个线程”而不是“同一个线程”,因此有可能你先发送的消息被ThreadA消费了,但是你发送的的第二个消息被ThreadB消费了,这是完全正确的,因为他们并不是同时消费的。但是对于不同的队列,顺序消费则不能保证消费有序性,因为不同的队列有不同的锁。
消费重试
:并发消费和顺序消费对于消费失败的消息均会有消息重试机制。RocketMQ的消息重试(消息重投)。
顺序消费的保证:顺序消费模式使用3把锁来保证消费的顺序性:
1、 broker端的分布式锁:;
1、
在负载均衡的处理新分配队列的updateProcessQueueTableInRebalance方法,以及ConsumeMessageOrderlyService服务启动时的start方法中,都会尝试向broker申请当前消费者客户端分配到的messageQueue的分布式锁;
2、 broker端的分布式锁存储结构为ConcurrentMap<String/*group*/,ConcurrentHashMap<MessageQueue,LockEntry>>
,该分布式锁保证同一个consumerGroup下同一个messageQueue只会被分配给一个consumerClient;
- 获取到的broker端的分布式锁,在client端的表现形式为processQueue.
locked属性为true,且该分布式锁在broker端默认60s过期,而在client端默认30s过期,因此ConsumeMessageOrderlyService#start会启动一个定时任务,每过20s向broker申请分布式锁,刷新过期时间。而负载均衡服务也是每20s进行一次负载均衡。
4、 broker端的分布式锁最先被获取到,如果没有获取到,那么在负载均衡的时候就不会创建processQueue了也不会提交对应的消费请求了;
2、 messageQueue的本地synchronized锁:;
1、 在执行消费任务的开头,便会获取该messageQueue的本地锁对象objLock,它是一个Object对象,然后通过synchronized实现锁定;
2、 这个锁的锁对象存储在MessageQueueLock.mqLockTable属性中,结构为ConcurrentMap<MessageQueue,Object>
,所以说,一个MessageQueue对应一个锁,不同的MessageQueue有不同的锁;
3、 因为顺序消费也是通过线程池消费的,所以这个synchronized锁用来保证同一时刻对于同一个队列只有一个线程去消费它;
3、 ProcessQueue的本地consumeLock;
1
| 1. 在获取到broker端的分布式锁以及messageQueue的本地synchronized锁的之后,在执行真正的消息消费的逻辑messageListener\#consumeMessage之前,会获取ProcessQueue的consumeLock,这个本地锁是一个ReentrantLock。
|
2、 那么这把锁有什么作用呢?;
1 2 3 4 5
| 1. 在负载均衡时,如果某个队列C被分配给了新的消费者,那么当前客户端消费者需要对该队列进行释放,它会调用removeUnnecessaryMessageQueue方法对该队列C请求broker端分布式锁的解锁。 2. 而在请求broker分布式锁解锁的时候,一个重要的操作就是首先尝试获取这个messageQueue对应的ProcessQueue的本地consumeLock。只有获取了这个锁,才能尝试请求broker端对该messageQueue的分布式锁解锁。 3. 如果consumeLock加锁失败,表示当前消息队列正在消息,不能解锁。那么本次就放弃解锁了,移除消息队列失败,只有等待下次重新分配消费队列时,再进行移除。 4. 如果没有这把锁,假设该消息队列因为负载均衡而被分配给其他客户端B,但是由于客户端A正在对于拉取的一批消费消息进行消费,还没有提交消费点位,如果此时客户端A能够直接请求broker对该messageQueue解锁,这将导致客户端B获取该messageQueue的分布式锁,进而消费消息,而这些没有commit的消息将会发送重复消费。 5. 所以说这把锁的作用,就是防止在消费消息的过程中,该消息队列因为发生负载均衡而被分配给其他客户端,进而导致的两个客户端重复消费消息的行为。
|
客户端对于不属于自己的messageQueue进行解锁的方法,可以看到解锁前需要先获取这个messageQueue对应的ProcessQueue的本地consumeLock。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
@Override public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
if (this.defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { try { if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) { try {
return this.unlockDelay(mq, pq); } finally { pq.getConsumeLock().unlock(); } } else { log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", mq, pq.getTryUnlockTimes()); pq.incTryUnlockTimes(); } } catch (Exception e) { log.error("removeUnnecessaryMessageQueue Exception", e); }
return false; } return true; }
|
__END__