09、RocketMQ源码分析:Broker接收消息入口源码
*此前我们学习了RocketMQ的Producer发送消息的源码,首先会查找查找topic的发布信息,然后找到一个消息队列MessageQueue,默认是轮询的选择,MessageQueue中存储着对应的brokerName,通过brokerName就能找到具体的brokerIP,随后获取producer客户端与这台broker的channel,随后就可以向这台broker发送消息了,注意消息只会被发送到主Broker中,即Master节点。 **
以上就是RocketMQ的Producer发送消息的大概路程,下面我们来学习RocketMQ的Broker接收消息、处理请求的源码。
文章目录
1 Broker处理请求入口
1.1 registerProcessor注册消息处理器
1.2 NettyServerHandler处理请求
1.3 processRequestCommand分发处理请求
1.3.1 rejectRequest是否拒绝请求
1.3.1.1 isOSPageCacheBusy操作系统页缓存是否繁忙
1.3.1.2 isTransientStorePoolDeficient检查临时存储池是否不足
2 asyncProcessRequest异步处理请求
2.1 asyncProcessRequest异步处理请求
2.1.1 parseRequestHeader解析请求头
3 总结
1 Broker处理请求入口 1.1 registerProcessor注册消息处理器 RocketMQ的各种组件的网络通信都是基于Netty实现的。我们在此前学习RocketMQ的Broker的启动源码的时候,会发现Broker在启动的时候在BrokerController#registerProcessor方法中会注册很多的netty消息处理器,不同的消息处理器可以处理不同的消息类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void registerProcessor () { SendMessageProcessor sendProcessor = new SendMessageProcessor (this ); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this .remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this .sendMessageExecutor); this .remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this .sendMessageExecutor); this .remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this .sendMessageExecutor); this .remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this .sendMessageExecutor); this .fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this .sendMessageExecutor); this .fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this .sendMessageExecutor); this .fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this .sendMessageExecutor); this .fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this .sendMessageExecutor); }
其中,SendMessageProcessor 这个处理器被用来专门处理发送消息请求,也就是说Producer的发送消息类请求都是通过这个处理器来处理的。这些处理器会连同对应的执行器线程池一起构建一个Pair对象,然后以requestCode为key,Pair对象为value注册到processorTable集合缓存中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public void registerProcessor (int requestCode, NettyRequestProcessor processor, ExecutorService executor) { ExecutorService executorThis = executor; if (null == executor) { executorThis = this .publicExecutor; } Pair<NettyRequestProcessor, ExecutorService> pair = new Pair <NettyRequestProcessor, ExecutorService>(processor, executorThis); this .processorTable.put(requestCode, pair); }
1.2 NettyServerHandler处理请求 当Netty服务端接收到消息的时候,首先会在NettyServerHandler中进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 @ChannelHandler .Sharableclass NettyServerHandler extends SimpleChannelInboundHandler <RemotingCommand> { @Override protected void channelRead0 (ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
具体的处理器方法就是processMessageReceived 方法了,在这个方法中,会根据请求code将请求分发给不同的处理器进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void processMessageReceived (ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null ) { switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break ; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break ; default : break ; } } }
这个方法我们在学习Producer发送消息源码的时候就见过了,它会根据接收到的消息是请求还是响应选择不同的方法处理,如果是请求消息则调用processRequestCommand方法,如果是响应消息则调用processResponseCommand方法。在这里,是Broker端接收Producer的请求,因此会调用processRequestCommand方法处理,而如果是Producer端接收到了Broker的响应,则是调用processResponseCommand方法处理。
1.3 processRequestCommand分发处理请求 该方法是服务端用来处理来自客户端的请求指令的入口方法,大概流程为:
1、 首先从请求中获取requestCode,然后根据此code从processorTable这个本地缓存变量中找到对应的processor以及对应的处理线程池如果该Code没有注册的RequestProcessor,则采用DefaultRequestProcessor作为请求处理器;2、 然后会创建一个线程任务Runnable,该线程任务中:;
1、 首先会回去远程地址,然后执行前置钩子方法;2、 然后创建一个响应回调函数RemotingResponseCallback,在获得响应之后会执行该函数该函数中会先执行后置钩子方法,然后判断如果响应response不为null,则写响应;3、 然后会判断执行器是不是异步执行器,如果是的话那么直接调用执行器的asyncProcessRequest方法处理请求以及执行回调函数否则直接调用processRequest方法,然后同步等待获取response,最后调用回调函数的callback方法;3、 判断如果该请求处理器拒绝该请求,那么返回系统繁忙的响应SYSTEM_BUSY:[REJECTREQUEST] systembusy,startflowcontrolforawhile;4、 根据此前创建的Runnable创建请求任务RequesTask对象,随后通过对应的请求执行器线程池执行这个任务,这里就是支持多线程并发的执行请求处理的逻辑,也是RocketMQRPC通信模型中的M2;
模型中的M2;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 public void processRequestCommand (final ChannelHandlerContext ctx, final RemotingCommand cmd) { final Pair<NettyRequestProcessor, ExecutorService> matched = this .processorTable.get(cmd.getCode()); final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this .defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null ) { Runnable run = new Runnable () { @Override public void run () { try { String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); doBeforeRpcHooks(remoteAddr, cmd); final RemotingResponseCallback callback = new RemotingResponseCallback () { @Override public void callback (RemotingCommand response) { doAfterRpcHooks(remoteAddr, cmd, response); if (!cmd.isOnewayRPC()) { if (response != null ) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed" , e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } }; if (pair.getObject1() instanceof AsyncNettyRequestProcessor) { AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1(); processor.asyncProcessRequest(ctx, cmd, callback); } else { NettyRequestProcessor processor = pair.getObject1(); RemotingCommand response = processor.processRequest(ctx, cmd); callback.callback(response); } } catch (Throwable e) { log.error("process request exception" , e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while" ); response.setOpaque(opaque); ctx.writeAndFlush(response); return ; } try { final RequestTask requestTask = new RequestTask (run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000 ) == 0 ) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while" ); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported" ; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } }
1.3.1 rejectRequest是否拒绝请求 在调用执行器处理请求之前,会调用处理器的rejectRequest方法,判断该处理器能否处理该请求。
不同的处理器对于rejectRequest方法有不同的实现,如果是SendMessageProcessor,那么它的实现为:检查操作系统页缓存PageCache是否繁忙或者检查临时存储池transientStorePool是否不足,如果其中有一个不满足要求,则拒绝处理该请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public boolean rejectRequest () { return this .brokerController.getMessageStore().isOSPageCacheBusy() || this .brokerController.getMessageStore().isTransientStorePoolDeficient(); }
1.3.1.1 isOSPageCacheBusy操作系统页缓存是否繁忙 一个broker将所有的消息都追加到同一个逻辑CommitLog日志文件中,因此需要通过获取putMessageLock锁来控制并发。
Diff表示锁的持有时间,当前时间减去获取锁开始时间,这个时间可以看作是处理上一个消息目前所花费的时间。
如果broker持有锁的时间超过osPageCacheBusyTimeOutMills ,则算作操作系统页缓存繁忙,那么会拒绝处理当前请求,直观现象就是客户端抛出**[REJECTREQUEST]system busy, start flow control for a while异常。 osPageCacheBusyTimeOutMills可以配置, 默认为1000ms,即1s**。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public boolean isOSPageCacheBusy () { long begin = this .getCommitLog().getBeginTimeInLock(); long diff = this .systemClock.now() - begin; return diff < 10000000 && diff > this .messageStoreConfig.getOsPageCacheBusyTimeOutMills(); }
1.3.1.2 isTransientStorePoolDeficient检查临时存储池是否不足 如果启用commitLog临时存储池,那么检查当前可用的buffers堆外内存的数量是否不足。RocketMQ中引入的 transientStorePoolEnable 能缓解 pagecache 的压力,其原理是基于DirectByteBuffer和MappedByteBuffer的读写分离,消息先写入DirectByteBuffer(堆外内存),随后从MappedByteBuffer(pageCache)读取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public boolean isTransientStorePoolDeficient () { return remainTransientStoreBufferNumbs() == 0 ; } public int remainTransientStoreBufferNumbs () { return this .transientStorePool.availableBufferNums(); }
仅当transientStorePoolEnable 为true(默认false)且FlushDiskType为ASYNC_FLUSH 且当前broker不是SLAVE角色时,才启用commitLog临时存储池。如果没开启commitLog临时存储池,那么返回最大int值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public int availableBufferNums () { if (storeConfig.isTransientStorePoolEnable()) { return availableBuffers.size(); } return Integer.MAX_VALUE; } public boolean isTransientStorePoolEnable () { return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType() && BrokerRole.SLAVE != getBrokerRole(); }
2 asyncProcessRequest异步处理请求 生产者发送消息的请求,将会被Broker的SendMessageProcessor处理器处理,并且被SendMessageProcessor执行器并发执行。
SendMessageProcessor属于AsyncNettyRequestProcessor,因此会调用asyncProcessRequest方法执行请求和响应回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 @Override public void asyncProcessRequest (ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this .brokerController.getPutMessageFutureExecutor()); }
该方法调用另一个asyncProcessRequest 方法异步处理请求,然后调用thenAcceptAsync 方法异步的执行回调。
2.1 asyncProcessRequest异步处理请求 asyncProcessRequest方法根据不同的RequestCode异步处理请求。如果RequestCode是CONSUMER_SEND_MSG_BACK,即消费者发送的消息回退请求,那么调用asyncConsumerSendMsgBack方法处理,其他情况下走默认处理逻辑。 默认处理逻辑中,首先解析请求头,然后构建发送请求消息轨迹上下文,随后执行发送消息前钩子方法,最后判断如果是批量消息请求,那么调用asyncSendBatchMessage方法执行处理批量发送消息逻辑,否则调用asyncSendMessage方法处理其他发送消息逻辑,例如单条消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public CompletableFuture<RemotingCommand> asyncProcessRequest (ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this .asyncConsumerSendMsgBack(ctx, request); default : SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null ) { return CompletableFuture.completedFuture(null ); } mqtraceContext = buildMsgContext(ctx, requestHeader); this .executeSendMessageHookBefore(ctx, request, mqtraceContext); if (requestHeader.isBatch()) { return this .asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { return this .asyncSendMessage(ctx, request, mqtraceContext, requestHeader); } } }
该方法会解析请求头为SendMessageRequestHeader对象,在该方法中会通过不同的RequestCode选择不同的解析方法,如果是批量消息或者轻量(压缩)消息,那么先解析为SendMessageRequestHeaderV2,然后转换为SendMessageRequestHeader,否则直接解析为SendMessageRequestHeader。
我们之前讲过,在Producer发送消息的时候可能会使用轻量级消息头SendMessageRequestHeaderV2,SendMessageRequestHeaderV2相比于SendMessageRequestHeader,其field 全为 a,b,c,d 等短变量名,可以加快FastJson反序列化过程,提升传输效率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 protected SendMessageRequestHeader parseRequestHeader (RemotingCommand request) throws RemotingCommandException { SendMessageRequestHeaderV2 requestHeaderV2 = null ; SendMessageRequestHeader requestHeader = null ; switch (request.getCode()) { case RequestCode.SEND_BATCH_MESSAGE: case RequestCode.SEND_MESSAGE_V2: requestHeaderV2 = decodeSendMessageHeaderV2(request); case RequestCode.SEND_MESSAGE: if (null == requestHeaderV2) { requestHeader = (SendMessageRequestHeader) request .decodeCommandCustomHeader(SendMessageRequestHeader.class); } else { requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2); } default : break ; } return requestHeader; }
3 总结 本次我们学习了broker接收消息源码入口的处理逻辑,可以看到最终是通过调用asyncSendMessage 方法来处理来自producer发送的消息的。asyncSendMessage 方法内部要做的事情就多了,包括存储消息,构建索引,分发消费等等,这部分我们下回专门学习。
__END__