08、RocketMQ源码分析:Producer发送单向、同步、异步消息源码【一万字】
- 1 invokeOneway单向发送
-
- 2 sendMessageSync同步发送
-
- 2.1.1 invokeSyncImpl同步调用实现
-
- 2.1.2 processSendResponse处理响应结果
- 3 sendMessageAsync异步发送消息
-
- 3.1.1 invokeAsyncImpl异步调用实现
- 3.2 onExceptionImpl异常处理
- 4 NettyClientHandler处理服务端消息
- 4.1 processResponseCommand处理响应
- 4.1.1 executeInvokeCallback执行回调函数
- 4.1.1.1 executeInvokeCallback指定回调
- 4.1.2 putResponse存入响应
- 5 总结
此前,我们学习了RocketMQ源码(7)—Producer发送消息源码(1)—发送消息的总体流程【一万字】
,我们知道了Producer发送消息的总体流程,现在我们专门来学习一个重要的发送消息的内部方法MQClientAPIImpl#sendMessage的源码。
**当一切准备就绪,最终异步、单向、同步发送模式都会调用MQClientAPIImpl#sendMessage方法发送消息,该方法是真正的发起请求的方法。
**
这个方法的逻辑就比较简单了:
1、
首先构建发送消息命令对象RemotingCommand,此时会判断是否需要更换轻量级消息头,如果sendSmartMsg属性为true(默认为true)或者是批量消息,则使用轻量级消息头SendMessageRequestHeaderV2相比于requestHeader,其field全为a,b,c,d等短变量名,可以加快FastJson反序列化过程;
- 根据发送模式执行不同的发送逻辑。*
*单向发送模式调用RemotingClient#invokeOneway方法;异步发送模式调用MQClientAPIImpl#sendMessageAsync方法;同步发送模式调用MQClientAPIImpl#sendMessageSync方法。在异步和同步模式发送方法的调用前还会再检查是否超时,如果超时则不再调用。
**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
public SendResult sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); RemotingCommand request = null; String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) { if (sendSmartMsg) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader); } } else { if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } } request.setBody(msg.getBody());
switch (communicationMode) { case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeAsync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); }
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); return null; case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); }
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); default: assert false; break; }
return null; }
|
1 invokeOneway单向发送
该模式下,消息只会发送一次,且不会返回任何结果,即只管发送不管结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
@Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { doBeforeRpcHooks(addr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
|
首先会调用getAndCreateChannel方法,尝试获取此broker的通道,如果没有获取到,那么会创建一个通道,即连接。然后执行rpcHook的前置方法doBeforeRequest,随后调用另一个invokeOnewayImpl方法,发送单向消息。
1.1 invokeOnewayImpl单向调用
这个方法是单向消息的真正发送方法,终于到了最底层了,不容易啊。
该方法首先将请求标记为单向发送,然后基于**Semaphore信号量
***
尝试获取单向发送的资源,通过信号量控制单向消息并发发送的消息数,从而保护系统内存占用。客户端单向发送的Semaphore信号量默认为65535,即单向消息最大并发为65535,可通过配置”
com.rocketmq.remoting.clientOnewaySemaphoreValue”系统变量更改。*
获取到了信号量资源之后。构建SemaphoreReleaseOnlyOnce对象,保证信号量本次只被释放一次,防止并发操作引起线程安全问题,然后就通过channel发送请求即可。
在其监听器ChannelFutureListener中,会释放信号量,如果发送失败了,仅仅是打印一行warn日志,然后就不管了。如果没有获取到信号量资源,那么直接抛出异常即可,并且不再发送。
只管发送不管结果,不会进行任何重试,这就是单向发送消息的真正含义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
|
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { request.markOnewayRPC(); boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { once.release(); if (!f.isSuccess()) { log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } catch (Exception e) { once.release(); log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed."); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d", timeoutMillis, this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
|
2 sendMessageSync同步发送
该模式下,发送之后会同步阻塞直到结果返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null;
return this.processSendResponse(brokerName, msg, response, addr); }
|
其内部调用NettyRemotingClient#invokeSync方法执行同步调用,然后调用processSendResponse方法处理响应结果。
2.1 invokeSync同步调用
该方法执行同步调用。首先获取或者创建通道,即连接。然后在发送消息前后执行rpcHook钩子方法,即RPCHook#doBeforeRequest
方法,通过调用invokeSyncImpl方法发起同步调用并获取响应结果返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
@Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout"); } RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
|
2.1.1 invokeSyncImpl同步调用实现
invokeSyncImpl方法发起同步调用并获取响应结果。
1、 首先创建一个ResponseFuture,然后将本次请求id和respone存入responseTable缓存;
2、 随后执行调用,并添加一个ChannelFutureListener,消息发送完毕会进行回调然后responseFuture通过waitResponse方法阻塞当前线程,直到得到响应结果或者到达超时时间;
3、 当ChannelFutureListener回调的时候会判断如果消息发送成功,那么设置发送成功并返回,否则设置发送失败标志和失败原因,并且设置响应结果为null,唤醒阻塞的responseFuture;
4、 responseFuture被唤醒后会进行一系列判断如果响应结果为null,那么会根据不同情况抛出不同的异常,如果响应结果不为null,那么返回响应结果;
5、 最后在finaly块中从responseTable中移除响应结果缓存;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque();
try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } });
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }
|
2.1.1.1 请求的阻塞和唤醒
从上面的源码可以得知,同步发送消息的请求可能会经历短暂的阻塞状态。responseFuture通过waitResponse方法阻塞当前线程,直到得到响应结果或者到达超时时间。进入该方法,可以发现其阻塞使用的工具其实就是CountDownLatch。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; }
|
CountDownLatch
也被称为闭锁,它一般用来确保某些活动直到其他活动都完成才继续执行,ResponseFuture中的CountDownLatch的倒计数只有1。
1
| private final CountDownLatch countDownLatch = new CountDownLatch(1);
|
那么什么时候计数变为0呢,那就是调用putResponse方法的时候,该方法有两个调用点,一个是在ChannelFutureListener中判断请求发送失败的时候,直接设置一个null进去,另一个就是请求正常处理完毕的时候,在processResponseCommand方法中会将执行结果设置进去。
putResponse方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); }
|
putResponse方法中会调用countDownLatch#countDown方法,此时倒计数为0,此前阻塞的请求线程将会被唤醒。
2.1.2 processSendResponse处理响应结果
获得响应之后,调用processSendResponse方法处理响应结果,主要就是进行响应码的对应封装操作,然后对发送正常和异常情况分别进行不同的处理并返回sendResult对象。
从源码中可以看到,sendResult的构造器中,将会设置客户端生成的uniqMsgId为msgId属性,设置broker生成的MsgId为offsetMsgId属性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
private SendResult processSendResponse( final String brokerName, final Message msg, final RemotingCommand response, final String addr ) throws MQBrokerException, RemotingCommandException { SendStatus sendStatus; switch (response.getCode()) { case ResponseCode.FLUSH_DISK_TIMEOUT: { sendStatus = SendStatus.FLUSH_DISK_TIMEOUT; break; } case ResponseCode.FLUSH_SLAVE_TIMEOUT: { sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT; break; } case ResponseCode.SLAVE_NOT_AVAILABLE: { sendStatus = SendStatus.SLAVE_NOT_AVAILABLE; break; } case ResponseCode.SUCCESS: { sendStatus = SendStatus.SEND_OK; break; } default: { throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } } SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
String topic = msg.getTopic(); if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) { topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace()); } MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId()); String uniqMsgId = MessageClientIDSetter.getUniqID(msg); if (msg instanceof MessageBatch) { StringBuilder sb = new StringBuilder(); for (Message message : (MessageBatch) msg) { sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message)); } uniqMsgId = sb.toString(); } SendResult sendResult = new SendResult(sendStatus, uniqMsgId, responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); sendResult.setTransactionId(responseHeader.getTransactionId()); String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH); if (regionId == null || regionId.isEmpty()) { regionId = MixAll.DEFAULT_TRACE_REGION_ID; } if (traceOn != null && traceOn.equals("false")) { sendResult.setTraceOn(false); } else { sendResult.setTraceOn(true); } sendResult.setRegionId(regionId); return sendResult; }
|
3 sendMessageAsync异步发送消息
**该模式下,调用该方法之后不接收返回值,直接返回null,执行完毕之后会自动执行回调函数operationComplete。异步发送同样支持重试。
**
调用异步发送方法传递的SendCallback
对象并不会被直接调用,而是会被封装为另一个内部回调对象InvokeCallback,当异步请求获得响应结果,或者超时时间到了之后,会回调它的operationComplete方法。
该方法中会调用processSendResponse
方法解析响应结果为SendResult,如果是响应成功的状态,那么接着执行sendCallback的onSuccess方法,这里的sendCallback就是发送消息时传入的回调函数。
随后会调用updateFaultItem更新本地更新本地错误表缓存数据,用于延迟时间的故障转移的功能。
**如果抛出了异常,或者没有获取到broker的返回值,那么调用onExceptionImpl方法处理异常,该方法中会继续重试异步调用,这就是异步发送消息重试的逻辑。
**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
private void sendMessageAsync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final AtomicInteger times, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws InterruptedException, RemotingException { final long beginStartTime = System.currentTimeMillis(); try {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override public void operationComplete(ResponseFuture responseFuture) { long cost = System.currentTimeMillis() - beginStartTime; RemotingCommand response = responseFuture.getResponseCommand(); if (null == sendCallback && response != null) {
try { SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); if (context != null && sendResult != null) { context.setSendResult(sendResult); context.getProducer().executeSendMessageHookAfter(context); } } catch (Throwable e) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); return; } if (response != null) { try { SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); assert sendResult != null; if (context != null) { context.setSendResult(sendResult); context.getProducer().executeSendMessageHookAfter(context); }
try { sendCallback.onSuccess(sendResult); } catch (Throwable e) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); } catch (Exception e) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer); } } else { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); if (!responseFuture.isSendRequestOK()) { MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else if (responseFuture.isTimeout()) { MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause()); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else { MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } } } }); } catch (Exception ex) { long cost = System.currentTimeMillis() - beginStartTime; producer.updateFaultItem(brokerName, cost, true); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } }
|
3.1 invokeAsync异步调用
该方法执行异步调用。首先获取或者创建生产者与broker通道,即连接。然后在发送消息前后执行rpcHook钩子方法,即RPCHook#doBeforeRequest方法,最后通过调用invokeAsyncImpl方法发起异步调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
@Override public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + addr + "] timeout"); } this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback); } catch (RemotingSendRequestException e) { log.warn("invokeAsync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
|
3.1.1 invokeAsyncImpl异步调用实现
invokeAsyncImpl方法发起异步调用。该方法和单向发送的方法一样,都会基于Semaphore信号量
尝试获取异步发送的资源,通过信号量控制异步消息并发发送的消息数,从而保护系统内存占用。客户端单向发送的Semaphore信号量默认为65535,即异步消息最大并发为65535,可通过配置”
com.rocketmq.remoting.clientAsyncSemaphoreValue”系统变量更改。
在获取到了信号量资源之后。构建SemaphoreReleaseOnlyOnce对象,保证信号量本次只被释放一次,防止并发操作引起线程安全问题,然后就通过channel发送请求即可。
然后创建一个ResponseFuture,设置超时时间、回调函数。然后将本次请求id和respone存入responseTable缓存。
随后执行调用,并添加一个ChannelFutureListener,消息发送完毕会进行回调。当ChannelFutureListener回调的时候会判断如果消息发送成功,那么设置发送成功并返回,否则如果发送失败了,则移除缓存、设置false、并且执行InvokeCallback#operationComplete回调。
如果发送成功了,那么InvokeCallback#operationComplete回调会执行吗,当让会了,当请求正常处理完毕的时候,在processResponseCommand方法中会将执行InvokeCallback#operationComplete回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); final int opaque = request.getOpaque(); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { once.release(); throw new RemotingTimeoutException("invokeAsyncImpl call timeout"); } final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; }
requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
|
3.2 onExceptionImpl异常处理
异步调用如果发生了异常,例如broker返回了错入的响应,或者没有获得响应,那会会执行onExceptionImpl这个异常处理方法。
可以看到,所谓的重试实际上就是重复的调用sendMessageAsync方法。重试之前,首先会判断本次重试的次数是否大于重试总次数,参数为retryTimesWhenSendFailed,默认2次。如果超过了最大重试次数,那么便不会重试,而是执行
sendCallback#onException方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
private void onExceptionImpl(final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int timesTotal, final AtomicInteger curTimes, final Exception e, final SendMessageContext context, final boolean needRetry, final DefaultMQProducerImpl producer ) { int tmp = curTimes.incrementAndGet();
if (needRetry && tmp <= timesTotal) { String retryBrokerName = brokerName; if (topicPublishInfo != null) { MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName); retryBrokerName = mqChosen.getBrokerName(); } String addr = instance.findBrokerAddressInPublish(retryBrokerName); log.warn(String.format("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, retryBrokerName), e); try { request.setOpaque(RemotingCommand.createNewRequestId()); sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer); } catch (InterruptedException e1) { onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingTooMuchRequestException e1) { onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingException e1) { producer.updateFaultItem(brokerName, 3000, true); onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } } else { if (context != null) { context.setException(e); context.getProducer().executeSendMessageHookAfter(context); }
try { sendCallback.onException(e); } catch (Exception ignored) { } } }
|
4 NettyClientHandler处理服务端消息
RocketMQ在启动的时候,在MQClientInstance的start方法中,会创建一个netty客户端,然后会添加一个处理器NettyClientHandler。
这个NettyClientHandler用于处理RemotingCommand消息,即来处理自服务端的请求消息,或者客户端发出去的请求消息后,服务的返回来的响应消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
|
NettyClientHandler通过processMessageReceived方法处理RemotingCommand消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } }
|
这里我们主要看processResponseCommand方法,即客户端发送消息之后服务端的响应会被processResponseCommand方法处理。
4.1 processResponseCommand处理响应
客户端发送消息之后服务端的响应会被processResponseCommand方法处理。消息发送请求的响应处理也是该方法完成的。其大概流程为:
1、 先根据请求id找到之前放到responseTable的ResponseFuture,然后从responseTable中移除ResponseFuture缓存;
2、 判断如果存在回调函数,即异步请求,那么调用executeInvokeCallback方法,该方法会执行回调函数的方法;
3、 如果没有回调函数,则调用putResponse方法该方法将响应数据设置到responseCommand,然后调用countDownLatch.countDown,即倒计数减去1,唤醒等待的线程;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
|
4.1.1 executeInvokeCallback执行回调函数
该方法主要是在异步请求的时候被调用,例如之前的异步发送消息,之前我们知道,异步请求发送的时候,会同时指定一个回调函数。而当前获得来自服务端的响应之后,就会调用了该回调函数。
该方法尝试在回调执行器中执行回调操作,如果回调执行器为null,则在当前线程中执行回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor(); if (executor != null) { try { executor.submit(new Runnable() { @Override public void run() { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("execute callback in executor exception, and callback throw", e); } finally { responseFuture.release(); } } }); } catch (Exception e) { runInThisThread = true; log.warn("execute callback in executor exception, maybe executor busy", e); } } else { runInThisThread = true; } if (runInThisThread) { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("executeInvokeCallback Exception", e); } finally { responseFuture.release(); } } }
|
4.1.1.1 executeInvokeCallback指定回调
该方法将会调用invokeCallback.operationComplete回调方法,这个方法我们之前在异步发送请求的时候就讲过了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
public void executeInvokeCallback() { if (invokeCallback != null) { if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) { invokeCallback.operationComplete(this); } } }
|
4.1.2 putResponse存入响应
该方法主要是在同步请求的时候被调用,例如之前的同步发送消息,之前我们知道,同步请求发送之后,请求线程会阻塞。而当前获得来自服务端的响应之后,就会调用了该方法。
该方法将响应数据设置到responseCommand,然后调用countDownLatch.countDown,即倒计数减去1,唤醒等待的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); }
|
5 总结
本次我们讲解了重要的发送消息的内部方法MQClientAPIImpl#sendMessage的源码,该方法内部又会根据发送模式执行不同的发送逻辑。
*
*单向发送模式调用RemotingClient#invokeOneway方法;异步发送模式调用MQClientAPIImpl#sendMessageAsync方法;同步发送模式调用MQClientAPIImpl#sendMessageSync方法。在异步和同步模式发送方法的调用前还会再检查是否超时,如果超时则不再调用。
**
同步发送模式内部采用CountDownLatch工具实现线程的阻塞和唤醒
,当发送了同步消息之后,当前线程阻塞,当服务端响应返回之后,将会通过CountDownLatch减少倒计数来唤醒阻塞的线程。发送请求和响应怎么对应上的呢?发送请求的时候会生成并带上本次请求的请求Id,客户端返回响应中带有对应的请求的请求Id,这样就能对应上了。
同步发送和异步发送模式都会有消息重试
,消息发送过程中如果抛出了RemotingException、MQClientException、以及部分MQBrokerException异常时,那么会进行重试,默认重试2次
。如果抛出了InterruptedException,或者因为超时则不再重试。
__END__