12、RocketMQ源码分析:Broker消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】 习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异步:
1、 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多;2、 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量;
此前我们学习了brokeer的处理消息以及追加消息的源码流程:RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计【一万字】 ,在CommitLog#asyncPutMessage 方法中会进行消息的存储,我们讲解了获取MappedFile 以及appendMessage 方法的源码,* *appendMessage**仅仅是将消息追加到内存中,并没有真正的落到磁盘上。
在CommitLog#asyncPutMessage 方法的最后才会调用submitFlushRequest 方法提交刷盘请求,broker将会根据刷盘策略进行刷盘。该方法就是RocketMQ的broker刷盘的入口方法,我们现在来学习RocketMQ 是如何实现同步和异步刷盘 的。
文章目录
1 初始化存储服务
2 submitFlushRequest提交刷盘请求
3 GroupCommitService同步刷盘
3.1.1 waitForRunning等待运行
3.1.1.1 onWaitEnd等待结束交换请求
3.1.2. doCommit执行刷盘
3.2 putRequest存入请求
3.2.1 Wakeup唤醒刷盘线程
3.3 双队列读写分离设计
4 FlushRealTimeService异步刷盘
5 CommitRealTimeService异步堆外缓存刷盘
6 MappedFile的刷盘
6.1 MappedFileQueue#flush刷盘
6.1.1 findMappedFileByOffset根据偏移量获取MappedFile
6.1.2 MappedFile#flush执行刷盘
6.1.2.1 isAbleToFlush是否可刷盘
6.2 MappedFileQueue#commit提交
6.2.1 mappedFile#commit提交
7 总结
1 初始化存储服务 在CommitLog初始化的时候,在其构造器中会初始化该CommitLog对应的存储服务。
1、 GroupCommitService:同步刷盘服务;2、 FlushRealTimeService:异步刷盘服务;3、 CommitRealTimeService:异步转存服务;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { this .flushCommitLogService = new GroupCommitService (); } else { this .flushCommitLogService = new FlushRealTimeService (); } this .commitLogService = new CommitRealTimeService ();
这些服务本身就是一个个的线程任务,在创建了这些服务之后,在**CommitLog#start()**方法中将会对这些服务进行启动。
2 submitFlushRequest提交刷盘请求 该方法中将会根据broker的配置选择不同的刷盘策略:
1、 如果是同步刷盘 ,那么获取同步刷盘服务GroupCommitService:;
1、 同步等待 :如果消息的配置需要等待存储完成后才返回,那么构建同步刷盘请求,并且将请求存入内部的requestsWrite,并且唤醒同步刷盘线程,然后仅仅返回future,没有填充刷盘结果,将会在外部thenCombine方法处阻塞等待这是同步刷盘的默认配置 ;2、 同步不等待 :如果消息的配置不需要等待存储完成后才返回,即不需要等待刷盘结果,那么唤醒同步刷盘线程就可以了,随后直接返回PUT_OK;2、 如果是异步刷盘 :;
1、 如果启动了堆外缓存读写分离 ,即transientStorePoolEnable 为true 并且不是SLAVE ,那么唤醒异步转存服务* *CommitRealTimeService;2、 如果 没有启动堆外缓存,那么唤醒异步刷盘服务 FlushRealTimeService这是 异步刷盘的默认配置**;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public CompletableFuture<PutMessageStatus> submitFlushRequest (AppendMessageResult result, MessageExt messageExt) { if (FlushDiskType.SYNC_FLUSH == this .defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this .flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest (result.getWroteOffset() + result.getWroteBytes(), this .defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); flushDiskWatcher.add(request); service.putRequest(request); return request.future(); } else { service.wakeup(); return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } else { if (!this .defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } }
3 GroupCommitService同步刷盘 同步刷盘服务为GroupCommitService 。创建GroupCommitService对象时,将会初始化两个内部集合,分别是requestsWrite 和* *requestsRead, requestsWrite用于存放putRequest方法写入的刷盘请求, requestsRead用于存放 doCommit**方法读取的刷盘请求。使用两个队列实现读写分离,可以避免putRequest提交刷盘请求与doCommit消费刷盘请求之间的锁竞争。
另外,还会初始化一个独占锁 ,用于保证存入请求和交换请求操作的线程安全。
1 2 3 4 5 6 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList <GroupCommitRequest>();private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList <GroupCommitRequest>();private final PutMessageSpinLock lock = new PutMessageSpinLock ();
3.1 run同步刷盘 **GroupCommitService本身是一个线程任务,其内部还保存着一个线程,线程启动之后将会执行run方法,该方法就是同步刷盘的核心方法。 **
该方法中,将会在死循环中不断的执行刷盘的操作,主要是循环执行两个方法:
1、 waitForRunning :等待执行刷盘操作并且交换请求,同步刷盘服务最多等待10ms;2、 doCommit :尝试执行批量刷盘;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public void run () { CommitLog.log.info(this .getServiceName() + " service started" ); while (!this .isStopped()) { try { this .waitForRunning(10 ); this .doCommit(); } catch (Exception e) { CommitLog.log.warn(this .getServiceName() + " service has exception. " , e); } } try { Thread.sleep(10 ); } catch (InterruptedException e) { CommitLog.log.warn(this .getServiceName() + " Exception, " , e); } synchronized (this ) { this .swapRequests(); } this .doCommit(); CommitLog.log.info(this .getServiceName() + " service end" ); }
3.1.1 waitForRunning等待运行 **用于刷盘线程等待执行刷盘操作并且交换请求,该方法实际上是父类ServiceThread的方法,同步和异步刷盘服务都会调用该方法,同步刷盘服务最多等待10ms。 **
1、 首先尝试尝试CAS的将已通知标志位从true改为false,表示正在或已执行刷盘操作如果成功则表示服务线程曾被尝试唤醒过,或者说wakeup() 方法曾被调用过,即此前曾有过消息存储的请求,那么此时直接调用onWaitEnd 方法交换读写队列,为后续消息持久化做准备;2、 如果CAS失败,即已通知标志位已经是false了,表示服务线程曾没有被尝试唤醒过,或者说wakeup () 方法曾没有被调用过,即此前这段时间没有提交过消息存储的请求;3、 由于此前没有刷盘请求被提交过,那么刷盘服务线程等待一定的时间,减少资源消耗,等待的时间有参数传递,同步刷盘服务最多等待10ms;4、 等待时间到了或者因为刷盘请求而被唤醒,此时将已通知标志位直接改为false,表示正在或已执行刷盘操作调用onWaitEnd 方法交换读写队列,为后续消息持久化做准备,一定会尝试执行一次刷盘操作;
**可以看到,该方法首先会尝试一次CAS,如果成功则表示此前有过提交请求,则交换读写队列并结束,否则会进行等待,直到超时或者被提交请求唤醒。 **
还可以得知,同步刷盘服务在没有提交请求的时候同样会等待,只不过最多等待10ms。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 protected void waitForRunning (long interval) { if (hasNotified.compareAndSet(true , false )) { this .onWaitEnd(); return ; } waitPoint.reset(); try { waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted" , e); } finally { hasNotified.set(false ); this .onWaitEnd(); } }
3.1.1.1 onWaitEnd等待结束交换请求 该方法被GroupCommitService重写,用于交换读写队列。
1 2 3 4 5 6 7 8 9 10 @Override protected void onWaitEnd () { this .swapRequests(); }
swapRequests方法用于交换请求,说白了就是交换读写队列引用,在交换的时候需要加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void swapRequests () { lock.lock(); try { LinkedList<GroupCommitRequest> tmp = this .requestsWrite; this .requestsWrite = this .requestsRead; this .requestsRead = tmp; } finally { lock.unlock(); } }
3.1.2. doCommit执行刷盘 在交换了读写队列之后,requestsRead实际上引用到了requestsWrite队列,doCommit方法将会执行刷盘操作,该方法的大概步骤为:
1、 判断requestsRead队列是否存在元素,如果不存在,也需要进行刷盘,因为某些消息的设置是同步刷盘但是不等待,因此这里直接调用mappedFileQueue.flush(0)方法 进行一次同步刷盘即可,无需唤醒线程等操作;2、 如果队列存在元素,表示有提交同步等待刷盘请求,那么遍历队列依次刷盘;
1、 每个刷盘请求最多刷盘两次;
1 2 3 1. 首先判断如果**flushedWhere**(CommitLog的整体已刷盘物理偏移量)大于等于下一个刷盘点位,则表示该位置的数据已经刷盘成功了,不再需要刷盘,此时刷盘0次。 2. 如果小于下一个刷盘点位,则调用**mappedFileQueue.flush(0)方法进行一次同步刷盘,并且再次判断flushedWhere**是否大于等于下一个刷盘点位,如果是,则不再刷盘,此时刷盘1次。 3. 如果再次判断**flushedWhere**仍然小于下一个刷盘点位,那么再次刷盘。因为文件是固定大小的,第一次刷盘时可能出现上一个文件剩余大小不足的情况,消息只能再一次刷到下一个文件中,因此最多会出现两次刷盘的情况。
2、 调用wakeupCustomer 方法,实际上内部调用flushOKFuture.complete 方法存入结果,将唤醒因为提交同步刷盘请求而被阻塞的线程;3、 刷盘结束之后,将会修改StoreCheckpoint 中的physicMsgTimestamp (最新commitlog文件的刷盘时间戳,单位毫秒),用于重启数据恢复;4、 最后为requestsRead 重新创建一个空的队列,从这里可以得知,当下一次交换队列的时候,requestsWrite 又会成为一个空队列;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 private void doCommit () { if (!this .requestsRead.isEmpty()) { for (GroupCommitRequest req : this .requestsRead) { boolean flushOK = CommitLog.this .mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); for (int i = 0 ; i < 2 && !flushOK; i++) { CommitLog.this .mappedFileQueue.flush(0 ); flushOK = CommitLog.this .mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } long storeTimestamp = CommitLog.this .mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0 ) { CommitLog.this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } this .requestsRead = new LinkedList <>(); } else { CommitLog.this .mappedFileQueue.flush(0 ); } }
3.2 putRequest存入请求 调用该方法将加锁并将刷盘请求存入requestsWrite 集合,然后调用wakeup 方法唤醒同步刷盘线程。
这就是submitFlushRequest 方法中执行同步刷盘操作的调用点,仅仅需要将请求存入队列,同步刷盘服务线程将会自动回去这些请求并处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public synchronized void putRequest (final GroupCommitRequest request) { lock.lock(); try { this .requestsWrite.add(request); } finally { lock.unlock(); } this .wakeup(); }
3.2.1 Wakeup唤醒刷盘线程 wakeup方法尝试唤醒同步刷盘线程,表示有新的同步等待刷盘请求被提交。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void wakeup () { if (hasNotified.compareAndSet(false , true )) { waitPoint.countDown(); } }
3.3 双队列读写分离设计 **在同步刷盘服务中,有两个队列requestsWrite和requestsRead,requestsWrite用于存放putRequest方法写入的刷盘请求,requestsRead用于存放doCommit方法读取的刷盘请求。 **
*同步刷盘请求会首先调用putRequest方法存入requestsWrite队列中,而同步刷盘服务会最多每隔10ms就会调用swapRequests方法进行读写队列引用的交换,即requestsWrite指向原requestsRead指向的队列,requestsRead指向原requestsWrite指向的队列。并且putRequest方法和swapRequests方法会竞争同一把锁。 **
*在swapRequests方法之后的doCommit刷盘方法中,只会获取requestsRead中的刷盘请求进行刷盘,并且在刷盘的最后会将requestsRead队列重新构建一个空队列,而此过程中的刷盘请求都被提交到requestsWrite。 **
*从以上的流程中我们可以得知,调用一次doCommit刷盘方法,可以进行多个请求的批量刷盘。这里使用两个队列实现读写分离,以及重置队列的操作,可以使得putRequest方法提交刷盘请求与doCommit方法消费刷盘请求同时进行,避免了他们的锁竞争。而在此前版本的实现中,doCommit方法被加上了锁,将会影响刷盘性能。 **
4 FlushRealTimeService异步刷盘 异步刷盘服务为FlushRealTimeService,其同样是一个线程任务,并且内部持有一个单独的线程。
4.1 run异步刷盘 该方法中,将会在死循环中不断的执行刷盘的操作,实际上逻辑相比于同步刷盘更加简单,也没有什么读写分离,大概步骤为:
1、 获取一系列的配置参数:;
1、 是否是定时刷盘,默认是false,即不开启,可通过flushCommitLogTimed 配置;2、 获取刷盘间隔时间,默认500ms,可通过flushIntervalCommitLog 配置;3、 获取刷盘的最少页数,默认4,即16k,可通过flushCommitLogLeastPages 配置;4、 最长刷盘延迟间隔时间,默认10s,可通过flushCommitLogThoroughInterval 配置,即距离上一次刷盘超过10S时,不管页数是否超过4,都会刷盘;2、 如果当前时间距离上次刷盘时间大于等于10s ,那么必定刷盘,因此设置刷盘的最少页数为0,更新刷盘时间戳为当前时间;3、 判断是否是定时刷盘,如果定时刷盘,那么当前线程sleep睡眠指定的间隔时间,否则那么调用waitForRunning 方法,线程最多阻塞指定的间隔时间,但可以被中途的wakeup方法唤醒进而直接尝试进行刷盘;4、 线程醒来后调用mappedFileQueue.flush 方法刷盘,指定最少页数,随后更新最新commitlog文件的刷盘时间戳,单位毫秒,用于启动恢复;5、 当刷盘服务被关闭时,默认执行10次 刷盘操作,让消息尽量少丢失;
可以看到,异步刷盘的情况下,默认最少需要4页的脏数据 才会刷盘,另外还可以配置定时刷盘策略,默认500ms ,且最长刷盘延迟间隔时间,默认达到了10s 。这些延迟刷盘的配置,可以保证RocketMQ有尽可能更高的效率,但是同样会增加消息丢失的可能,例如机器掉电。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 public void run () { CommitLog.log.info(this .getServiceName() + " service started" ); while (!this .isStopped()) { boolean flushCommitLogTimed = CommitLog.this .defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); int interval = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); int flushPhysicQueueLeastPages = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int flushPhysicQueueThoroughInterval = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false ; long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this .lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this .lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0 ; printFlushProgress = (printTimes++ % 10 ) == 0 ; } try { if (flushCommitLogTimed) { Thread.sleep(interval); } else { this .waitForRunning(interval); } if (printFlushProgress) { this .printFlushProgress(); } long begin = System.currentTimeMillis(); CommitLog.this .mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this .mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0 ) { CommitLog.this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500 ) { log.info("Flush data to disk costs {} ms" , past); } } catch (Throwable e) { CommitLog.log.warn(this .getServiceName() + " service has exception. " , e); this .printFlushProgress(); } } boolean result = false ; for (int i = 0 ; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this .mappedFileQueue.flush(0 ); CommitLog.log.info(this .getServiceName() + " service shutdown, retry " + (i + 1 ) + " times " + (result ? "OK" : "Not OK" )); } this .printFlushProgress(); CommitLog.log.info(this .getServiceName() + " service end" ); }
5 CommitRealTimeService异步堆外缓存刷盘 异步堆外缓存刷盘服务为CommitRealTimeService ,其同样是一个线程任务,并且内部持有一个单独的线程。
5.1 run异步堆外缓存刷盘 该方法中,将会在死循环中不断的执行刷盘的操作,大概步骤为:
1、 获取一系列的配置参数:;
1、 获取刷盘间隔时间,默认200ms ,可通过commitIntervalCommitLog 配置;2、 获取刷盘的最少页数,默认4 ,即16k ,可通过commitCommitLogLeastPages 配置;3、 最长刷盘延迟间隔时间,默认200ms ,可通过commitCommitLogThoroughInterval 配置,即距离上一次刷盘超过200ms时,不管页数是否超过4,都会刷盘;2、 如果当前时间距离上次刷盘时间大于等于200ms ,那么必定刷盘,因此设置刷盘的最少页数为0,更新刷盘时间戳为当前时间;3、 调用mappedFileQueue.commit 方法提交数据到fileChannel,而不是直接flush ,如果已经提交了一些脏数据到fileChannel,那么更新最后提交的时间戳,并且唤醒flushCommitLogService 异步刷盘服务进行真正的刷盘操作;4、 调用waitForRunning 方法,线程最多阻塞指定的间隔时间,但可以被中途的wakeup方法唤醒进而进行下一轮循环;5、 当刷盘服务被关闭时,默认执行10次 刷盘(提交)操作,让消息尽量少丢失;
可以发现,异步堆外缓存刷盘和普通异步刷盘的逻辑都差不多,最主要的区别就是异步堆外缓存刷盘服务并不会真正的执行flush刷盘,而是调用commit方法提交数据到fileChannel。
开启了异步堆外缓存服务之后,消息会先被追加到堆外内存writebuffer,然后异步(每最多200ms执行一次)的提交到commitLog文件的文件通道FileChannel中,然后唤醒异步刷盘服务FlushRealTimeService,由该FlushRealTimeService服务(每最多500ms执行一次)最终异步的将MappedByteBuffer中的数据刷到磁盘。
开启了异步堆外缓存服务之后,写数据的时候写入堆外缓存writeBuffer中,而读取数据始终从MappedByteBuffer中读取,二者通过异步堆外缓存刷盘服务CommitRealTimeService实现数据同步,该服务异步(最多200ms执行一次)的将堆外缓存writeBuffer中的脏数据提交到commitLog文件的文件通道FileChannel中,而该文件被执行了内存映射mmap操作,因此可以从对应的MappedByteBuffer中直接获取提交到FileChannel的数据,但仍有延迟。
高并发下频繁写入 page cache 可能会造成刷脏页时磁盘压力较高,导致写入时出现毛刺现象。读写分离能缓解频繁写page cache 的压力,但会增加消息不一致的风险,使得数据一致性降低到最低。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 @Override public void run () { CommitLog.log.info(this .getServiceName() + " service started" ); while (!this .isStopped()) { int interval = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); int commitDataLeastPages = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); if (begin >= (this .lastCommitTimestamp + commitDataThoroughInterval)) { this .lastCommitTimestamp = begin; commitDataLeastPages = 0 ; } try { boolean result = CommitLog.this .mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this .lastCommitTimestamp = end; flushCommitLogService.wakeup(); } if (end - begin > 500 ) { log.info("Commit data to file costs {} ms" , end - begin); } this .waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this .getServiceName() + " service has exception. " , e); } } boolean result = false ; for (int i = 0 ; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this .mappedFileQueue.commit(0 ); CommitLog.log.info(this .getServiceName() + " service shutdown, retry " + (i + 1 ) + " times " + (result ? "OK" : "Not OK" )); } CommitLog.log.info(this .getServiceName() + " service end" ); }
6 MappedFile的刷盘 6.1 MappedFileQueue#flush刷盘 同步和异步刷盘服务,最终都是调用MappedFileQueue#flush 方法执行刷盘。
该方法首先根据最新刷盘物理位置flushedWhere ,去找到对应的MappedFile。如果flushedWhere为0,表示还没有开始写消息,则获取第一个MappedFile。然后调用mappedFile#flush 方法执行真正的刷盘操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public boolean flush (final int flushLeastPages) { boolean result = true ; MappedFile mappedFile = this .findMappedFileByOffset(this .flushedWhere, this .flushedWhere == 0 ); if (mappedFile != null ) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this .flushedWhere; this .flushedWhere = where; if (0 == flushLeastPages) { this .storeTimestamp = tmpTimeStamp; } } return result; }
6.1.1 findMappedFileByOffset根据偏移量获取MappedFile 该方法用于根据偏移量获取对应的MappedFile,实际上很简单,大概步骤为:
1、 获取mappedFiles集合中的第一个MappedFile和最后一个MappedFile;2、 获取当前offset属于的MappedFile在mappedFiles集合中的索引位置因为MappedFile的名字则是该MappedFile的起始offset,而每个MappedFile的大小一般是固定的,因此查找的方法很简单:intindex=( int)((offset/this.mappedFileSize)-(firstMappedFile.getFileFromOffset()/this.mappedFileSize));3、 根据索引位置从mappedFiles中获取对应的MappedFile文件targetFile,如果指定offset在targetFile的offset范围内,那么返回该targetFile;4、 否则,遍历mappedFiles,依次对每个MappedFile的offset范围进行判断,找到对应的tmpMappedFile并返回;5、 到这里,表示没找到任何MappedFile,如果returnFirstOnNotFound为true,则返回第一个文件;6、 最后,还是不满足条件,返回null;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public MappedFile findMappedFileByOffset (final long offset, final boolean returnFirstOnNotFound) { try { MappedFile firstMappedFile = this .getFirstMappedFile(); MappedFile lastMappedFile = this .getLastMappedFile(); if (firstMappedFile != null && lastMappedFile != null ) { if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this .mappedFileSize) { LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}" , offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this .mappedFileSize, this .mappedFileSize, this .mappedFiles.size()); } else { int index = (int ) ((offset / this .mappedFileSize) - (firstMappedFile.getFileFromOffset() / this .mappedFileSize)); MappedFile targetFile = null ; try { targetFile = this .mappedFiles.get(index); } catch (Exception ignored) { } if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this .mappedFileSize) { return targetFile; } for (MappedFile tmpMappedFile : this .mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this .mappedFileSize) { return tmpMappedFile; } } } if (returnFirstOnNotFound) { return firstMappedFile; } } } catch (Exception e) { log.error("findMappedFileByOffset Exception" , e); } return null ; }
6.1.2 MappedFile#flush执行刷盘 该方法是需要执行刷盘的MappedFile实例调用的方法,用于完成刷盘操作。无论是同步还是异步刷盘,最终都是调用该方法。
大概步骤为:
1、 判断是否可以刷盘如果文件已经满了,或者如果flushLeastPages 大于0,且脏页数量大于等于flushLeastPages ,或者如果flushLeastPages等于0并且存在脏数据,这几种情况都会刷盘;
如果可以刷盘。那么增加引用次数,并且进行刷盘操作,如果使用了堆外内存,那么通过fileChannel#force 强制刷盘,这是异步堆外内存走的逻辑。如果没有使用堆外内存,那么通过mappedByteBuffer#force 强制刷盘,这是同步或者异步刷盘走的逻辑。
3、 最后更新刷盘位置为写入位置,并返回;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public int flush (final int flushLeastPages) { if (this .isAbleToFlush(flushLeastPages)) { if (this .hold()) { int value = getReadPosition(); try { if (writeBuffer != null || this .fileChannel.position() != 0 ) { this .fileChannel.force(false ); } else { this .mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk." , e); } this .flushedPosition.set(value); this .release(); } else { log.warn("in flush, hold failed, flush offset = " + this .flushedPosition.get()); this .flushedPosition.set(getReadPosition()); } } return this .getFlushedPosition(); }
6.1.2.1 isAbleToFlush是否可刷盘 该方法用于判断当前是否可刷盘,大概流程为:
1、 首先获取刷盘位置flush和写入位置write然后判断如果文件满了,即写入位置等于文件大小,那么直接返回true;2、 如果至少刷盘的页数flushLeastPages 大于0,则需要比较写入位置与刷盘位置的差值,当差值大于等于指定的最少页数才能刷盘,这样可以防止频繁的刷盘;3、 否则,表示flushLeastPages 为0,那么只要写入位置大于刷盘位置,即存在脏数据,那么就一定会刷盘;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private boolean isAbleToFlush (final int flushLeastPages) { int flush = this .flushedPosition.get(); int write = getReadPosition(); if (this .isFull()) { return true ; } if (flushLeastPages > 0 ) { return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; } return write > flush; }
6.2 MappedFileQueue#commit提交 MappedFileQueue#commit 方法用于提交刷盘,该方法首先根据最新刷盘物理位置flushedWhere,去找到对应的MappedFile。如果flushedWhere为0,表示还没有开始写消息,则获取第一个MappedFile。然后调用mappedFile#flush 方法执行真正的刷盘操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public boolean commit (final int commitLeastPages) { boolean result = true ; MappedFile mappedFile = this .findMappedFileByOffset(this .committedWhere, this .committedWhere == 0 ); if (mappedFile != null ) { int offset = mappedFile.commit(commitLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this .committedWhere; this .committedWhere = where; } return result; }
6.2.1 mappedFile#commit提交 该方法是需要执行提交的MappedFile实例调用的方法,用于完成提交操作。
通过isAbleToCommit 方法判断是否支持提交,其判断逻辑和isAbleToFlush 方法一致。如果支持提交,那么调用commit0 方法将堆外内存中的全部脏数据提交到filechannel。
如果所有的脏数据被提交到了FileChannel ,那么归还堆外缓存,将堆外缓存重置,并存入内存池availableBuffers 的头部,随后将writeBuffer职位null,下次再重新获取新的writeBuffer。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public int commit (final int commitLeastPages) { if (writeBuffer == null ) { return this .wrotePosition.get(); } if (this .isAbleToCommit(commitLeastPages)) { if (this .hold()) { commit0(); this .release(); } else { log.warn("in commit, hold failed, commit offset = " + this .committedPosition.get()); } } if (writeBuffer != null && this .transientStorePool != null && this .fileSize == this .committedPosition.get()) { this .transientStorePool.returnBuffer(writeBuffer); this .writeBuffer = null ; } return this .committedPosition.get(); }
7 总结 本次我们学习了RocketMQ消息刷盘的源码, 有四种刷盘策略:
1、 如果是同步刷盘 ,那么获取同步刷盘服务GroupCommitService:;
2、 同步等待 :如果消息的配置需要等待存储完成后才返回,那么构建同步刷盘请求,并且将请求存入内部的requestsWrite,并且唤醒同步刷盘线程,然后仅仅返回future,没有填充刷盘结果,将会在外部thenCombine方法处阻塞等待这是同步刷盘的默认配置 ;3、 同步不等待 :如果消息的配置不需要等待存储完成后才返回,即不需要等待刷盘结果,那么唤醒同步刷盘线程就可以了,随后直接返回PUT_OK;4、 如果是异步刷盘 :;
1、 如果启动了堆外缓存读写分离 ,即transientStorePoolEnable 为true 并且不是SLAVE ,那么唤醒异步转存服务* *CommitRealTimeService;2、 如果 没有启动堆外缓存,那么唤醒异步刷盘服务 FlushRealTimeService这是 异步刷盘的默认配置**;
相关的知识点:
同步和异步刷盘服务,最终都是调用MappedFileQueue#flush 方法执行刷盘,该方法内部最终又是通过mappedFile#flush 方法刷盘的。2、 同步刷盘双队列读写分离优化: ;
1、 在同步刷盘的时候,采用了双队列读写分离的机制 ,以及重置队列的操作,可以使得putRequest方法提交刷盘请求与doCommit方法消费刷盘请求同时进行,避免了他们的锁竞争而在此前版本的实现中,doCommit方法被加上了锁,将会影响刷盘性能;3、 异步堆外缓存刷盘优化: ;
1、 在异步刷盘的时候,可以开启异步堆外缓存刷盘 机制,异步堆外缓存刷盘服务并不会真正的执行flush刷盘,而是调用commit方法提交数据到fileChannel;2、 开启了异步堆外缓存服务之后,写数据的时候写入堆外缓存writeBuffer中,而读取数据始终从MappedByteBuffer中读取,*这也是一种读写分离机制 * 二者通过异步堆外缓存刷盘服务CommitRealTimeService实现数据同步,该服务异步(最多200ms执行一次)的将堆外缓存writeBuffer中的脏数据提交到commitLog文件的文件通道FileChannel中,而该文件被执行了内存映射mmap操作,因此可以从对应的MappedByteBuffer中直接获取提交到FileChannel的数据,但仍有延迟;3、 **高并发下频繁写入pagecache可能会造成刷脏页时磁盘压力较高,导致写入时出现毛刺现象读写分离能缓解频繁写pagecache的压力,但会增加消息不一致的风险,使得数据一致性降低到最低 **;
本次我们只学习了消息到commitFile文件的刷盘机制,但是没有讲解ConsumeQueue和IndexFile文件的构建机制,我们下次学习。
__END__