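04. RocketMQ Source Code Analysis: How the Broker Loads Message Files and Recovers Data at Startup [10,000 Words]
Previously we walked through the Broker startup source in "RocketMQ Source Code (3): Broker Startup Flow Analysis [10,000 Words]". During Broker startup, after DefaultMessageStore is instantiated, its load method is called to load the commitLog, ConsumeQueue, and IndexFile data from disk into memory and to perform data recovery.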
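1 isTempFileExist: checking for the temporary file
2 commitLog#load: loading the message log files
2.2 Introduction to commitlog files
3 loadConsumeQueue: loading the consume queue files
3.2 Introduction to ConsumeQueue files
4 Creating the StoreCheckpoint object
5 Loading the index files
6 Recovering CommitLog and ConsumeQueue data
6.1 recoverConsumeQueue: recovering ConsumeQueue
6.1.2 truncateDirtyFiles: truncating invalid files
6.2 recoverNormally: normal commitLog recovery
6.2.1 truncateDirtyLogicFiles: truncating invalid consumequeue files
6.3 recoverAbnormally: abnormal commitlog recovery
6.3.1 isMappedFileMatchedRecover: is the file valid
6.4 recoverTopicQueueTable: recovering topicQueueTable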
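The load method performs the following steps:
1. Call isTempFileExist to determine whether the Broker exited normally last time. A normal exit does not leave the abort file behind; an abnormal exit does.
2. Load the CommitLog files. The CommitLog is where message content is actually stored.
3. Load the ConsumeQueue files. A ConsumeQueue file can be seen as a message offset index over the CommitLog.
4. Load the index files. An index file can be seen as a time-range index over the CommitLog.
5. Recover the ConsumeQueue and CommitLog files: restore the correct data into memory and delete invalid data and files.
6. Load the RocketMQ delayed-message service, including delay levels, configuration files, and so on.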
    public boolean load() {
        boolean result = true;

        try {
            // The abort file is left behind only after an abnormal exit.
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

            // Load the CommitLog files.
            result = result && this.commitLog.load();

            // Load the ConsumeQueue files.
            result = result && this.loadConsumeQueue();

            if (result) {
                // Create the checkpoint object from the checkpoint file.
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

                // Load the index files.
                this.indexService.load(lastExitOK);

                // Recover ConsumeQueue and CommitLog data.
                this.recover(lastExitOK);

                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());

                // Load the delayed-message service.
                if (null != scheduleMessageService) {
                    result = this.scheduleMessageService.load();
                }
            }
        } catch (Exception e) {
            log.error("load exception", e);
            result = false;
        }

        if (!result) {
            this.allocateMappedFileService.shutdown();
        }

        return result;
    }
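1 isTempFileExist: checking for the temporary file
The method first checks whether the temporary file, i.e. the abort file, exists. Its path is {storePathRootDir}/abort. At startup the Broker creates {ROCKET_HOME}/store/abort and registers a hook that deletes the abort file when the JVM exits.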
    private boolean isTempFileExist() {
        String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        File file = new File(fileName);
        return file.exists();
    }
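The abort-file convention described above can be sketched outside RocketMQ as follows. This is a minimal illustration, not the Broker's code: the class `AbortMarkerSketch` and its method names are hypothetical, and a temporary directory stands in for the store root.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for the abort-file convention; not RocketMQ code. */
class AbortMarkerSketch {
    private final Path abortFile;

    AbortMarkerSketch(Path storeRoot) {
        this.abortFile = storeRoot.resolve("abort");
    }

    /** Same idea as isTempFileExist: a leftover marker means the last exit was unclean. */
    boolean lastExitOk() {
        return !Files.exists(abortFile);
    }

    /** Called at startup, after the previous exit state has been read. */
    void markRunning() {
        try {
            Files.createDirectories(abortFile.getParent());
            if (!Files.exists(abortFile)) {
                Files.createFile(abortFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Called on a clean shutdown only. */
    void markCleanShutdown() {
        try {
            Files.deleteIfExists(abortFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs one clean cycle and one simulated crash; returns the two observed states. */
    static boolean[] demo() {
        try {
            Path root = Files.createTempDirectory("store-sketch");

            AbortMarkerSketch first = new AbortMarkerSketch(root);
            first.markRunning();
            first.markCleanShutdown();
            boolean afterClean = new AbortMarkerSketch(root).lastExitOk();

            AbortMarkerSketch second = new AbortMarkerSketch(root);
            second.markRunning(); // simulated crash: markCleanShutdown() is never called
            boolean afterCrash = new AbortMarkerSketch(root).lastExitOk();

            return new boolean[] {afterClean, afterCrash};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The marker carries no content; its mere presence at the next startup is what selects the abnormal recovery path.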
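2 commitLog#load: loading the message log files
The load method of the internal CommitLog object loads the commit log files. The directory is taken from the storePathCommitLog property in broker.conf and defaults to $HOME/store/commitlog/.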
    public boolean load() {
        boolean result = this.mappedFileQueue.load();
        log.info("load commit log " + (result ? "OK" : "Failed"));
        return result;
    }
2.1 load: loading the files
MappedFileQueue#load loads every commitlog file under the commitLog directory as a MappedFile object.
    public boolean load() {
        File dir = new File(this.storePath);
        File[] ls = dir.listFiles();
        if (ls != null) {
            return doLoad(Arrays.asList(ls));
        }
        return true;
    }

    public boolean doLoad(List<File> files) {
        // Sort by file name, i.e. by starting offset.
        files.sort(Comparator.comparing(File::getName));

        for (File file : files) {
            // Every file must have exactly the configured size.
            if (file.length() != this.mappedFileSize) {
                log.warn(file + "\t" + file.length()
                    + " length not matched message store config value, please check it manually");
                return false;
            }

            try {
                // Create the MappedFile, which memory-maps the file.
                MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                // All three positions are initially set to the file size.
                mappedFile.setWrotePosition(this.mappedFileSize);
                mappedFile.setFlushedPosition(this.mappedFileSize);
                mappedFile.setCommittedPosition(this.mappedFileSize);
                this.mappedFiles.add(mappedFile);
                log.info("load " + file.getPath() + " OK");
            } catch (IOException e) {
                log.error("load file " + file + " error", e);
                return false;
            }
        }
        return true;
    }
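2.1.1 Creating a MappedFile and mapping the file
MappedFile is the Java class that represents a RocketMQ physical file. Disk reads and writes for all three file types (commitLog, consumeQueue, and indexFile) go through MappedFile.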
    public MappedFile(final String fileName, final int fileSize) throws IOException {
        init(fileName, fileSize);
    }

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        // The file name is the starting offset of the file.
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        ensureDirOK(this.file.getParent());

        try {
            // Open the file channel and map the whole file into memory (mmap).
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
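To make the two ideas in init concrete (the file name doubling as the starting offset, and the file being accessed through a memory mapping), here is a small standalone sketch using only the JDK. The class `MmapSketch`, the 4096-byte file size, and the temporary directory are illustrative assumptions, not RocketMQ values.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of mmap-based file access; not RocketMQ code. */
class MmapSketch {
    /** Parses the starting offset encoded in a file name such as 00000000000000001024. */
    static long fileFromOffset(String fileName) {
        return Long.parseLong(fileName);
    }

    /** Writes one long through a READ_WRITE mapping, then reads it back through a fresh mapping. */
    static long writeAndReadBack(long value) {
        try {
            Path dir = Files.createTempDirectory("mmap-sketch");
            Path file = dir.resolve("00000000000000000000");
            int fileSize = 4096; // illustrative size only

            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
                 FileChannel channel = raf.getChannel()) {
                // Mapping in READ_WRITE mode grows the file to fileSize if needed.
                MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                buf.putLong(0, value);
                buf.force(); // flush the mapped region to disk
            }

            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r");
                 FileChannel channel = raf.getChannel()) {
                MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
                return buf.getLong(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```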
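2.2 Introduction to commitlog files
The commit log is where RocketMQ actually stores message content: it holds the message bodies written by producers together with their metadata. Messages are not fixed-length.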
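3 loadConsumeQueue: loading the consume queue files
This method loads the consume queue files. A ConsumeQueue file can be seen as an index over the CommitLog: it stores the commit log offsets of the messages that belong to its topic. When a consumer pulls messages, it can use the ConsumeQueue to quickly locate each message in the commit log by offset.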
    private boolean loadConsumeQueue() {
        // The consumequeue root directory; each child directory is a topic.
        File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
        File[] fileTopicList = dirLogic.listFiles();
        if (fileTopicList != null) {
            for (File fileTopic : fileTopicList) {
                String topic = fileTopic.getName();

                // Each child directory of a topic is a queue id.
                File[] fileQueueIdList = fileTopic.listFiles();
                if (fileQueueIdList != null) {
                    for (File fileQueueId : fileQueueIdList) {
                        int queueId;
                        try {
                            queueId = Integer.parseInt(fileQueueId.getName());
                        } catch (NumberFormatException e) {
                            continue;
                        }
                        // One ConsumeQueue object per queue id directory.
                        ConsumeQueue logic = new ConsumeQueue(
                            topic,
                            queueId,
                            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                            this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                            this);
                        this.putConsumeQueue(topic, queueId, logic);
                        if (!logic.load()) {
                            return false;
                        }
                    }
                }
            }
        }

        log.info("load logics queue all over, OK");

        return true;
    }
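3.1 load: loading ConsumeQueue files
Once a ConsumeQueue object is created, it loads the ConsumeQueue files under the queue id directory it manages. Internally it calls the load method of mappedFileQueue described above, which creates a MappedFile object for each ConsumeQueue file and memory-maps it (mmap).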
    public boolean load() {
        boolean result = this.mappedFileQueue.load();
        log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
        if (isExtReadEnable()) {
            result &= this.consumeQueueExt.load();
        }
        return result;
    }
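3.2 Introduction to ConsumeQueue files
The official description: the message consume queue (which can be thought of as a queue within a topic) exists mainly to improve consumption performance. RocketMQ uses a topic-based subscription model, so consumption is per topic, and scanning the commitlog files to find messages by topic would be very inefficient.
Entries in a consumequeue file are likewise fixed-length: each entry is 20 bytes, made up of an 8-byte commitlog physical offset, a 4-byte message size, and an 8-byte tag hashcode. A single file holds 300,000 entries, so each entry can be accessed randomly like an array element, and each ConsumeQueue file is about 5.72 MB.
After producing 1,000 messages with the quickstart example in RocketMQ's example package, the consumequeue directory contains the expected ConsumeQueue file structure, which makes the layout easier to understand.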
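The fixed 20-byte entry layout can be illustrated with a plain ByteBuffer. The sketch below is not RocketMQ code; `ConsumeQueueEntrySketch` and its helpers are hypothetical names, and only the field widths and the 300,000-entry count come from the description above.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the 20-byte ConsumeQueue entry layout. */
class ConsumeQueueEntrySketch {
    static final int UNIT_SIZE = 8 + 4 + 8;       // offset + size + tag hashcode
    static final int ENTRIES_PER_FILE = 300_000;

    /** Encodes one entry: 8-byte commitlog offset, 4-byte size, 8-byte tag hashcode. */
    static ByteBuffer encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(UNIT_SIZE);
        buf.putLong(commitLogOffset).putInt(msgSize).putLong(tagsCode);
        buf.flip();
        return buf;
    }

    /** Array-like random access: entry n starts at byte n * UNIT_SIZE. */
    static long positionOf(long entryIndex) {
        return entryIndex * UNIT_SIZE;
    }

    /** Size of one full file in bytes. */
    static long fileSizeBytes() {
        return (long) ENTRIES_PER_FILE * UNIT_SIZE;
    }
}
```

A full file is 300,000 × 20 = 6,000,000 bytes, which is where the figure of roughly 5.72 MB (6,000,000 / 1,048,576) comes from.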
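4 Creating the StoreCheckpoint object
After the commitlog and consumequeue files have loaded successfully, the checkpoint file is loaded and the storeCheckpoint object is created. The file is located at {storePathRootDir}/checkpoint.
The checkpoint records three timestamps:
1. physicMsgTimestamp: the flush timestamp of the latest commitlog file, in milliseconds.
2. logicsMsgTimestamp: the flush timestamp of the latest consumeQueue file, in milliseconds.
3. indexMsgTimestamp: the timestamp at which the latest indexfile was created, in milliseconds.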
    public StoreCheckpoint(final String scpPath) throws IOException {
        File file = new File(scpPath);
        MappedFile.ensureDirOK(file.getParent());
        boolean fileExists = file.exists();

        // Map one OS page of the checkpoint file into memory.
        this.randomAccessFile = new RandomAccessFile(file, "rw");
        this.fileChannel = this.randomAccessFile.getChannel();
        this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);

        if (fileExists) {
            log.info("store checkpoint file exists, " + scpPath);
            // The three timestamps are stored as consecutive 8-byte longs.
            this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
            this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
            this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);

            log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
                + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
            log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
                + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
            log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
                + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
        } else {
            log.info("store checkpoint file not exists, " + scpPath);
        }
    }
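The byte layout read by the constructor (three longs at positions 0, 8, and 16) can be reproduced with a heap ByteBuffer. This is only an illustration: `CheckpointLayoutSketch` is a hypothetical class, and the 4096-byte buffer is an assumed stand-in for one OS page.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the checkpoint byte layout; not RocketMQ code. */
class CheckpointLayoutSketch {
    static final int PHYSIC_POS = 0;  // physicMsgTimestamp
    static final int LOGICS_POS = 8;  // logicsMsgTimestamp
    static final int INDEX_POS = 16;  // indexMsgTimestamp

    /** Writes the three timestamps at their fixed positions. */
    static ByteBuffer write(long physic, long logics, long index) {
        ByteBuffer buf = ByteBuffer.allocate(4096); // assumed page size
        buf.putLong(PHYSIC_POS, physic);
        buf.putLong(LOGICS_POS, logics);
        buf.putLong(INDEX_POS, index);
        return buf;
    }

    /** Reads the timestamps back as {physic, logics, index}. */
    static long[] read(ByteBuffer buf) {
        return new long[] {
            buf.getLong(PHYSIC_POS),
            buf.getLong(LOGICS_POS),
            buf.getLong(INDEX_POS)
        };
    }
}
```

Because the positions are absolute, reading does not depend on the buffer's current position, which matches how the constructor uses getLong(0), getLong(8), and getLong(16).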
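5 Loading the index files
The index files are loaded from {storePathRootDir}/index. Index files support fast message lookup by time range; the underlying layout is a HashMap-like structure, i.e. a hash index.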
    public boolean load(final boolean lastExitOK) {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // Sort files in ascending order.
            Arrays.sort(files);
            for (File file : files) {
                try {
                    IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                    f.load();

                    // After an abnormal exit, destroy index files whose end timestamp
                    // is later than the index timestamp in the checkpoint.
                    if (!lastExitOK) {
                        if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                            .getIndexMsgTimestamp()) {
                            f.destroy(0);
                            continue;
                        }
                    }

                    log.info("load index file OK, " + f.getFileName());
                    this.indexFileList.add(f);
                } catch (IOException e) {
                    log.error("load file {} error", file, e);
                    return false;
                } catch (NumberFormatException e) {
                    log.error("load file {} error", file, e);
                }
            }
        }

        return true;
    }
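5.1 Introduction to index files
The official description: IndexFile provides a way to query messages by key or by time range. Index files are stored at $HOME/store/index/{fileName}, where fileName is the timestamp at creation time. A single IndexFile has a fixed size of about 400 MB and can hold 20 million index entries. The IndexFile storage design implements a HashMap structure in the file system, so RocketMQ's index files are in essence a hash index.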
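The "about 400 MB" figure can be checked with simple arithmetic. In the sketch below, only the 20 million entry count comes from the text; the 40-byte header, 5 million hash slots, 4 bytes per slot, and 20 bytes per index entry are assumptions based on the defaults commonly described for RocketMQ, and `IndexFileSizeSketch` is a hypothetical class.

```java
/** Hypothetical size arithmetic for a hash-index file; the constants are assumptions. */
class IndexFileSizeSketch {
    static final int HEADER_BYTES = 40;          // assumed header size
    static final int HASH_SLOT_NUM = 5_000_000;  // assumed default slot count
    static final int SLOT_BYTES = 4;             // assumed: one int per slot
    static final int INDEX_NUM = 20_000_000;     // 20 million entries, as stated in the text
    static final int INDEX_BYTES = 20;           // assumed size of one index entry

    /** Header, then the slot table, then the index entries. */
    static long totalBytes() {
        return HEADER_BYTES
            + (long) HASH_SLOT_NUM * SLOT_BYTES
            + (long) INDEX_NUM * INDEX_BYTES;
    }

    /** Byte position of the slot for a key hash, after folding the hash to a non-negative value. */
    static long slotPosition(int keyHash) {
        int nonNegative = keyHash == Integer.MIN_VALUE ? 0 : Math.abs(keyHash);
        return HEADER_BYTES + (long) (nonNegative % HASH_SLOT_NUM) * SLOT_BYTES;
    }
}
```

Under these assumptions the total is 40 + 20,000,000 + 400,000,000 = 420,000,040 bytes, or roughly 400 MB.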
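6 Recovering CommitLog and ConsumeQueue data
This method recovers the CommitLog and ConsumeQueue data into memory:
1. First recover all ConsumeQueue files and return the largest commitlog offset stored in the valid region of the ConsumeQueue files.
2. Then recover the commitlog files, choosing normal or abnormal recovery depending on whether the Broker last exited normally.
3. Finally recover topicQueueTable.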
    private void recover(final boolean lastExitOK) {
        // Recover all ConsumeQueue files and get the largest commitlog offset they reference.
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

        if (lastExitOK) {
            // Normal recovery.
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            // Abnormal recovery.
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }

        this.recoverTopicQueueTable();
    }
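6.1 recoverConsumeQueue: recovering ConsumeQueue
recoverConsumeQueue recovers the ConsumeQueue files and deletes invalid ones. It returns the largest commitlog physical offset stored in the valid region of the ConsumeQueue files. This value marks where the last fully written message ends in the commitlog, i.e. the maximum file offset of valid message data in the commitlog.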
    private long recoverConsumeQueue() {
        long maxPhysicOffset = -1;
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                logic.recover();
                if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                    maxPhysicOffset = logic.getMaxPhysicOffset();
                }
            }
        }

        return maxPhysicOffset;
    }
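6.1.1 recover: recovering a ConsumeQueue
This method belongs to ConsumeQueue and recovers a single ConsumeQueue. As noted earlier, one queue id directory corresponds to one ConsumeQueue object, so a ConsumeQueue holds multiple ConsumeQueue files that belong to the same queueId.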
    public void recover() {
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Start from the third-to-last file, or from the first file if there are fewer than three.
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;

            int mappedFileSizeLogics = this.mappedFileSize;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            long maxExtAddr = 1;
            while (true) {
                // Check the entries of the current file one by one.
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();

                    // A valid entry has a non-negative offset and a positive size.
                    if (offset >= 0 && size > 0) {
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        this.maxPhysicOffset = offset + size;
                        if (isExtAddr(tagsCode)) {
                            maxExtAddr = tagsCode;
                        }
                    } else {
                        log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                }

                // The whole file is valid: move on to the next file, if any.
                if (mappedFileOffset == mappedFileSizeLogics) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        log.info("recover last consume queue file over, last mapped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                        + (processOffset + mappedFileOffset));
                    break;
                }
            }

            // processOffset is now the maximum valid offset across the ConsumeQueue files.
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            // Remove or truncate everything after the maximum valid offset.
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            if (isExtReadEnable()) {
                this.consumeQueueExt.recover();
                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        }
    }
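The per-file scan at the heart of recover can be isolated into a small sketch: walk fixed-size entries until one fails the `offset >= 0 && size > 0` check, remembering how many bytes were valid and the end of the last valid message. `ConsumeQueueScanSketch` is a hypothetical class operating on a heap ByteBuffer, not the RocketMQ implementation.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of scanning fixed-size entries in one file. */
class ConsumeQueueScanSketch {
    static final int UNIT = 20; // 8-byte offset + 4-byte size + 8-byte tag hashcode

    /** Returns {validBytes, maxPhysicOffset}; maxPhysicOffset is -1 if no entry is valid. */
    static long[] scan(ByteBuffer file) {
        long validBytes = 0;
        long maxPhysicOffset = -1;
        for (int pos = 0; pos + UNIT <= file.capacity(); pos += UNIT) {
            long offset = file.getLong(pos);
            int size = file.getInt(pos + 8);
            if (offset >= 0 && size > 0) {
                validBytes = pos + UNIT;
                maxPhysicOffset = offset + size;
            } else {
                break; // first invalid entry ends the valid region
            }
        }
        return new long[] {validBytes, maxPhysicOffset};
    }

    /** Builds a zero-filled five-slot file holding three valid entries and scans it. */
    static long[] demo() {
        ByteBuffer file = ByteBuffer.allocate(5 * UNIT);
        int[] sizes = {100, 250, 80};
        long offset = 0;
        for (int i = 0; i < sizes.length; i++) {
            file.putLong(i * UNIT, offset);
            file.putInt(i * UNIT + 8, sizes[i]);
            file.putLong(i * UNIT + 12, 0L);
            offset += sizes[i];
        }
        return scan(file);
    }
}
```

In the demo, three entries are valid, so the valid region is 60 bytes and the last message ends at physical offset 350 + 80 = 430. The zero-filled fourth slot has size 0 and stops the scan, which is the same reason a preallocated, partly written file stops the loop in recover.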
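6.1.2 truncateDirtyFiles: truncating invalid files
Besides consumequeue files, this method is also called when commitlog files are recovered.
For each file whose tail offset is greater than the maximum valid data offset:
1. If the file's starting offset is also greater than the maximum valid data offset, the whole file is deleted.
2. Otherwise, the file's valid data position is set to the maximum valid data offset.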
    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        for (MappedFile file : this.mappedFiles) {
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            // Only files that extend beyond the maximum valid offset need attention.
            if (fileTailOffset > offset) {
                if (offset >= file.getFileFromOffset()) {
                    // The valid offset falls inside this file: reset its positions.
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    // The file starts after the valid offset: destroy it.
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

    void deleteExpiredFile(List<MappedFile> files) {
        if (!files.isEmpty()) {
            Iterator<MappedFile> iterator = files.iterator();
            while (iterator.hasNext()) {
                MappedFile cur = iterator.next();
                if (!this.mappedFiles.contains(cur)) {
                    iterator.remove();
                    log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());
                }
            }

            try {
                // Remove the destroyed files from the in-memory list.
                if (!this.mappedFiles.removeAll(files)) {
                    log.error("deleteExpiredFile remove failed.");
                }
            } catch (Exception e) {
                log.error("deleteExpiredFile has exception.", e);
            }
        }
    }
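The three possible outcomes for a file (untouched, truncated, deleted) can be seen in a stripped-down model of the same decision. `TruncateSketch` is a hypothetical class that only computes the plan as strings; it does not touch any files.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the truncateDirtyFiles decision; not RocketMQ code. */
class TruncateSketch {
    /** Returns one decision per file: "keep", "truncate@N" (new write position), or "delete". */
    static List<String> plan(long[] fileFromOffsets, int fileSize, long maxValidOffset) {
        List<String> decisions = new ArrayList<>();
        for (long from : fileFromOffsets) {
            long tail = from + fileSize;
            if (tail <= maxValidOffset) {
                decisions.add("keep");                                   // entirely valid
            } else if (maxValidOffset >= from) {
                decisions.add("truncate@" + (maxValidOffset % fileSize)); // valid offset inside this file
            } else {
                decisions.add("delete");                                 // starts after the valid offset
            }
        }
        return decisions;
    }
}
```

For example, with three 1024-byte files starting at 0, 1024, and 2048 and a maximum valid offset of 1500, the plan is keep, truncate@476, delete.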
6.2 recoverNormally: normal commitLog recovery
This method recovers the commitlog when the Broker last shut down normally. Its logic is similar to that of the ConsumeQueue recovery in recoverConsumeQueue.
    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Start from the third-to-last file, or from the first file if there are fewer than three.
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;

            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                // Validate the next message and get its size.
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();
                if (dispatchRequest.isSuccess() && size > 0) {
                    // Normal message.
                    mappedFileOffset += size;
                } else if (dispatchRequest.isSuccess() && size == 0) {
                    // End of the current file: switch to the next one, if any.
                    index++;
                    if (index >= mappedFiles.size()) {
                        log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                } else if (!dispatchRequest.isSuccess()) {
                    // Invalid message: stop here.
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
            }

            // processOffset is now the maximum valid offset in the commitlog.
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // The consumequeue must not reference data beyond the valid commitlog region.
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files",
                    maxPhyOffsetOfConsumeQueue, processOffset);
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } else {
            // No commitlog files: reset the positions and destroy all consumequeue files.
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }
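6.2.1 truncateDirtyLogicFiles: truncating invalid consumequeue files
At the end, maxPhyOffsetOfConsumeQueue, the largest valid commitlog physical offset that recoverConsumeQueue obtained from the consumequeue files, is compared with the largest valid offset that the current method found in the commitlog files themselves.
If the largest valid commitlog offset recorded in the consumequeue files is greater than or equal to the largest valid offset found in the commitlog files, the commitlog's valid data takes precedence and the dirty data in the consumequeue files is cleared again.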
    public void truncateDirtyLogicFiles(long phyOffset) {
        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

        for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
            for (ConsumeQueue logic : maps.values()) {
                logic.truncateDirtyLogicFiles(phyOffset);
            }
        }
    }
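6.3 recoverAbnormally: abnormal commitlog recovery
This method recovers the commitlog when the Broker last shut down abnormally. It differs slightly from the normal recovery method recoverNormally, but the core logic is the same.
1. First, traverse the files in reverse order and call isMappedFileMatchedRecover to decide whether each is a normal commitlog file, which includes checking the file magic code, the message store timestamp, and the StoreCheckpoint. Traversal stops at the first valid commitlog file found.
2. Then traverse forward from that file and recover the commitlog. For each valid message, defaultMessageStore.doDispatch invokes the CommitLogDispatcher instances to rebuild the message's indexfile and consumequeue entries.
3. The code after recovery is the same as in normal recovery: for example, it deletes all commitlog data after the maximum valid offset processOffset and clears dirty data in the consumequeue files.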
    @Deprecated
    public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Walk backwards from the last file to find the file to start recovering from.
            int index = mappedFiles.size() - 1;
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this mapped file " + mappedFile.getFileName());
                    break;
                }
            }

            // No file matched: start from the first file.
            if (index < 0) {
                index = 0;
                mappedFile = mappedFiles.get(index);
            }

            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                // Validate the next message and get its size.
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();

                if (dispatchRequest.isSuccess()) {
                    if (size > 0) {
                        // Normal message: rebuild its consumequeue and index entries.
                        mappedFileOffset += size;

                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                                this.defaultMessageStore.doDispatch(dispatchRequest);
                            }
                        } else {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else if (size == 0) {
                        // End of the current file: switch to the next one, if any.
                        index++;
                        if (index >= mappedFiles.size()) {
                            log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                            break;
                        } else {
                            mappedFile = mappedFiles.get(index);
                            byteBuffer = mappedFile.sliceByteBuffer();
                            processOffset = mappedFile.getFileFromOffset();
                            mappedFileOffset = 0;
                            log.info("recover next physics file, " + mappedFile.getFileName());
                        }
                    }
                } else {
                    // Invalid message: stop here.
                    log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                    break;
                }
            }

            // processOffset is now the maximum valid offset in the commitlog.
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // The consumequeue must not reference data beyond the valid commitlog region.
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files",
                    maxPhyOffsetOfConsumeQueue, processOffset);
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } else {
            // No commitlog files: reset the positions and destroy all consumequeue files.
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }
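6.3.1 isMappedFileMatchedRecover: is the file valid
This method decides whether the current file is a normal commitlog file. The checks include the commitlog magic code, a non-zero message store timestamp, and a store timestamp less than or equal to the StoreCheckpoint time.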
    private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();

        // Check the magic code of the first message in the file.
        int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
        if (magicCode != MESSAGE_MAGIC_CODE) {
            return false;
        }

        // The born host takes 8 bytes for IPv4 and 20 bytes for IPv6.
        int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
        int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength;
        long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
        // A store timestamp of 0 means the file holds no valid message.
        if (0 == storeTimestamp) {
            return false;
        }

        // Compare the store timestamp with the checkpoint.
        if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
            && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
                log.info("find check timestamp, {} {}",
                    storeTimestamp,
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        } else {
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
                log.info("find check timestamp, {} {}",
                    storeTimestamp,
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        }

        return false;
    }
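The expression `4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength` is easier to follow with each term labeled. The field names in the comments below reflect my understanding of the commitlog message layout and should be treated as assumptions; `StoreTimestampPosSketch` is a hypothetical class.

```java
/** Hypothetical breakdown of the store-timestamp position; field names are assumptions. */
class StoreTimestampPosSketch {
    /** Byte position of the store timestamp within a message, given the born host address family. */
    static int msgStoreTimePos(boolean bornHostIsIpv6) {
        int bornHostLength = bornHostIsIpv6 ? 20 : 8; // address bytes plus a 4-byte port
        return 4   // total size
             + 4   // magic code
             + 4   // body CRC
             + 4   // queue id
             + 4   // flag
             + 8   // queue offset
             + 8   // physical offset
             + 4   // sys flag
             + 8   // born timestamp
             + bornHostLength;
    }
}
```

The fixed part sums to 48 bytes, so the store timestamp sits at byte 56 when the born host is IPv4 and at byte 68 when it is IPv6.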
6.4 recoverTopicQueueTable: recovering topicQueueTable
After consumequeue and commitlog have been recovered, topicQueueTable is recovered. topicQueueTable is a map from "topic-queueid" to the largest relative offset under that queueId.
    public void recoverTopicQueueTable() {
        HashMap<String, Long> table = new HashMap<String, Long>(1024);
        // The minimum physical offset of the commitlog.
        long minPhyOffset = this.commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                // Key: "topic-queueId"; value: the largest offset in that queue.
                String key = logic.getTopic() + "-" + logic.getQueueId();
                table.put(key, logic.getMaxOffsetInQueue());
                // Correct the queue's minimum offset against the commitlog's minimum offset.
                logic.correctMinOffset(minPhyOffset);
            }
        }

        this.commitLog.setTopicQueueTable(table);
    }
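A reduced model of the table being rebuilt may help. In the sketch below, `TopicQueueTableSketch` is hypothetical, and the rule that a queue's largest relative offset equals its valid byte length divided by the 20-byte entry size is an assumption about how getMaxOffsetInQueue behaves, not something shown in the listing above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of rebuilding a "topic-queueId" to max-offset table. */
class TopicQueueTableSketch {
    static final int UNIT = 20; // ConsumeQueue entry size in bytes

    /** Builds the map key in the same "topic-queueId" form used by the method above. */
    static String key(String topic, int queueId) {
        return topic + "-" + queueId;
    }

    /** Assumption: the largest relative offset is the valid byte length divided by the entry size. */
    static long maxOffsetInQueue(long validBytes) {
        return validBytes / UNIT;
    }

    /** Converts valid byte lengths per queue into max relative offsets per queue. */
    static Map<String, Long> rebuild(Map<String, Long> validBytesByQueue) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> e : validBytesByQueue.entrySet()) {
            table.put(e.getKey(), maxOffsetInQueue(e.getValue()));
        }
        return table;
    }
}
```

Under this assumption, a queue whose ConsumeQueue files hold 5,000 valid bytes maps to a largest relative offset of 250, which is the value the next message written to that queue would be assigned.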