13、RocketMQ源码分析:Broker消息重放服务ReputMessageService源码解析
此前我们学习了RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】
,这篇文将讲的是如何构建消息文件ConsumeQueue和IndexFile。
CommitLog文件顺序存储着所有的消息,理论上来说RocketMQ只要有CommitLog文件就可以正常运行了,但是我们再此前的文章(RocketMQ的底层消息存储架构以及优化措施
)中介绍过,RocketMQ还存在另外两个重要的文件服务:
1、
ConsumeQueue文件:ConsumeQueue文件可以看作是CommitLog的消息偏移量索引文件,其存储了它所属Topic的消息在CommitLog中的偏移量消费者拉取消息的时候,可以从ConsumeQueue中快速的根据偏移量定位消息在CommitLog中的位置;
2、 IndexFile索引文件:IndexFile文件可以看作是CommitLog的消息时间范围索引文件IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法;
- *虽然CommitLog文件顺序存储着所有的消息,但是其并没有区分任何的topic、tag等信息,我们只能顺序遍历CommitLog文件去查找消费数据,性能非常底下,因此才有了ConsumeQueue和IndexFile这两种索引文件系统,这两个文件系统的主要母的也是用于加快客户端的消费速度或者是查询效率。
**
**此前我们学习了broker的消息刷盘的源码,我们仅仅是了解了CommitLog文件刷盘的流程,现在我们来学习ConsumeQueue文件和IndexFile文件的构建流程
**
文章目录
- 1 ReputMessageService消息重放服务
- 2 doReput执行重放
- 2.1 isCommitLogAvailable是否需要重放
- 2.2 getData获取重放数据
- 2.2.1 selectMappedBuffer截取一段内存
- 2.3 checkMessageAndReturnSize检查消息并构建请求
- 2.4 doDispatch分发请求
- 3 总结
1 ReputMessageService消息重放服务
ReputMessageService服务将会在循环中异步的每隔1ms对于写入CommitLog的消息进行重放,即将消息构建成为DispatchRequest对象,然后将DispatchRequest对象分发给各个CommitLogDispatcher处理,这些CommitLogDispatcher通常会尝试构建ConsumeQueue索引、IndexFile索引以及SQL92布隆过滤器。
ReputMessageService和此前介绍的刷盘服务一样,属于异步线程服务。其随着消息存储对象DefaultMessageStore的创建而创建,并且在DefaultMessageStore:start方法中被启动。我们直接看它的run方法。
1 | /** |
可以看到,该服务将会在一个循环中,每隔1ms执行一次duReput方法,doReput方法也就是重放的方法。
2 doReput执行重放
该方法对于写入CommitLog的消息进行重放,所谓的重放就是完成诸如ConsumeQueue索引、IndexFile索引、布隆过滤器、唤醒长轮询线程和被hold住的请求等操作。
该方法的大概逻辑为:
1、 如果重放偏移量reputFromOffset小于commitlog的最小物理偏移量,那么设置为commitlog的最小物理偏移量,如果重放偏移量小于commitlog的最大物理偏移量,那么循环重放;
2、
调用getData方法根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile,然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer,这段内存存储着将要重放的消息;
3、 开始循环读取这段ByteBuffer中的消息,依次进行重放;
1、 如果存在消息,调用checkMessageAndReturnSize方法检查当前消息的属性并且构建一个DispatchRequest对象返回;
- 调用doDispatch方法分发重放请求,将会调用所有CommitLogDispatcher#dispatch方法。
1 | 1. CommitLogDispatcherBuildConsumeQueue:根据DispatchRequest写ConsumeQueue文件,构建ConsumeQueue索引。 |
3、
如果broker角色不是SLAVE,并且支持长轮询,并且消息送达的监听器不为null,那么通过该监听器的arriving方法触发调用pullRequestHoldService的pullRequestHoldService方法,即唤醒挂起的拉取消息请求,表示有新的消息落盘,可以进行拉取了;
4、 如果读取到MappedFile文件尾,那么获取下一个文件的起始索引继续重放;
1 | /** |
2.1 isCommitLogAvailable是否需要重放
该方法用于判断CommitLog是否需要执行重放,如果重放偏移量小于commitlog的最大物理偏移量,那么就需要执行重放。
1 | /** |
2.2 getData获取重放数据
根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile,然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer,这段内存存储着将要重放的消息。
1 | /** |
2.2.1 selectMappedBuffer截取一段内存
从指定相对偏移量开始从指定MappedFile中的mappedByteBuffer中截取一段ByteBuffer,这段内存存储着将要重放的消息。这段ByteBuffer和原mappedByteBuffer共享同一块内存,但是拥有自己的指针。
然后根据起始物理索引、截取的ByteBuffer、截取的ByteBuffer大小以及当前CommitLog对象构建一个SelectMappedBufferResult对象返回。
1 | /** |
2.3 checkMessageAndReturnSize检查消息并构建请求
该方法将会检查这段内存中的下一条消息,这里我们仅仅需要读取消息的各种属性即可,不需要读取具体的消息内容body。最后并且根据这些属性构建一个DispatchRequest对象返回。
**需要注意这里有个对于延迟消息的特殊处理,即tagCode属性,对于普通消息就是tags的hashCode值,对于延迟消息则是消息将来投递的时间戳,用于用于后续判断消息是否到期。
**
1 | /** |
2.4 doDispatch分发请求
该方法将构建的DispatchRequest分发出去,即循环调用DefaultMessageStore内部的dispatcherList中的CommitLogDispatcher的dispatch方法,取处理这个请求。
这个方法可以说是ReputMessageService服务的核心代码了,表面面上看仅仅是分发请求。实际上,ConsumeQueue索引、IndexFile索引等操作都是由对应的CommitLogDispatcher来负责实现的。
DefaultMessageStore内部的dispatcherList默认有三个CommitLogDispatcher:
1、 CommitLogDispatcherBuildConsumeQueue:根据DispatchRequest写ConsumeQueue文件,构建ConsumeQueue索引;
2、 CommitLogDispatcherBuildIndex:根据DispatchRequest写IndexFile文件,构建IndexFile索引;
3、 CommitLogDispatcherCalcBitMap:根据DispatchRequest构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql;
1 | /** |
3 总结
- *本次我们学习了ReputMessageService消息重放服务的总体流程,下一篇文章我们将深入学习CommitLogDispatcherBuildConsumeQueue、CommitLogDispatcherBuildIndex到底是如何构建异步构建ConsumeQueue和IndexFile索引文件的。
**
__END__