07、RocketMQ源码分析:Producer发送消息的总体流程【一万字】
1 send源码入口
1.2 单向消息
1.3 异步消息
2 sendDefaultImpl发送消息实现
2.1 makeSureStateOK确定生产者服务状态
2.2 checkMessage校验消息的合法性
2.3 tryToFindTopicPublishInfo查找topic的发布信息
2.4 计算发送次数timesTotal
2.5 selectOneMessageQueue选择消息队列
2.5.1 selectOneMessageQueue选择一个mq
2.6 sendKernelImpl发送消息
2.6.1 findBrokerAddressInPublish查找broker地址
2.6.2 brokerVIPChannel判断vip通道
2.6.3 setUniqID生成uniqId
2.6.4 tryToCompressMessage压缩消息
2.7 updateFaultItem更新故障表
2.7.1 computeNotAvailableDuration计算隔离时间
2.7.2 updateFaultItem更新故障表
3 总结
下面是一个最简单的producer的使用案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class Producer { public static void main (String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer ("please_rename_unique_group_name" ); producer.setNamesrvAddr("127.0.0.1:9876" ); producer.start(); try { Message msg = new Message ("Topic1" , "TagA" , ("Hello RocketMQ " ).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n" , sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000 ); } } }
可以看到producer通过调用send方法发送消息,实际上RocketMQ的producer发送消息的模式可以分为三种:
1、 单向发送 :把消息发向Broker服务器,而不用管消息是否成功发送到Broker服务器,只管发送,不管结果;2、 同步发送 :把消息发送给Broker服务器,如果消息成功发送给Broker服务器,能得到Broker服务器的响应结果;3、 异步发送 :把消息发送给Broker服务器,如果消息成功发送给Broker服务器,能得到Broker服务器的响应结果因为是异步发送,发送完消息以后,不用等待,等到Broker服务器的响应调用回调;
DefaultMQProducer提供了更多的send的重载方法,来实现上面三种发送模式:
模式
方法
描述
同步
SendResult send(Collection msgs)
同步批量发送消息
SendResult send(Collection msgs, long timeout)
同步批量发送消息
SendResult send(Collection msgs, MessageQueue messageQueue)
向指定的消息队列同步批量发送消息
SendResult send(Collection msgs, MessageQueue messageQueue, long timeout)
向指定的消息队列同步批量发送消息,并指定超时时间
SendResult send(Message msg)
同步单条发送消息
SendResult send(Message msg, long timeout)
同步发送单条消息,并指定超时时间
SendResult send(Message msg, MessageQueue mq)
向指定的消息队列同步发送单条消息
SendResult send(Message msg, MessageQueue mq, long timeout)
向指定的消息队列同步单条发送消息,并指定超时时间
SendResult send(Message msg, MessageQueueSelector selector, Object arg)
向消息队列同步单条发送消息,并指定发送队列选择器
SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
向消息队列同步单条发送消息,并指定发送队列选择器与超时时间
异步
void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
向指定的消息队列异步单条发送消息
void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
向指定的消息队列异步单条发送消息,并指定超时时间
void send(Message msg, SendCallback sendCallback)
异步发送消息
void send(Message msg, SendCallback sendCallback, long timeout)
异步发送消息,并指定回调方法和超时时间
void send(Message msg, MessageQueue mq, SendCallback sendCallback)
向指定的消息队列异步单条发送消息,并指定回调方法
void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
向指定的消息队列异步单条发送消息,并指定回调方法和超时时间
单向
void sendOneway(Message msg)
单向发送消息,不等待broker响应
void sendOneway(Message msg, MessageQueue mq)
单向发送消息到指定队列,不等待broker响应
void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
单向发送消息到队列选择器的选中的队列,不等待broker响应
上次我们分析了producer的启动流程源码,这次我们分析producer发送消息的源码。
1 send源码入口 DefaultMQProducer#send方法作为源码分析的入口方法,该方法被使用者直接调用。其内部调用defaultMQProducerImpl#send方法发送消息。
1.1 同步消息 1 2 3 4 5 6 7 8 9 public SendResult send ( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); return this .defaultMQProducerImpl.send(msg); }
该方法内部调用defaultMQProducerImpl#send发送消息。
1 2 3 4 5 6 7 public SendResult send ( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, this .defaultMQProducer.getSendMsgTimeout()); }
该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
1 2 3 4 5 6 7 8 9 10 11 12 13 public SendResult send (Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this .sendDefaultImpl(msg, CommunicationMode.SYNC, null , timeout); }
该方法内部又调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为null。
1.2 单向消息 单向消息使用sendOneway发送。
1 2 3 4 5 6 7 8 9 public void sendOneway (Message msg) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); this .defaultMQProducerImpl.sendOneway(msg); }
该方法内部调用defaultMQProducerImpl#sendOneway。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void sendOneway (Message msg) throws MQClientException, RemotingException, InterruptedException { try { this .sendDefaultImpl(msg, CommunicationMode.ONEWAY, null , this .defaultMQProducer.getSendMsgTimeout()); } catch (MQBrokerException e) { throw new MQClientException ("unknown exception" , e); } }
最终调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000ms。
1.3 异步消息 异步消息使用带有callback函数的send方法发送。
1 2 3 4 5 6 7 8 9 public void send (Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); this .defaultMQProducerImpl.send(msg, sendCallback); }
该方法内部调用defaultMQProducerImpl#send方法发送消息,带有sendCallback参数。
1 2 3 4 5 6 7 public void send (Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { send(msg, sendCallback, this .defaultMQProducer.getSendMsgTimeout()); }
该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public void send (final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); ExecutorService executor = this .getAsyncSenderExecutor(); try { executor.submit(new Runnable () { @Override public void run () { long costTime = System.currentTimeMillis() - beginStartTime; if (timeout > costTime) { try { sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); } catch (Exception e) { sendCallback.onException(e); } } else { sendCallback.onException( new RemotingTooMuchRequestException ("DEFAULT ASYNC send call timeout" )); } } }); } catch (RejectedExecutionException e) { throw new MQClientException ("executor rejected " , e); } }
该方法内部会获取获取异步发送执行器线程池,使用线程池异步的执行sendDefaultImpl方法,即异步发送消息。
发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法。
2 sendDefaultImpl发送消息实现 该方法位于DefaultMQProducerImpl中,无论是同步消息、异步消息还是单向消息,最终都是调用该方法实现发送消息的逻辑的,因此该方法是真正的发送消息的方法入口。
该方法的大概步骤为:
1、 调用makeSureStateOK方法,确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常;2、 调用checkMessage方法,校验消息的合法性;3、 调用tryToFindTopicPublishInfo方法,尝试查找消息的一个topic路由,用以发送消息;4、 计算循环发送消息的总次数timesTotal,默认情况下,同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改实际上异步发送消息也会重试,最多两次,只不过不是通过这里的逻辑重试的;5、 调用selectOneMessageQueue方法,选择一个消息队列MessageQueue,该犯法支持失败故障转移;6、 调用sendKernelImpl方法发送消息,异步、同步、单向发送消息的模式都是通过该方法实现的;7、 调用updateFaultItem方法,更新本地错误表缓存数据,用于延迟时间的故障转移的功能;8、 根据发送模式执行不同的处理,如果是异步或者单向模式则直接返回,如果是同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果返回值不是返回SEND_OK状态,则仍然会执行重试发送;9、 此过程中,如果抛出了RemotingException、MQClientException、以及部分MQBrokerException异常时,那么会进行重试,如果抛出了InterruptedException,或者因为超时则不再重试;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 private SendResult sendDefaultImpl ( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this .makeSureStateOK(); Validators.checkMessage(msg, this .defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; TopicPublishInfo topicPublishInfo = this .tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false ; MessageQueue mq = null ; Exception exception = null ; SendResult sendResult = null ; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this .defaultMQProducer.getRetryTimesWhenSendFailed() : 1 ; int times = 0 ; String[] brokersSent = new String [timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this .selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null ) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0 ) { msg.setTopic(this .defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true ; break ; } sendResult = this .sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this .updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false ); switch (communicationMode) { case ASYNC: return null ; case ONEWAY: return null ; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this .defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue ; } } return sendResult; default : break ; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this .updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true ); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s" , invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue ; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this .updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true ); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s" , invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue ; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this .updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true ); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s" , invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; if (this .defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) { continue ; } else { if (sendResult != null ) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this .updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false ); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s" , invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception" , e); log.warn(msg.toString()); throw e; } } else { break ; } } if (sendResult != null ) { return sendResult; } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s" , times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException (info, exception); if (callTimeout) { throw new RemotingTooMuchRequestException ("sendDefaultImpl call timeout" ); } if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } validateNameServerSetting(); throw new MQClientException ("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null ).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
2.1 makeSureStateOK确定生产者服务状态 首先会确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void makeSureStateOK () throws MQClientException { if (this .serviceState != ServiceState.RUNNING) { throw new MQClientException ("The producer service state not OK, " + this .serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null ); } }
2.2 checkMessage校验消息的合法性 确定服务状态正常之后,还需要校验消息的合法性。校验规则为:
1、 如果msg消息为null,抛出异常;2、 校验topic如果topic为空,或者长度大于127个字符,或者topic的字符串不符合”^[%|a-zA-Z0-9_-]+$” 模式,即包含非法字符,那么抛出异常如果当前topic是不为允许使用的系统topic,那么抛出异常;3、 校验消息体如果消息体为null,或者为空数组,或者消息字节数组长度大于4,194,304,即消息的大小大于4M,那么抛出异常;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public static void checkMessage (Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException (ResponseCode.MESSAGE_ILLEGAL, "the message is null" ); } Validators.checkTopic(msg.getTopic()); Validators.isNotAllowedSendTopic(msg.getTopic()); if (null == msg.getBody()) { throw new MQClientException (ResponseCode.MESSAGE_ILLEGAL, "the message body is null" ); } if (0 == msg.getBody().length) { throw new MQClientException (ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero" ); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException (ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } } public static void checkTopic (String topic) throws MQClientException { if (UtilAll.isBlank(topic)) { throw new MQClientException ("The specified topic is blank" , null ); } if (topic.length() > TOPIC_MAX_LENGTH) { throw new MQClientException ( String.format("The specified topic is longer than topic max length %d." , TOPIC_MAX_LENGTH), null ); } if (isTopicOrGroupIllegal(topic)) { throw new MQClientException (String.format( "The specified topic[%s] contains illegal characters, allowing only %s" , topic, "^[%|a-zA-Z0-9_-]+$" ), null ); } }
2.3 tryToFindTopicPublishInfo查找topic的发布信息 该方法用于查找指定topic的发布信息TopicPublishInfo。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private TopicPublishInfo tryToFindTopicPublishInfo (final String topic) { TopicPublishInfo topicPublishInfo = this .topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this .topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo ()); this .mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this .topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this .mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true , this .defaultMQProducer); topicPublishInfo = this .topicPublishInfoTable.get(topic); return topicPublishInfo; } }
首先在本地缓存topicPublishInfoTable获取,如果没有获取到有效数据,那么立即调用updateTopicRouteInfoFromNameServer方法从nameServer同步此topic的路由配置信息,并且更新本地缓存,如果还是没有获取到有效数据,那么再次从nameServer同步topic的数据,不过这次使用默认的topic “TBW102”去找路由配置信息作为本topic参数信息。
updateTopicRouteInfoFromNameServer 方法我们在此前的producer启动流程中已经介绍了。 TopicPublishInfo包含topic的各种属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private boolean orderTopic = false ;private boolean haveTopicRouterInfo = false ;private List<MessageQueue> messageQueueList = new ArrayList <MessageQueue>();private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex ();private TopicRouteData topicRouteData;
2.4 计算发送次数timesTotal 在发送消息之前,会先计算最大发送次数,同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改。
1 2 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this .defaultMQProducer.getRetryTimesWhenSendFailed() : 1 ;
**注意,异步发送同样有重试,并且也是两次,只不过它的重试不在这个循环里面,而是是在MQClientAPIImpl#sendMessage方法中,后面会讲到。 **
2.5 selectOneMessageQueue选择消息队列 selectOneMessageQueue 方法用于查找一个可用的消息队列,该方法内部调用mqFaultStrategy#selectOneMessageQueue 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 public MessageQueue selectOneMessageQueue (final TopicPublishInfo tpInfo, final String lastBrokerName) { return this .mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }
mqFaultStrategy#selectOneMessageQueue 方法支持故障转移机制,其选择步骤为:
1、 首先判断是否开启了发送延迟故障转移机制,即sendLatencyFaultEnable属性是否为true,默认false不打开如果开启了该机制:;
1、 首先仍然是遍历消息队列,按照轮询的方式选取一个消息队列,当消息队列可用(无故障)时,选择消息队列的工作就结束,否则循环选择其他队列如果该mq的broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中,或者当前时间戳已经大于该broker下一次开始可用的时间戳,表示无故障;2、 没有选出无故障的mq,那么从LatencyFaultTolerance维护的不是最好的broker集合faultItemTable中随机选择一个broker,随后判断如果写队列数大于0,那么选择该broker然后遍历消息队列,采用取模的方式获取一个队列,即轮询的方式,重置其brokerName,queueId,进行消息发送;3、 如果上面的步骤抛出了异常,那么遍历消息队列,采用取模的方式获取一个队列,即轮询的方式;2、 如果没有发送延迟故障转移机制,那么那么遍历消息队列,即采用取模轮询的方式获取一个brokerName与lastBrokerName不相等的队列,即不会再次选择上次发送失败的broker如果没有找到一个不同broker的mq,那么退回到轮询的方式;
selectOneMessageQueue 方法选择mq的时候的故障转移机制,其目的就是为了保证每次发送消息尽量更快的成功,是一种保证高可用的手段。总的来说,包括两种故障转移:
1、 一种是延迟时间的故障转移,这需要将sendLatencyFaultEnable属性中设置为true,默认false对于请求响应较慢的broker,可以在一段时间内将其状态置为不可用,消息队列选择时,会过滤掉mq认为不可用的broker,以此来避免不断向宕机的broker发送消息,选取一个延迟较短的broker,实现消息发送高可用;2、 另一种是没有开启延迟时间的故障转移的时候,在轮询选择mq的时候,不会选择上次发送失败的broker,实现消息发送高可用;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public MessageQueue selectOneMessageQueue (final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this .sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0 ; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0 ) pos = 0 ; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0 ) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null ) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue" , e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
2.5.1 selectOneMessageQueue选择一个mq selectOneMessageQueue方法有两个重载方法,一个是有参数的,另一个是无参数的。 无参数的方法,即轮询选择一个mq,没有任何限制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public MessageQueue selectOneMessageQueue () { int index = this .sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this .messageQueueList.size(); if (pos < 0 ) pos = 0 ; return this .messageQueueList.get(pos); }
有参数的方法,其参数是上一次发送失败的brokerName,并且在选择的时候,不会选择上一次发送失败的brokerName的mq,即避免选择发送失败的broker继续发送。当然如果最后没有选出来,那么还是走轮询获取的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public MessageQueue selectOneMessageQueue (final String lastBrokerName) { if (lastBrokerName == null ) { return selectOneMessageQueue(); } else { for (int i = 0 ; i < this .messageQueueList.size(); i++) { int index = this .sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this .messageQueueList.size(); if (pos < 0 ) pos = 0 ; MessageQueue mq = this .messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
2.6 sendKernelImpl发送消息 选择了消息队列之后,会调用sendKernelImpl方法进行消息的发送。该方法的大概步骤为:
1、 首先调用findBrokerAddressInPublish方法从brokerAddrTable中查找Masterbroker地址如果找不到,那么再次调用tryToFindTopicPublishInfo方法从nameServer远程拉取配置,并更新本地缓存,随后再次尝试获取Masterbroker地址;2、 调用brokerVIPChannel判断是否开启vip通道,如果开启了,那么将brokerAddr的port–2,因为vip通道的端口为普通端口–2;3、 如果不是批量消息,那么设置唯一的uniqId;4、 如果不是批量消息,并且消息体大于4K,那么进行消息压缩;5、 如果存在CheckForbiddenHook,则执行checkForbidden钩子方法如果存在SendMessageHook,则执行sendMessageBefore钩子方法;6、 设置请求头信息SendMessageRequestHeader,请求头包含各种基本属性,例如producerGroup、topic、queueId等,并且针对重试消息的处理,将消息重试次数和最大重试次数存入请求头中;
根据不同的发送模式发送消息。如果是异步发送模式,则需要先克隆并还原消息。最终异步、单向、同步模式都是调用MQClientAPIImpl#sendMessage方法发送消息的。
如果MQClientAPIImpl#sendMessage方法正常发送或者抛出RemotingException、MQBrokerException、InterruptedException异常,那么会判断如果存在SendMessageHook,则执行sendMessageAfter钩子方法。
9、 在finally块中,对原始消息进行恢复;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 private SendResult sendKernelImpl (final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this .mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this .mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null ; if (brokerAddr != null ) { brokerAddr = MixAll.brokerVIPChannel(this .defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte [] prevBody = msg.getBody(); try { if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false ; if (null != this .mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this .mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true ; } int sysFlag = 0 ; boolean msgBodyCompressed = false ; if (this .tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true ; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext (); checkForbiddenContext.setNameSrvAddr(this .defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this .defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this .isUnitMode()); this .executeCheckForbiddenHook(checkForbiddenContext); } if (this .hasSendMessageHook()) { context = new SendMessageContext (); context.setProducer(this ); context.setProducerGroup(this .defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this .defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this .defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true" )) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME" ) != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null ) { context.setMsgType(MessageType.Delay_Msg); } this .executeSendMessageHookBefore(context); } SendMessageRequestHeader requestHeader = new SendMessageRequestHeader (); requestHeader.setProducerGroup(this .defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this .defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this .defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0 ); requestHeader.setUnitMode(this .isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null ) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null ) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null ; switch (communicationMode) { case ASYNC: Message tmpMessage = msg; boolean messageCloned = false ; if (msgBodyCompressed) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true ; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true ; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this .defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException ("sendKernelImpl call timeout" ); } sendResult = this .mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this .mQClientFactory, this .defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this ); break ; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException ("sendKernelImpl call timeout" ); } sendResult = this .mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this ); break ; default : assert false ; break ; } if (this .hasSendMessageHook()) { context.setSendResult(sendResult); this .executeSendMessageHookAfter(context); } return sendResult; } catch (RemotingException e) { if (this .hasSendMessageHook()) { context.setException(e); this .executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this .hasSendMessageHook()) { context.setException(e); this .executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this .hasSendMessageHook()) { context.setException(e); this .executeSendMessageHookAfter(context); } throw e; } finally { msg.setBody(prevBody); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this .defaultMQProducer.getNamespace())); } } throw new MQClientException ("The broker[" + mq.getBrokerName() + "] not exist" , null ); }
2.6.1 findBrokerAddressInPublish查找broker地址 首先会根据brokerName从brokerAddrTable中查找broker地址。生产者只会向Master节点发送消息,因此只会返回Master节点的地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public String findBrokerAddressInPublish (final String brokerName) { HashMap<Long, String> map = this .brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { return map.get(MixAll.MASTER_ID); } return null ; }
2.6.2 brokerVIPChannel判断vip通道 **获取到brokerAddr之后,需要判断是否开启vip通道,如果开启了,那么将brokerAddr的port – 2,因为vip通道的端口为普通通道端口– 2。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static String brokerVIPChannel (final boolean isChange, final String brokerAddr) { if (isChange) { int split = brokerAddr.lastIndexOf(":" ); String ip = brokerAddr.substring(0 , split); String port = brokerAddr.substring(split + 1 ); String brokerAddrNew = ip + ":" + (Integer.parseInt(port) - 2 ); return brokerAddrNew; } else { return brokerAddr; } }
消费者拉取消息只能请求普通通道,但是生产者发送消息可以选择vip通道或者普通通道。
为什么要开启两个端口监听客户端请求呢 ?答案是隔离读写操作。在消息的API中,最重要的是发送消息,需要高RTT。如果普通端口的请求繁忙,会使得netty的IO线程阻塞,例如消息堆积的时候,消费消息的请求会填满IO线程池,导致写操作被阻塞。在这种情况下,我们可以向VIP频道发送消息,以保证发送消息的RTT。
但是,请注意,在rocketmq 4.5.1版本之后,客户端发送消息的请求选择VIP通道的配置被改为false,想要手动默认开启需要配置com.rocketmq.sendMessageWithVIPChannel属性。或者在创建producer的时候调用producer.setVipChannelEnabled() 方法更改当前producer的配置。
因此,现在发送消息和消费消息实际上默认都走10911端口了,无需再关心10909端口的问题了。
2.6.3 setUniqID生成uniqId 该方法用于设置单条消息在客户端的uniqId,即设置到UNIQ_KEY属性中,批量消息在生成时就已经设置uniqId。
uniqId也被称为msgId,从逻辑上代表客户端生成的唯一一条消息,更多见此文章 ,uniqId生成规则 。
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void setUniqID (final Message msg) { if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null ) { msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID()); } }
2.6.4 tryToCompressMessage压缩消息 在发送单条消息的时候,会判断如果消息体超过4K,那么会进行消息压缩,压缩比默认为5,压缩完毕之后设置压缩标志,批量消息不支持压缩。消息压缩有利于更快的进行网络数据传输。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 private boolean tryToCompressMessage (final Message msg) { if (msg instanceof MessageBatch) { return false ; } byte [] body = msg.getBody(); if (body != null ) { if (body.length >= this .defaultMQProducer.getCompressMsgBodyOverHowmuch()) { try { byte [] data = UtilAll.compress(body, zipCompressLevel); if (data != null ) { msg.setBody(data); return true ; } } catch (IOException e) { log.error("tryToCompressMessage exception" , e); log.warn(msg.toString()); } } } return false ; }
2.7 updateFaultItem更新故障表 再发送消息完毕之后,无论是正常还是异常状态,都需要调用updateFaultItem方法,更新本地错误表缓存数据,用于延迟时间的故障转移的功能。
故障转移功能在此前的selectOneMessageQueue方法中被使用到,用于查找一个可用的消息队列。updateFaultItem方法在判断开启了故障转移之后,会更新LatencyFaultTolerance维护的faultItemTable集合属性中的异常broker数据。
1 2 3 4 5 6 7 8 9 10 11 12 public void updateFaultItem (final String brokerName, final long currentLatency, boolean isolation) { this .mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); }
看看MQFaultStrategy#updateFaultItem 方法。其根据本次发送消息的延迟时间currentLatency,会去计算出该broker的隔离时间duration,即可以计算出该broker的下一个可用时间点。然后更新故障记录表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void updateFaultItem (final String brokerName, final long currentLatency, boolean isolation) { if (this .sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this .latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } }
2.7.1 computeNotAvailableDuration计算隔离时间 computeNotAvailableDuration方法根据本次发送消息的延迟时间currentLatency,会去计算出该broker的隔离时间duration,或者说不可以用时间段,据此即可以计算出该broker的下一个可用时间点。
latencyMax延迟等级和notAvailableDuration隔离时间的对应关系如下:
latencyMax,Producer发送消息消耗时长
notAvailableDuration,Broker不可用时长
50L
0L
100L
0L
550L
30000L
1000L
60000L
2000L
120000L
3000L
180000L
15000L
600000L
**如果使用默认隔离时间30000,那个实际将会被隔离600000L,即10分钟。当抛出异常的时候,通常会设置isolation,即使用默认隔离时间。并且从这个表可以看出来,发送消息延迟越大,那么被设置的隔离时间也就越大。 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private long [] latencyMax = { 50L , 100L , 550L , 1000L , 2000L , 3000L , 15000L }; private long [] notAvailableDuration = { 0L , 0L , 30000L , 60000L , 120000L , 180000L , 600000L }; private long computeNotAvailableDuration (final long currentLatency) { for (int i = latencyMax.length - 1 ; i >= 0 ; i--) { if (currentLatency >= latencyMax[i]) return this .notAvailableDuration[i]; } return 0 ; }
2.7.2 updateFaultItem更新故障表 该方法更新LatencyFaultToleranceImpl维护的faultItemTable集合属性中的异常broker的故障信息,将会设置发送消息的延迟时间currentLatency属性,以及下一个可用时间点LatencyFaultToleranceImpl属性。
下次可用时间LatencyFaultToleranceImpl属性= 现在的时间 + 隔离的时间,在selectOneMessageQueue方法选取消息队列的时候,如果开启了集群故障转移,那么会查找下一个可用时间点小于当前时间点的broker的队列来发送消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Override public void updateFaultItem (final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this .faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem (name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this .faultItemTable.putIfAbsent(name, faultItem); if (old != null ) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
3 总结 本次我们学习了Producer的发送消息的源码总体流程,对于具体的发送消息的sendMessage方法源码将在下文讲解。
从这些源码中,我们得知了一些常见的却容易混淆的概念和知识,例如:
1、 生产者消息重试:RocketMQ的消费者消息重试和生产者消息重投;2、 生产者故障转移通过sendLatencyFaultEnable属性配置是否开启,默认未开启故障转移机制,其目的就是为了保证每次发送消息尽量更快的成功,是一种保证高可用的手段总的来说,包括两种故障转移:;
1、 一种是延迟时间的故障转移,这需要将sendLatencyFaultEnable属性中设置为true,默认false对于请求响应较慢的broker,可以在一段时间内将其状态置为不可用,消息队列选择时,会过滤掉mq认为不可用的broker,以此来避免不断向宕机的broker发送消息,选取一个延迟较短的broker,实现消息发送高可用;2、 另一种是没有开启延迟时间的故障转移的时候,在轮询选择mq的时候,不会选择上次发送失败的broker,实现消息发送高可用;3、 Vip通道VIP通道用于隔离读写操作消费者拉取消息只能请求普通通道,但是生产者发送消息可以选择vip通道或者普通通道;
1、 在消息的API中,最重要的是发送消息,需要高RTT如果普通端口的请求繁忙,会使得netty的IO线程阻塞,例如消息堆积的时候,消费消息的请求会填满IO线程池,导致写操作被阻塞在这种情况下,我们可以向VIP频道发送消息,以保证发送消息的RTT;2、 但是,请注意,在rocketmq4.5.1版本之后,客户端发送消息的请求选择VIP通道的配置被改为false,想要手动默认开启需要配置com.rocketmq.sendMessageWithVIPChannel属性或者在创建producer的时候调用producer.setVipChannelEnabled() 方法更改当前producer的配置;4、 故障转移表,RocketMQ的Producer生产者故障转移依赖于故障转移表实现,他是一个HasmMap消息发送结束之后,会根据本次发送消息的延迟时间currentLatency,会去计算出该broker对应的的隔离时间duration,即可以计算出该broker的下一个可用时间点,然后更新故障记录表故障转移表的key为brokerName,value为未来该broker可用时间;
__END__