24、RocketMQ源码分析:延时消息实现原理解析
分类:RocketMQ源码分析(1) 2024-03-27 阅读(152)
基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer延迟消息源码。
文章目录
- 1 load加载延迟消息数据
- 1.1 parseDelayLevel解析延迟等级
- 2 start启动调度消息服务
- 3 DeliverDelayedMessageTimerTask投递延迟消息任务
- 3.1 executeOnTimeup执行延迟消息投递
- 3.2 scheduleNextTimerTask下一个调度任务
- 3.3 correctDeliverTimestamp校验投递时间
- 3.4 messageTimeup恢复正常消息
- 3.5 syncDeliver同步投递消息
- 4 延迟消息的总结
- *并发消息消费失败引发消费重试时,默认情况下重试16次,从延迟等级level3(10s)开始,每次延迟时间递增,时间到了又会发送到重试topic去消费,这其中就涉及到RocketMQ的延迟消息,可以说RocketMQ并发消息消费失败引发消费重试就是基于topic替换和延迟消息这两个技术实现的。
**
此前我们学习了RocketMQ的消费重试,我们知道在判断消息为延迟消息的时候,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id,
id = level -
1,如果延迟级别大于最大级别,则设置为最大级别18,,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。
**实际上普通的延迟消息也会进行topic替换,那么,发送到SCHEDULE_TOPIC_XXXX对应的消息队列里面的延迟消息,到底是做到能够在给定的延迟时间之后取出来重新投递的呢?下面我们来看看RocketMQ延迟消息的源码。
**
- *实际上RocketMQ通过ScheduleMessageService调度消息服务实现延迟(定时)消息。ScheduleMessageService继承了ConfigManager,在DefaultMessageStore实例化的时候被实例化。
**
1 load加载延迟消息数据
在broker启动执行DefaultMessageStore#load方法加载Commit Log、Consume Queue、index
file等文件,将数据加载到内存中,并完成数据的恢复的时候,同样会执行ScheduleMessageService#load方法,加载延迟消息数据,初始化delayLevelTable和offsetTable。
首先调用父类的ConfigManager#load方法(在broker启动部分就讲过源码了),将延迟消息文件$
{user.home}/store/config/delayOffset.json加载到内存的offsetTable集合中,delayOffset.json中保存着延迟topic每个队列的消费进度(消费偏移量)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
@Override public boolean load() { boolean result = super.load(); result = result && this.parseDelayLevel(); result = result && this.correctDelayOffset(); return result; }
@Override public String configFilePath() { return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig() .getStorePathRootDir()); }
@Override public void decode(String jsonString) { if (jsonString != null) { DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class); if (delayOffsetSerializeWrapper != null) { this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable()); } } }
|
延迟消息文件${user.home}/store/config/delayOffset.json的内容,它保存着延迟队列对应的消费偏移量。
1 2 3 4 5 6 7 8
| { "offsetTable":{ 3:2,4:1 } }
|
1.1 parseDelayLevel解析延迟等级
延迟等级字符串存储在MessageStoreConfig的messageDelayLevel属性中,默认值为”1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m
20m 30m 1h 2h”,即18个等级,因此也是可以配置的,但是单位仅支持s、m、h、d,分别表示秒、分、时、天。
该方法解析延迟等级以及对应的延迟时间到delayLevelTable中,单位统一转换为毫秒,注意延迟等级从1开始。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
public boolean parseDelayLevel() { HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(); timeUnitTable.put("s", 1000L); timeUnitTable.put("m", 1000L * 60); timeUnitTable.put("h", 1000L * 60 * 60); timeUnitTable.put("d", 1000L * 60 * 60 * 24); String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); try { String[] levelArray = levelString.split(" "); for (int i = 0; i < levelArray.length; i++) { String value = levelArray[i]; String ch = value.substring(value.length() - 1); Long tu = timeUnitTable.get(ch); int level = i + 1; if (level > this.maxDelayLevel) { this.maxDelayLevel = level; } long num = Long.parseLong(value.substring(0, value.length() - 1)); long delayTimeMillis = tu * num; this.delayLevelTable.put(level, delayTimeMillis); if (this.enableAsyncDeliver) { this.deliverPendingTable.put(level, new LinkedBlockingQueue<>()); } } } catch (Exception e) { log.error("parseDelayLevel exception", e); log.info("levelString String = {}", levelString); return false; }
return true; }
|
2 start启动调度消息服务
ScheduleMessageService依靠内部的定时任务实现延迟消息,ScheduleMessageService通过start方法完成启动。
在broker的启动过程中,会执行DefaultMessageStore的start方法中,该方法内部通过handleScheduleMessageService方法执行ScheduleMessageService的start方法。
该方法的大概逻辑为:
1、 初始化延迟消息投递线程池deliverExecutorService,该线程池是一个调度任务线程池ScheduledThreadPoolExecutor,核心线程数就是最大的延迟等级,默认18;
2、 遍历所有的延迟等级,为每一个延迟等级构建一个对应的DeliverDelayedMessageTimerTask调度任务放到deliverExecutorService中,默认延迟1000ms后执行;
3、 构建一个延迟队列消费偏移量持久化的定时调度任务,首次延迟1000ms之后执行,后续每次执行间隔flushDelayOffsetInterval时间,默认10s;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
public void start() { if (started.compareAndSet(false, true)) { super.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); if (this.enableAsyncDeliver) { this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_")); }
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { if (this.enableAsyncDeliver) { this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } }
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { try { if (started.get()) { ScheduleMessageService.this.persist(); } } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS); } }
|
3 DeliverDelayedMessageTimerTask投递延迟消息任务
**在start方法中,ScheduleMessageService会为每一个延迟等级创建一个DeliverDelayedMessageTimerTask投递延迟消息任务,不同延迟等级的消息放到不同的延迟队列里面,被不同的Task处理。
**
**采用不同的队列处理同一个延迟等级的消息的方式,不再需要进行消息排序,避免了消息排序的复杂逻辑,能比较简单的实现有限等级的延迟消息,RocketMQ的开源版本不支持任意时间的延迟消息,这也是它的一个限制吧!
**
DeliverDelayedMessageTimerTask是一个线程任务,下面来看看它的run方法,主要是调用executeOnTimeup执行消息投递。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Override public void run() { try { if (isStarted()) { this.executeOnTimeup(); } } catch (Exception e) { log.error("ScheduleMessageService, executeOnTimeup exception", e); this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD); } }
|
3.1 executeOnTimeup执行延迟消息投递
延迟消息的核心逻辑实现,执行延迟消息消息投递。
1、
调用findConsumeQueue方法,根据topic和延迟队列id从consumeQueueTable查找需要写入的ConsumeQueue,如果没找到就新建,即ConsumeQueue文件是延迟创建的该方法的源码我们在ReputMessageService异步构建ConsumeQueue和IndexFile部分已经讲过了;
2、
如果没找到对应的消息队列,调用scheduleNextTimerTask方法,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束;
3、
调用getIndexBuffer方法,根据逻辑offset定位到物理偏移量,然后截取该偏移量之后的一段Buffer,其包含要拉取的消息的索引数据及对应consumeQueue文件之后的全部索引数据,即这里截取的Buffer可能包含多条索引数据该方法的源码我们在broker处理拉取消息请求部分已经讲过了;
4、 遍历缓存buffer中的消息,根据tagsCode投递时间判断消息是否到期,如果到期则回复真实消息并且投递安到真实topic以及对应的queueId中;
1
| 1. 获取该条目对应的消息的tagsCode,对于延迟消息,tagsCode被替换为延迟消息的发送时间(在CommitLog\#checkMessageAndReturnSize方法中,源码此前讲过了)。
|
2、
如果投递时间小于当前时间,那么可以投递该延迟消息如果投递时间大于当前时间,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束;
3、
根据消息物理偏移量从commitLog中找到该条消息,调用messageTimeup方法构建内部消息对象,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId即恢复为正常消息;
4、 最后调用syncDeliver方法投递该消息,消息将会被投递到原始topic和队列中,这样就可以被消费了;
5、 遍历结束,更新下一个offset,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束保证线程任务的活性;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
public void executeOnTimeup() {
ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
if (cq == null) { this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE); return; }
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ == null) { long resetOffset; if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) { log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}", this.offset, resetOffset, cq.getQueueId()); } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) { log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}", this.offset, resetOffset, cq.getQueueId()); } else { resetOffset = this.offset; } this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE); return; }
long nextOffset = this.offset; try { int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); if (cq.isExtAddr(tagsCode)) { if (cq.getExt(tagsCode, cqExtUnit)) { tagsCode = cqExtUnit.getTagsCode(); } else { log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", tagsCode, offsetPy, sizePy); long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy); tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime); } } long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; if (countdown > 0) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt == null) { continue; } MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt); if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", msgInner.getTopic(), msgInner); continue; }
boolean deliverSuc;
if (ScheduleMessageService.this.enableAsyncDeliver) { deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy); } else { deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy); } if (!deliverSuc) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } } nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); } catch (Exception e) { log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e); } finally { bufferCQ.release(); }
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); }
|
3.2 scheduleNextTimerTask下一个调度任务
- *如果没找到对应的消息队列,或者没找到缓存buffer,或者没有过期的消息,获取投递失败等原因,将会新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束。
**
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
public void scheduleNextTimerTask(long offset, long delay) { ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, offset), delay, TimeUnit.MILLISECONDS); }
|
3.3 correctDeliverTimestamp校验投递时间
校验投递时间,要求投递时间最晚不大于保证投递时间小于等于当前时间 + 延迟时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { long result = deliverTimestamp; long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); if (deliverTimestamp > maxTimestamp) { result = now; }
return result; }
|
3.4 messageTimeup恢复正常消息
构建一个MessageExtBrokerInner,恢复为正常消息。设置topic的值为REAL_TOPIC属性值,这是原始topic,可能是重试topic或者真实topic。设置queueId的值为REAL_QID属性值,这是原始queueId,可能是重试queueId或者真实queueId。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setBody(msgExt.getBody()); msgInner.setFlag(msgExt.getFlag()); MessageAccessor.setProperties(msgInner, msgExt.getProperties());
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); msgInner.setTagsCode(tagsCodeValue); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornHost(msgExt.getBornHost()); msgInner.setStoreHost(msgExt.getStoreHost()); msgInner.setReconsumeTimes(msgExt.getReconsumeTimes()); msgInner.setWaitStoreMsgOK(false); MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL); msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID); int queueId = Integer.parseInt(queueIdStr); msgInner.setQueueId(queueId);
return msgInner; }
|
3.5 syncDeliver同步投递消息
syncDeliver内部调用asyncPutMessage方法同步投递消息,投递完毕之后,更新offsetTable中的对应延迟队列的消费偏移量。
这里有个bug,第三个参数应该使用当前偏移量,而不是最开始的偏移量,在4.9.4版本已经修复。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy, int sizePy) { PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false); PutMessageResult result = resultProcess.get(); boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK; if (sendStatus) { ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset()); } return sendStatus; }
|
4 延迟消息的总结
投递的消息在broker处理过成功,会判断如果是延迟消息,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id,
id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。
最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。
而在RocketMQ端,通过ScheduleMessageService调度消息服务处理投递到SCHEDULE_TOPIC_XXXX的延迟消息。
从源码中可以看到,每个延迟级别都有一个线程专门处理该级别的延迟消息,这样避免了消息的排序,具体的处理逻辑其被封装为一个DeliverDelayedMessageTimerTask线程任务。
不同延迟级别的处理线程将会从各自对应的延迟队列中获取延迟消息,然后和当前时间比较看消息是否过期,如果消息过期,那么构造一个新的消息,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId,即恢复为正常消息然后再次进行投递。
另外,延迟消息的consumeQueue条目中的tagsCode并不是tag的hashCOde,而是该条消息的到期时间。
RocketMQ在内部通过内部topic替换实现延迟消息,非常巧妙,而且并发消息重试也是使用了延迟消息。
__END__