11、RocketMQ源码分析:BrokerasyncPutMessage处理消息以及存储的高性能设计【一万字】 此前我们学习了RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic 的整体流程,从流程中我们知道asyncPutMessage方法真正的用来存储消息。现在我们来看看这个方法的源码。
文章目录
1 asyncPutMessage存储普通消息
1.1 checkStoreStatus检查存储状态
1.2 checkMessage检查消息
2 CommitLog#asyncPutMessage异步存储消息
2.2 获取最新mappedFile
2.1.1 tryCreateMappedFile创建新的MappedFile
2.1.2 putRequestAndReturnMappedFile异步创建MappedFile
2.1.3 AllocateMappedFileService创建MappedFile
2.1.4 mmapOperation执行mmap操作
2.1.4.1 采用mmap
2.1.4.2 采用堆外内存
2.1.5 warmMappedFile文件预热
2.1.6 mlock锁定内存
2.3 appendMessage追加存储消息
3 存储高性能设计总结
1 asyncPutMessage存储普通消息 普通消息的处理、存储入口方法是DefaultMessageStore#asyncPutMessage方法。
首先会调用checkStoreStatus、checkMessage、checkLmqMessage方法进行一系列的前置校验,如果通过了,则调用CommitLog#asyncPutMessage方法真正的存储消息,最后会更新耗费的时间或者失败次数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 @Override public CompletableFuture<PutMessageResult> asyncPutMessage (MessageExtBrokerInner msg) { PutMessageStatus checkStoreStatus = this .checkStoreStatus(); if (checkStoreStatus != PutMessageStatus.PUT_OK) { return CompletableFuture.completedFuture(new PutMessageResult (checkStoreStatus, null )); } PutMessageStatus msgCheckStatus = this .checkMessage(msg);if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { return CompletableFuture.completedFuture(new PutMessageResult (msgCheckStatus, null )); } PutMessageStatus lmqMsgCheckStatus = this .checkLmqMessage(msg);if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) { return CompletableFuture.completedFuture(new PutMessageResult (lmqMsgCheckStatus, null )); } long beginTime = this .getSystemClock().now();CompletableFuture<PutMessageResult> putResultFuture = this .commitLog.asyncPutMessage(msg); putResultFuture.thenAccept((result) -> { long elapsedTime = this .getSystemClock().now() - beginTime; if (elapsedTime > 500 ) { log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}" , elapsedTime, msg.getBody().length); } this .storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this .storeStatsService.getPutMessageFailedTimes().add(1 ); } }); return putResultFuture;}
1.1 checkStoreStatus检查存储状态 首先就会检查消息存储状态,看是否支持写入消息:
1、 如果DefaultMessageStore是shutdown状态,返回SERVICE_NOT_AVAILABLE;2、 如果broker是SLAVE角色,则返回SERVICE_NOT_AVAILABLE,不能将消息写入SLAVE角色;3、 如果不支持写入,那么返回SERVICE_NOT_AVAILABLE,可能因为broker的磁盘已满、写入逻辑队列错误、写入索引文件错误等等原因;4、 如果操作系统页缓存繁忙,则返回OS_PAGECACHE_BUSY,如果broker持有锁的时间超过osPageCacheBusyTimeOutMills,则算作操作系统页缓存繁忙;5、 返回PUT_OK,表示可以存储消息;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 private PutMessageStatus checkStoreStatus () { if (this .shutdown) { log.warn("message store has shutdown, so putMessage is forbidden" ); return PutMessageStatus.SERVICE_NOT_AVAILABLE; } if (BrokerRole.SLAVE == this .messageStoreConfig.getBrokerRole()) { long value = this .printTimes.getAndIncrement(); if ((value % 50000 ) == 0 ) { log.warn("broke role is slave, so putMessage is forbidden" ); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } if (!this .runningFlags.isWriteable()) { long value = this .printTimes.getAndIncrement(); if ((value % 50000 ) == 0 ) { log.warn("the message store is not writable. It may be caused by one of the following reasons: " + "the broker's disk is full, write to logic queue error, write to index file error, etc" ); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } else { this .printTimes.set(0 ); } if (this .isOSPageCacheBusy()) { return PutMessageStatus.OS_PAGECACHE_BUSY; } return PutMessageStatus.PUT_OK; }
1.2 checkMessage检查消息 检查消息,看是否符合要求:
1、 如果topic长度大于127,则返回MESSAGE_ILLEGAL,表示topic过长了;2、 如果设置的属性长度大于32767,则返回MESSAGE_ILLEGAL,表示properties过长了;3、 否则,返回PUT_OK,表示检查通过;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private PutMessageStatus checkMessage (MessageExtBrokerInner msg) { if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return PutMessageStatus.MESSAGE_ILLEGAL; } if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return PutMessageStatus.MESSAGE_ILLEGAL; } return PutMessageStatus.PUT_OK; }
2 CommitLog#asyncPutMessage异步存储消息 该方法中将会对消息进行真正的保存,即持久化操作,步骤比较繁杂,但同样属于RocketMQ源码的精髓,值得一看 。其大概步骤为:
1、 处理延迟消息的逻辑;
1、 如果是延迟消息,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id,id=level-1,如果延迟级别大于最大级别,则设置为最大级别18,,默认延迟2h这些参数可以在broker端配置类MessageStoreConfig中配置;2、 最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复;2、 消息编码获取线程本地变量,其内部包含一个线程独立的encoder和keyBuilder对象将消息内容编码,存储到encoder内部的encoderBuffer中,它是通过ByteBuffer.allocateDirect( size)得到的一个直接缓冲区消息写入之后,会调用encoderBuffer.flip()方法,将Buffer从写模式切换到读模式,可以读取到数据;3、 加锁并写入消息;
1、 一个broker将所有的消息都追加到同一个逻辑CommitLog日志文件中,因此需要通过获取putMessageLock锁来控制并发有两种锁,一种是ReentrantLock可重入锁,另一种spin则是CAS锁根据StoreConfig的useReentrantLockWhenPutMessage决定是否使用可重入锁,默认为true,使用可重入锁;2、 从mappedFileQueue中的mappedFiles集合中获取最后一个MappedFile如果最新mappedFile为null,或者mappedFile满了,那么会新建mappedFile;3、 通过mappedFile调用appendMessage方法追加消息,这里仅仅是追加消息到byteBuffer的内存中如果是writeBuffer则表示消息写入了堆外内存中,如果是mappedByteBuffer,则表示消息写入了pagechache中总之,都是存储在内存之中;4、 追加成功之后解锁如果是剩余空间不足,则会重新初始化一个MappedFile并再次尝试追加;4、 如果存在写满的MappedFile并且启用了文件内存预热,那么这里调用unlockMappedFile对MappedFile执行解锁;5、 更新消息统计信息随后调用submitFlushRequest方法提交刷盘请求,将会根据刷盘策略进行刷盘随后调用submitReplicaRequest方法提交副本请求,用于主从同步;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 public CompletableFuture<PutMessageResult> asyncPutMessage (final MessageExtBrokerInner msg) { msg.setStoreTimestamp(System.currentTimeMillis()); msg.setBodyCRC(UtilAll.crc32(msg.getBody())); AppendMessageResult result = null ; StoreStatsService storeStatsService = this .defaultMessageStore.getStoreStatsService(); String topic = msg.getTopic(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { if (msg.getDelayTimeLevel() > 0 ) { if (msg.getDelayTimeLevel() > this .defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this .defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost(); if (bornSocketAddress.getAddress() instanceof Inet6Address) { msg.setBornHostV6Flag(); } InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost(); if (storeSocketAddress.getAddress() instanceof Inet6Address) { msg.setStoreHostAddressV6Flag(); } PutMessageThreadLocal putMessageThreadLocal = this .putMessageThreadLocal.get(); PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); if (encodeResult != null ) { return CompletableFuture.completedFuture(encodeResult); } msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer); PutMessageContext putMessageContext = new PutMessageContext (generateKey(putMessageThreadLocal.getKeyBuilder(), msg)); long elapsedTimeInLock = 0 ; MappedFile unlockMappedFile = null ; putMessageLock.lock(); try { MappedFile mappedFile = this .mappedFileQueue.getLastMappedFile(); long beginLockTimestamp = this .defaultMessageStore.getSystemClock().now(); this .beginTimeInLock = beginLockTimestamp; msg.setStoreTimestamp(beginLockTimestamp); if (null == mappedFile || mappedFile.isFull()) { mappedFile = this .mappedFileQueue.getLastMappedFile(0 ); } if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); return CompletableFuture.completedFuture(new PutMessageResult (PutMessageStatus.CREATE_MAPEDFILE_FAILED, null )); } result = mappedFile.appendMessage(msg, this .appendMessageCallback, putMessageContext); switch (result.getStatus()) { case PUT_OK: break ; case END_OF_FILE: unlockMappedFile = mappedFile; mappedFile = this .mappedFileQueue.getLastMappedFile(0 ); if (null == mappedFile) { log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); return CompletableFuture.completedFuture(new PutMessageResult (PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } result = mappedFile.appendMessage(msg, this .appendMessageCallback, putMessageContext); break ; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: return CompletableFuture.completedFuture(new PutMessageResult (PutMessageStatus.MESSAGE_ILLEGAL, result)); case UNKNOWN_ERROR: return CompletableFuture.completedFuture(new PutMessageResult (PutMessageStatus.UNKNOWN_ERROR, result)); default : return CompletableFuture.completedFuture(new PutMessageResult (PutMessageStatus.UNKNOWN_ERROR, result)); } elapsedTimeInLock = this .defaultMessageStore.getSystemClock().now() - beginLockTimestamp; } finally { beginTimeInLock = 0 ; putMessageLock.unlock(); } if (elapsedTimeInLock > 500 ) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}" , elapsedTimeInLock, msg.getBody().length, result); } if (null != unlockMappedFile && this .defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { this .defaultMessageStore.unlockMappedFile(unlockMappedFile); } PutMessageResult putMessageResult = new PutMessageResult (PutMessageStatus.PUT_OK, result); storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1 ); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes()); CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg); return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(flushStatus); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } return putMessageResult; }); }
2.1 处理延迟消息 如果是延迟消息,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id, id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。
最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { if (msg.getDelayTimeLevel() > 0 ) { if (msg.getDelayTimeLevel() > this .defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this .defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } }
2.2 获取最新mappedFile 首先从mappedFileQueue中的mappedFiles集合中获取最后一个MappedFile。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public MappedFile getLastMappedFile () { MappedFile mappedFileLast = null ; while (!this .mappedFiles.isEmpty()) { try { mappedFileLast = this .mappedFiles.get(this .mappedFiles.size() - 1 ); break ; } catch (IndexOutOfBoundsException e) { } catch (Exception e) { log.error("getLastMappedFile has exception." , e); break ; } } return mappedFileLast; }
如果最新mappedFile为null,或者mappedFile满了,那么会新建mappedFile。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public MappedFile getLastMappedFile (final long startOffset) { return getLastMappedFile(startOffset, true ); } public MappedFile getLastMappedFile (final long startOffset, boolean needCreate) { long createOffset = -1 ; MappedFile mappedFileLast = getLastMappedFile(); if (mappedFileLast == null ) { createOffset = startOffset - (startOffset % this .mappedFileSize); } if (mappedFileLast != null && mappedFileLast.isFull()) { createOffset = mappedFileLast.getFileFromOffset() + this .mappedFileSize; } if (createOffset != -1 && needCreate) { return tryCreateMappedFile(createOffset); } return mappedFileLast; }
2.1.1 tryCreateMappedFile创建新的MappedFile 该方法会获取下两个MappedFile的路径nextFilePath和nextNextFilePath,然后调用doCreateMappedFile真正的创建。也就是说一次请求创建两个MappedFile,对应两个commitlog。
为什么创建两个commitlog呢?这就是RocketMQ的一个优化,即commitlog文件预创建或者文件预分配,如果启用了MappedFile(MappedFile类可以看作是commitlog文件在Java中的抽象)预分配服务,那么在创建MappedFile时会同时创建两个MappedFile,一个同步创建并返回用于本次实际使用,一个后台异步创建用于下次取用。这样的好处是避免等到当前文件真正用完了才创建下一个文件,目的同样是提升性能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected MappedFile tryCreateMappedFile (long createOffset) { String nextFilePath = this .storePath + File.separator + UtilAll.offset2FileName(createOffset); String nextNextFilePath = this .storePath + File.separator + UtilAll.offset2FileName(createOffset + this .mappedFileSize); return doCreateMappedFile(nextFilePath, nextNextFilePath); }
doCreateMappedFile方法中,会判断如果allocateMappedFileService不为null,那么异步的创建MappedFile,否则,同步创建一个MappedFile。CommitLog的MappedFileQueue初始化时会初始化allocateMappedFileService,因此一般都不为null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 protected MappedFile doCreateMappedFile (String nextFilePath, String nextNextFilePath) { MappedFile mappedFile = null ; if (this .allocateMappedFileService != null ) { mappedFile = this .allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this .mappedFileSize); } else { try { mappedFile = new MappedFile (nextFilePath, this .mappedFileSize); } catch (IOException e) { log.error("create mappedFile exception" , e); } } if (mappedFile != null ) { if (this .mappedFiles.isEmpty()) { mappedFile.setFirstCreateInQueue(true ); } this .mappedFiles.add(mappedFile); } return mappedFile; }
2.1.2 putRequestAndReturnMappedFile异步创建MappedFile MappedFile 作为一个RocketMQ的物理文件在Java中的映射类。实际上,commitLog consumerQueue、indexFile 3种文件磁盘的读写都是通过MappedFile操作的。它的构造器中会对当前文件进行mmap内存映射操作。
putRequestAndReturnMappedFile 方法用于创建MappedFile,会同时创建两个MappedFile,一个同步创建并返回用于本次实际使用,一个后台异步创建用于下次取用。这样的好处是避免等到当前文件真正用完了才创建下一个文件,目的同样是提升性能。
这里的同步和异步实际上都是通过一个服务线程执行的,该方法只是提交两个映射文件创建请求AllocateRequest,并且提交到requestTable和requestQueue中。随后当前线程只会同步等待第一个映射文件的创建,最多等待5s,如果创建成功则返回,而较大的offset那一个映射文件则会异步的创建,不会等待。
这里线程等待使用的是倒计数器CountDownLatch ,一个请求一个AllocateRequest对象,其内部还持有一个CountDownLatch对象,当该请求对应的MappedFile被创建完毕之后,会调用内部的CountDownLatch#countDown方法,自然会唤醒该等待的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 public MappedFile putRequestAndReturnMappedFile (String nextFilePath, String nextNextFilePath, int fileSize) { int canSubmitRequests = 2 ; if (this .messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { if (this .messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool() && BrokerRole.SLAVE != this .messageStore.getMessageStoreConfig().getBrokerRole()) { canSubmitRequests = this .messageStore.getTransientStorePool().availableBufferNums() - this .requestQueue.size(); } } AllocateRequest nextReq = new AllocateRequest (nextFilePath, fileSize); boolean nextPutOK = this .requestTable.putIfAbsent(nextFilePath, nextReq) == null ; if (nextPutOK) { if (canSubmitRequests <= 0 ) { log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " + "RequestQueueSize : {}, StorePoolSize: {}" , this .requestQueue.size(), this .messageStore.getTransientStorePool().availableBufferNums()); this .requestTable.remove(nextFilePath); return null ; } boolean offerOK = this .requestQueue.offer(nextReq); if (!offerOK) { log.warn("never expected here, add a request to preallocate queue failed" ); } canSubmitRequests--; } AllocateRequest nextNextReq = new AllocateRequest (nextNextFilePath, fileSize); boolean nextNextPutOK = this .requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null ; if (nextNextPutOK) { if (canSubmitRequests <= 0 ) { log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " + "RequestQueueSize : {}, StorePoolSize: {}" , this .requestQueue.size(), this .messageStore.getTransientStorePool().availableBufferNums()); this .requestTable.remove(nextNextFilePath); } else { boolean offerOK = this .requestQueue.offer(nextNextReq); if (!offerOK) { log.warn("never expected here, add a request to preallocate queue failed" ); } } } if (hasException) { log.warn(this .getServiceName() + " service has exception. so return null" ); return null ; } AllocateRequest result = this .requestTable.get(nextFilePath); try { if (result != null ) { boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS); if (!waitOK) { log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize()); return null ; } else { this .requestTable.remove(nextFilePath); return result.getMappedFile(); } } else { log.error("find preallocate mmap failed, this never happen" ); } } catch (InterruptedException e) { log.warn(this .getServiceName() + " service has exception. " , e); } return null ; }
2.1.3 AllocateMappedFileService创建MappedFile
*我们上面学习了putRequestAndReturnMappedFile方法,该方法仅仅是创建了两个AllocateRequest请求,并且提交到requestTable这个map集合中,但是并没有找到任何创建MappedFile的代码,那么MappedFile再哪里被创建的呢? **
实际上,AllocateMappedFileService 这个类继承了ServiceThread ,而ServiceThread实现了Runnable接口,那么我们能够知道AllocateMappedFileService实际上就是一个线程任务类,据此,我们很容易的联想到,具体的创建MappedFile的逻辑是通过这个线程任务来完成的。
在我们学习broker启动源码的时候,我们就见过了AllocateMappedFileService 这个类,broker启动时创建DefaultMessageStore 的时候,就会创建AllocateMappedFileService 实例,并且调用start方法进行启动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void start () { log.info("Try to start service thread:{} started:{} lastThread:{}" , getServiceName(), started.get(), thread); if (!started.compareAndSet(false , true )) { return ; } stopped = false ; this .thread = new Thread (this , getServiceName()); this .thread.setDaemon(isDaemon); this .thread.start(); }
启动线程任务之后,自然会运行run方法执行线程任务。
我们来看看AllocateMappedFileService的线程任务,内部是一个死循环,如果服务没有停止,并且没有被线程中断,那么一直循环执行mmapOperation方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void run () { log.info(this .getServiceName() + " service started" ); while (!this .isStopped() && this .mmapOperation()) { } log.info(this .getServiceName() + " service end" ); }
2.1.4 mmapOperation执行mmap操作 该方法用于创建MappedFile。大概步骤为:
1、 从requestQueue 中获取优先级最高的一个请求,即文件名最小或者说起始offset最小的请求requestQueue是一个优先级队列;2、 判断是否需要通过堆外内存 创建MappedFile,如果当前节点不是从节点,并且是异步刷盘策略,并且* *transientStorePoolEnable**参数配置为true,那么使用堆外内存,默认不使用;
1、 RocketMQ中引入的transientStorePoolEnable能缓解pagecache的压力,其原理是基于DirectByteBuffer和MappedByteBuffer 的读写分离 ;2、 消息先写入DirectByteBuffer (堆外内存),随后从MappedByteBuffer(pageCache)读取;3、 如果没有启动堆外内存,那么普通方式创建mappedFile,并且进行mmap操作;4、 如果mappedFile大小大于等于1G并且warmMapedFileEnable参数为true,那么预写mappedFile,也就是所谓的内存预热或者文件预热注意warmMapedFileEnable参数默认为false,即默认不开启文件预热,因此需要手动开启;5、 如果创建成功,那么将请求对象中的countDownLatch 释放计数,这样就可以唤醒在putRequestAndReturnMappedFile方法中被阻塞的线程了;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 private boolean mmapOperation () { boolean isSuccess = false ; AllocateRequest req = null ; try { req = this .requestQueue.take(); AllocateRequest expectedRequest = this .requestTable.get(req.getFilePath()); if (null == expectedRequest) { log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " " + req.getFileSize()); return true ; } if (expectedRequest != req) { log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " " + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest); return true ; } if (req.getMappedFile() == null ) { long beginTime = System.currentTimeMillis(); MappedFile mappedFile; if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { try { mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } catch (RuntimeException e) { log.warn("Use default implementation." ); mappedFile = new MappedFile (req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } } else { mappedFile = new MappedFile (req.getFilePath(), req.getFileSize()); } long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime); if (elapsedTime > 10 ) { int queueSize = this .requestQueue.size(); log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize + " " + req.getFilePath() + " " + req.getFileSize()); } if (mappedFile.getFileSize() >= this .messageStore.getMessageStoreConfig() .getMappedFileSizeCommitLog() && this .messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { mappedFile.warmMappedFile(this .messageStore.getMessageStoreConfig().getFlushDiskType(), this .messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile()); } req.setMappedFile(mappedFile); this .hasException = false ; isSuccess = true ; } } catch (InterruptedException e) { log.warn(this .getServiceName() + " interrupted, possibly by shutdown." ); this .hasException = true ; return false ; } catch (IOException e) { log.warn(this .getServiceName() + " service has exception. " , e); this .hasException = true ; if (null != req) { requestQueue.offer(req); try { Thread.sleep(1 ); } catch (InterruptedException ignored) { } } } finally { if (req != null && isSuccess) req.getCountDownLatch().countDown(); } return true ; }
2.1.4.1 采用mmap 普通创建MappedFile的方法即调用它的构造器。该构造器内部将会以给定的文件路径创建一个file,并且把commitlog文从磁盘空间件完全的映射到虚拟内存,也就是内存映射,即mmap,提升读写性能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public MappedFile (final String fileName, final int fileSize) throws IOException { init(fileName, fileSize); } private void init (final String fileName, final int fileSize) throws IOException { this .fileName = fileName; this .fileSize = fileSize; this .file = new File (fileName); this .fileFromOffset = Long.parseLong(this .file.getName()); boolean ok = false ; ensureDirOK(this .file.getParent()); try { this .fileChannel = new RandomAccessFile (this .file, "rw" ).getChannel(); this .mappedByteBuffer = this .fileChannel.map(MapMode.READ_WRITE, 0 , fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true ; } catch (FileNotFoundException e) { log.error("Failed to create file " + this .fileName, e); throw e; } catch (IOException e) { log.error("Failed to map file " + this .fileName, e); throw e; } finally { if (!ok && this .fileChannel != null ) { this .fileChannel.close(); } } }
2.1.4.2 采用堆外内存 如果开启了堆外内存,那么将采用此方式创建MappedFile,其相比于mmap的方式,多了一步操作,即会设置一个writeBuffer。
1 2 3 4 5 6 7 8 9 10 public void init (final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize); this .writeBuffer = transientStorePool.borrowBuffer(); this .transientStorePool = transientStorePool; }
borrowBuffer 方法中,会返回TransientStorePool 内部的一个availableBuffer,如果启动堆外内存,那么在broker启动创建DefaultMessageStore的时候将会执行TransientStorePool#init 方法,该方法默认会初始化5个1G大小的堆外内存并且锁定住 。这是一个重量级初始化操作,将会延长broker启动时间。
堆外内存就是通过ByteBuffer.allocateDirect 方法分配的,这5块内存可以被重复利用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void init () { for (int i = 0 ; i < poolSize; i++) { ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize); final long address = ((DirectBuffer) byteBuffer).address(); Pointer pointer = new Pointer (address); LibC.INSTANCE.mlock(pointer, new NativeLong (fileSize)); availableBuffers.offer(byteBuffer); } }
如果是普通模式,将会采用mmap同时进行读写。如果采用读写分离,那么Broker将消息写入writeBuffer,即先写入DirectByteBuffer(堆外内存)就可以直接返回。然后异步转存服务CommitRealTimeService不断地从堆外内存中批量Commit到Page Cache中(将writeBuffer的数据写入到fileChannel中),而消费者始终从mappedByteBuffer(即page cache,mappedByteBuffer能获取到写入fileChannel的数据)中读取消息。后面我们学习到消息刷盘的时候会介绍源码。 *
高并发下写入 page cache 可能会造成刷脏页时磁盘压力较高,导致写入时出现毛刺现象。读写分离能缓解频繁写page cache 的压力,但会增加消息不一致的风险,使得数据一致性降低到最低。
2.1.5 warmMappedFile文件预热 mmap 操作减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销,这是它的好处。
但是mmap 操作对于OS来说只是建立虚拟内存地址至物理地址的映射关系,即将进程使用的虚拟内存地址映射到物理地址上。实际上并不会加载任何MappedFile数据至内存中,也并不会分配指定的大小的内存。当程序要访问数据时,如果发现这部分数据页并没有实际加载到内存中,则处理器自动触发一个缺页异常,进而进入内核空间再分配物理内存,一次分配大小默认4k。一个G大小的CommitLog,如果靠着缺页中断来分配实际内存,那么需要触发26w多次缺页中断,这是一笔不小的开销。
RocketMQ 避免频繁发生却也异常的做法是采用文件预热,即让操作系统提前分配物理内存空间,防止在写入消息时发生缺页异常才进行分配。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public void warmMappedFile (FlushDiskType type, int pages) { long beginTime = System.currentTimeMillis(); ByteBuffer byteBuffer = this .mappedByteBuffer.slice(); int flush = 0 ; long time = System.currentTimeMillis(); for (int i = 0 , j = 0 ; i < this .fileSize; i += MappedFile.OS_PAGE_SIZE, j++) { byteBuffer.put(i, (byte ) 0 ); if (type == FlushDiskType.SYNC_FLUSH) { if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) { flush = i; mappedByteBuffer.force(); } } if (j % 1000 == 0 ) { log.info("j={}, costTime={}" , j, System.currentTimeMillis() - time); time = System.currentTimeMillis(); try { Thread.sleep(0 ); } catch (InterruptedException e) { log.error("Interrupted" , e); } } } if (type == FlushDiskType.SYNC_FLUSH) { log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}" , this .getFileName(), System.currentTimeMillis() - beginTime); mappedByteBuffer.force(); } log.info("mapped file warm-up done. mappedFile={}, costTime={}" , this .getFileName(), System.currentTimeMillis() - beginTime); this .mlock(); }
RocketMQ对于MappedFile每隔OS_PAGE_SIZE大小写入一个0,即每4k写入一个0,来让操作系统预先分配1G大小的全额物理内存,通过这种预先分配内存的方式,可以避免在读写消息时引发引发异常而导致的性能损失。因为操作系统加载数据都是以Page Cache页为单位加载的,而一页大小就是4k,因此每隔4k写入一个0,就能保证分配4k大小的内存。 *
2.1.6 mlock锁定内存 **当实现了文件内存预热之后,虽然短时间不会读取数据不会引发缺页异常,但是当内存不足的时候,一部分不常使用的内存还是会被交换到swap空间中,当程序再次读取交换出去的数据的时候会再次产生缺页异常。 **
因此RocketMQ在warmMappedFile方法的最后还调用了mlock 方法,该方法调用系统mlock函数,锁定该文件的Page Cache,防止把预热过的文件被操作系统调到swap空间中。另外还会调用系统madvise函数,再次尝试一次性先将一段数据读入到映射内存区域,这样就减少了缺页异常的产生。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public void mlock () { final long beginTime = System.currentTimeMillis(); final long address = ((DirectBuffer) (this .mappedByteBuffer)).address(); Pointer pointer = new Pointer (address); { int ret = LibC.INSTANCE.mlock(pointer, new NativeLong (this .fileSize)); log.info("mlock {} {} {} ret = {} time consuming = {}" , address, this .fileName, this .fileSize, ret, System.currentTimeMillis() - beginTime); } { int ret = LibC.INSTANCE.madvise(pointer, new NativeLong (this .fileSize), LibC.MADV_WILLNEED); log.info("madvise {} {} {} ret = {} time consuming = {}" , address, this .fileName, this .fileSize, ret, System.currentTimeMillis() - beginTime); } }
2.3 appendMessage追加存储消息 当获取到mappedFile之后,将会调用mappedFile#appendMessage 方法追加消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public AppendMessageResult appendMessage (final MessageExtBrokerInner msg, final AppendMessageCallback cb, PutMessageContext putMessageContext) { return appendMessagesInner(msg, cb, putMessageContext); }
其内部调用另一个mappedFile#appendMessagesInner 方法真正的进行消息存储。
**该方法首先获取当前文件的写指针,如果写指针小于文件的大小,那么就对消息进行追加处理。追加处理的是通过回调函数的doAppend方法执行的,分为单条消息,和批量消息的处理。最后会更新写指针的位置,以及存储时间。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public AppendMessageResult appendMessagesInner (final MessageExt messageExt, final AppendMessageCallback cb, PutMessageContext putMessageContext) { assert messageExt != null ; assert cb != null ; int currentPos = this .wrotePosition.get(); if (currentPos < this .fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this .mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this .getFileFromOffset(), byteBuffer, this .fileSize - currentPos, (MessageExtBrokerInner) messageExt, putMessageContext); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this .getFileFromOffset(), byteBuffer, this .fileSize - currentPos, (MessageExtBatch) messageExt, putMessageContext); } else { return new AppendMessageResult (AppendMessageStatus.UNKNOWN_ERROR); } this .wrotePosition.addAndGet(result.getWroteBytes()); this .storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}" , currentPos, this .fileSize); return new AppendMessageResult (AppendMessageStatus.UNKNOWN_ERROR); }
2.3.1 doAppend执行追加 **真正的追加消息是通过AppendMessageCallback回调函数的doAppend方法执行的。这里回调函数的具体实现是DefaultAppendMessageCallback,它是位于CommitLog里面的一个内部类的实现。 **
源码很多,大概步骤为:
1、 获取消息物理偏移量,创建服务端消息Id生成器:4个字节IP+4个字节的端口号+8字节的消息偏移量从topicQueueTable中获取Queue队列的最大相对偏移量;2、 判断如果消息的长度加上文件结束符子节数大于maxBlank,则表示该commitlog剩余大小不足以存储该消息那么返回END_OF_FILE,在asyncPutMessage方法中判断到该code之后将会新建一个MappedFile并尝试再次存储;3、 如果空间足够,则将消息编码,并将编码后的消息写入到byteBuffer中,这里的byteBuffer可能是writeBuffer,即直接缓冲区,也有可能是普通缓冲区mappedByteBuffer;4、 返回AppendMessageResult对象,内部包括消息追加状态、消息写入物理偏移量、消息写入长度、消息ID生成器、消息开始追加的时间戳、消息队列偏移量、消息开始写入的时间戳等属性;
当该方法执行完毕,表示消息已被写入的byteBuffer中,如果是writeBuffer则表示消息写入了堆外内存中,如果是mappedByteBuffer,则表示消息写入了page chache中。总之,都是存储在内存之中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 public AppendMessageResult doAppend (final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { long wroteOffset = fileFromOffset + byteBuffer.position(); Supplier<String> msgIdSupplier = () -> { int sysflag = msgInner.getSysFlag(); int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8 ; ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer); msgIdBuffer.clear(); msgIdBuffer.putLong(msgIdLen - 8 , wroteOffset); return UtilAll.bytes2string(msgIdBuffer.array()); }; String key = putMessageContext.getTopicQueueTableKey(); Long queueOffset = CommitLog.this .topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L ; CommitLog.this .topicQueueTable.put(key, queueOffset); } boolean multiDispatchWrapResult = CommitLog.this .multiDispatch.wrapMultiDispatch(msgInner); if (!multiDispatchWrapResult) { return new AppendMessageResult (AppendMessageStatus.UNKNOWN_ERROR); } final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L ; break ; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: default : break ; } ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); final int msgLen = preEncodeBuffer.getInt(0 ); if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this .msgStoreItemMemory.clear(); this .msgStoreItemMemory.putInt(maxBlank); this .msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); final long beginTimeMills = CommitLog.this .defaultMessageStore.now(); byteBuffer.put(this .msgStoreItemMemory.array(), 0 , 8 ); return new AppendMessageResult (AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this .defaultMessageStore.now() - beginTimeMills); } int pos = 4 + 4 + 4 + 4 + 4 ; preEncodeBuffer.putLong(pos, queueOffset); pos += 8 ; preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position()); int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4 ; pos += 8 + 4 + 8 + ipLen; preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp()); final long beginTimeMills = CommitLog.this .defaultMessageStore.now(); byteBuffer.put(preEncodeBuffer); msgInner.setEncodedBuff(null ); AppendMessageResult result = new AppendMessageResult (AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this .defaultMessageStore.now() - beginTimeMills); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break ; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: CommitLog.this .topicQueueTable.put(key, ++queueOffset); CommitLog.this .multiDispatch.updateMultiQueueOffset(msgInner); break ; default : break ; } return result; }
2.3.2 消息序列化 Broker的commitlog只会存储序列化后的消息,其格式为:
字段
字段含义
字段大小
TOTALSIZE
消息条目总长度
4
MAGICCODE
魔数,用来判断消息是正常消息还是空消息
4
BODYCRC
消息体CRC校验码
4
QUEUEID
消息消费队列id
4
FLAG
消息flag
4
QUEUEOFFSET
消息在消息消费队列的偏移量
8
PHYSICALOFFSET
消息在commitlog中的偏移量
8
SYSFLAG
消息系统flag,例如是否压缩、是否是事务消息
4
BORNTIMESTAMP
消息生产者调用消息发送API的时间戳
8
BORNHOST
消息发送者的IP和端口号
8
STORETIMESTAMP
消息存储时间
8
STOREHOSTADDRESS
broker的IP和端口号
8
RECONSUMETIMES
消息重试次数
4
Prepared Transaction Offset
事务消息物理偏移量
8
bodyLength
消息体长度
4
body
消息体内容
bodyLength
topicLength
Topic名称内容大小
1
topicData
topic的值
topicLength
propertiesLength
消息属性大小
2
propertiesData
消息属性
propertiesLength
3 存储高性能设计总结 通过对于获取mappedFile部分的学习,我们知道RocketMQ对于commitlog的性能采用了多重优化措施:
1、 commitlog文件预创建或者文件预分配 :当获取新的commitlog的时候,如果磁盘满了或者没有文件,并且启用了MappedFile(MappedFile类可以看作是commitlog文件在Java中的抽象)预分配服务,那么在创建MappedFile时会同时创建两个MappedFile,一个同步创建并返回用于本次实际使用,一个后台异步创建用于下次取用这样的好处是避免等到当前文件真正用完了才创建下一个文件,提升性能;2、 mmap :这算作一种零拷贝技术,相比于传统的read和writemmap将一个文件(或者文件的一部分) 映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,简单的说,使用mmap之后,数据无需拷贝到用户空间中,应用程序可以直接操作PageCache中的数据,减少数据拷贝次数[Java两种zero-copy零拷贝技术mmap和sendfile的介绍][Java_zero-copy_mmap_sendfile];3、 文件预热或者内存预热 :mmap基于惰性加载策略,单独的mmap操作仅仅是建立映射而不是真实的分配固定大小的内存,只有访问到不存在的数据时才会引发缺页异常才会真正的加载数据RocketMQ对于新建的MappedFile每隔OS_PAGE_SIZE大小写入一个0,即每4k写入一个0,来让操作系统预先分配全额大小的物理内存,通过这种预先分配内存的方式,可以避免在读写消息时引发异常才分配内存而导致的性能损失;4、 内存锁定 :通过mlock系统调用,将预热后的内存空间锁定,防止因为内存不足导致pageCahce的数据被交换到swap空间中去,因为访问swap空间中的数据同样会引发缺页异常另外还会调用系统madvise函数,给操作系统建议,说这文件在不久的将来要访问的,建议操作系统尝试一次性先将一段数据读入到映射内存区域,这样就减少了缺页异常的产生;5、 读写分离 :如果启用了堆外内存,那么在创建新的MappedFile的时候,会分配一个writeBuffer,这段内存属于直接内存(堆外内存)Broker将消息写入writeBuffer,即先写入DirectByteBuffer(堆外内存),然后异步转存服务不断地从堆外内存中Commit到PageCache中(将writeBuffer的数据写入到fileChannel中),而消费者始终从mappedByteBuffer(即PageCache)(mappedByteBuffer能获取到写入fileChannel的数据)中读取消息读写分离能缓解pagecache的压力,但会增加消息不一致的风险;
以上就是RocketMQ高性能存储的一些关键设计,当然后面我们会了解到更多的高性能知识。
__END__