05. RocketMQ Source Code Analysis: The Heartbeat Service Between Broker and NameServer
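Category: RocketMQ Source Code Analysis (1), 2024-03-27
This article walks through the source code of the heartbeat service between the RocketMQ Broker and the NameServer. It covers three parts:
1. The Broker sending heartbeat registration requests;
2. The NameServer handling heartbeat registration requests;
3. The NameServer's heartbeat detection service.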
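Contents
- 1 Broker sends heartbeat registration requests
- 1.1 Entry point for sending heartbeats
- 1.2 registerBrokerAll: registering broker information
- 1.2.1 needRegister: whether registration is needed
- 1.2.1.1 Introduction to DataVersion
- 1.2.2 doRegisterBrokerAll: registering broker information
- 1.2.2.1 registerBroker: registering the broker
- 2 NameServer handles heartbeat registration requests
- 2.1 Entry point for handling heartbeats
- 2.2 NameServer registers broker information
- 2.3.1 Introduction to RouteInfoManager
- 2.3.2 registerBroker: registering the broker
- 3 NameServer heartbeat detection service
- 3.1 scanNotActiveBroker: scanning for and removing inactive brokers
- 3.1.1 onChannelDestroy: clearing routing information
- 4 Summary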
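1 Broker sends heartbeat registration requests
While starting up, the Broker establishes and keeps a long-lived connection to every NameServer, then starts a scheduled task that sends heartbeat packets periodically. A heartbeat packet carries the Broker's own information (address, name, id and so on) together with the configuration of every Topic it stores. Once registration succeeds, the NameServer cluster holds the mapping between Topics and Brokers.
1.1 Entry point for sending heartbeats
The entry point is the BrokerController#start method: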
```java
public void start() throws Exception {
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }
    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }
    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        this.registerBrokerAll(true, false, true);
    }
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}
```
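Near the end of start, a scheduled task is started. After an initial delay of 10 seconds it calls registerBrokerAll, by default every 30 seconds, to register the broker's information with every NameServer. The interval is configured through the registerNameServerPeriod property, and the effective value is clamped to the range 10,000 to 60,000 milliseconds. This scheduled task is the Broker's heartbeat to the NameServer; the heartbeat includes, for each topic, the topic name, the number of read and write queues, the queue permission, and whether the topic is ordered.
Before this scheduled task is started, registerBrokerAll is also called once directly (when DLedger commit log is not enabled), which forces a registration when the broker first starts.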
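The clamping of the heartbeat interval is easy to misread inside the scheduleAtFixedRate call, so here it is in isolation. This is only a sketch: HeartbeatPeriodSketch and effectivePeriodMillis are hypothetical names, not RocketMQ classes; the expression itself is copied from BrokerController#start.

```java
// Hypothetical helper (not part of RocketMQ) that isolates the clamp
// expression used by BrokerController#start for the heartbeat period.
final class HeartbeatPeriodSketch {

    // Returns the period actually used for a configured registerNameServerPeriod.
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(5_000));   // 10000: raised to the lower bound
        System.out.println(effectivePeriodMillis(30_000));  // 30000: the default is kept as is
        System.out.println(effectivePeriodMillis(120_000)); // 60000: capped at the upper bound
    }
}
```

In other words, configuring a value outside the range does not fail; it is silently replaced by the nearest bound.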

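1.2 registerBrokerAll: registering broker information
The registerBrokerAll method registers the current Broker's information with every NameServer.
The actual registration is done by doRegisterBrokerAll. Before calling it, registerBrokerAll decides whether registration is needed: if forceRegister is true (forced registration), or if needRegister reports that the broker should register, the broker registers with the NameServers.
In start, the first call passes forceRegister = true explicitly, and the scheduled task passes brokerConfig.isForceRegister(), which is true by default. With the default configuration, therefore, both calls always force registration.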
```java
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}
```
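1.2.1 needRegister: whether registration is needed
This method decides whether the current broker needs to register with the NameServers. When forceRegister is true, registration is forced and the result of this method does not matter. When forceRegister is false, whether the broker registers depends entirely on this method's result.
Internally it calls brokerOuterAPI#needRegister: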
```java
private boolean needRegister(final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final int timeoutMills) {
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
    boolean needRegister = false;
    for (Boolean changed : changeList) {
        if (changed) {
            needRegister = true;
            break;
        }
    }
    return needRegister;
}
```
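The logic of brokerOuterAPI#needRegister is simple: it sends a request (request code QUERY_DATA_VERSION, 322) to every NameServer to fetch that NameServer's DataVersion for this broker, and compares each one with the broker's own DataVersion. If any NameServer reports a different version, or if a query throws an exception, the broker registers again.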
```java
public List<Boolean> needRegister(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final int timeoutMills) {
    final List<Boolean> changedList = new CopyOnWriteArrayList<>();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
                        requestHeader.setBrokerAddr(brokerAddr);
                        requestHeader.setBrokerId(brokerId);
                        requestHeader.setBrokerName(brokerName);
                        requestHeader.setClusterName(clusterName);
                        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                        request.setBody(topicConfigWrapper.getDataVersion().encode());
                        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
                        DataVersion nameServerDataVersion = null;
                        Boolean changed = false;
                        switch (response.getCode()) {
                            case ResponseCode.SUCCESS: {
                                QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                    (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                changed = queryDataVersionResponseHeader.getChanged();
                                byte[] body = response.getBody();
                                if (body != null) {
                                    nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
                                    if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
                                        changed = true;
                                    }
                                }
                                if (changed == null || changed) {
                                    changedList.add(Boolean.TRUE);
                                }
                            }
                            default:
                                break;
                        }
                        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
                    } catch (Exception e) {
                        changedList.add(Boolean.TRUE);
                        log.error("Query data version from name server {} Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("query dataversion from nameserver countDownLatch await Exception", e);
        }
    }
    return changedList;
}
```
1.2.1.1 Introduction to DataVersion
DataVersion is RocketMQ's data version control mechanism. Its structure is simple; the core fields and methods are:
```java
private long timestamp = System.currentTimeMillis();
private AtomicLong counter = new AtomicLong(0);

public void assignNewOne(final DataVersion dataVersion) {
    this.timestamp = dataVersion.timestamp;
    this.counter.set(dataVersion.counter.get());
}

public void nextVersion() {
    this.timestamp = System.currentTimeMillis();
    this.counter.incrementAndGet();
}
```
Calling nextVersion changes both timestamp and counter. In general, nextVersion is called when a broker is newly created or when topic information is updated.
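To make the role of DataVersion in needRegister concrete, the following sketch models the comparison with a stripped-down stand-in class. DataVersionSketch, sameAs and the static needRegister helper are hypothetical names for illustration only; the real DataVersion is compared through its own equals method and the real check happens over the network.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for DataVersion (hypothetical, for illustration only).
final class DataVersionSketch {
    private long timestamp;
    private final AtomicLong counter = new AtomicLong(0);

    DataVersionSketch(long timestamp) {
        this.timestamp = timestamp;
    }

    // Copy another version, as a NameServer effectively does when it stores the broker's version.
    void assignNewOne(DataVersionSketch other) {
        this.timestamp = other.timestamp;
        this.counter.set(other.counter.get());
    }

    // Bump the version; the timestamp is passed in so the example stays deterministic.
    void nextVersion(long nowMillis) {
        this.timestamp = nowMillis;
        this.counter.incrementAndGet();
    }

    boolean sameAs(DataVersionSketch other) {
        return other != null
            && this.timestamp == other.timestamp
            && this.counter.get() == other.counter.get();
    }

    // Registration is needed as soon as one NameServer holds a different version.
    static boolean needRegister(DataVersionSketch local, List<DataVersionSketch> remotes) {
        for (DataVersionSketch remote : remotes) {
            if (!local.sameAs(remote)) {
                return true;
            }
        }
        return false;
    }
}
```

With two NameServer copies that match the broker's version, needRegister returns false; after the broker calls nextVersion (for example because a topic was changed), the same call returns true until the NameServers have received the new version.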
Both the DataVersion and the topic configuration are persisted to the topics.json file, in the following format:
```json
{
  "dataVersion": { "counter": 3, "timestamp": 1651398321850 },
  "topicConfigTable": {
    "SCHEDULE_TOPIC_XXXX": { "order": false, "perm": 6, "readQueueNums": 18, "topicFilterType": "SINGLE_TAG", "topicName": "SCHEDULE_TOPIC_XXXX", "topicSysFlag": 0, "writeQueueNums": 18 },
    "TopicTest": { "order": false, "perm": 6, "readQueueNums": 4, "topicFilterType": "SINGLE_TAG", "topicName": "TopicTest", "topicSysFlag": 0, "writeQueueNums": 4 },
    "SELF_TEST_TOPIC": { "order": false, "perm": 6, "readQueueNums": 1, "topicFilterType": "SINGLE_TAG", "topicName": "SELF_TEST_TOPIC", "topicSysFlag": 0, "writeQueueNums": 1 },
    "DefaultCluster": { "order": false, "perm": 7, "readQueueNums": 16, "topicFilterType": "SINGLE_TAG", "topicName": "DefaultCluster", "topicSysFlag": 0, "writeQueueNums": 16 },
    "DefaultCluster_REPLY_TOPIC": { "order": false, "perm": 6, "readQueueNums": 1, "topicFilterType": "SINGLE_TAG", "topicName": "DefaultCluster_REPLY_TOPIC", "topicSysFlag": 0, "writeQueueNums": 1 },
    "RMQ_SYS_TRANS_HALF_TOPIC": { "order": false, "perm": 6, "readQueueNums": 1, "topicFilterType": "SINGLE_TAG", "topicName": "RMQ_SYS_TRANS_HALF_TOPIC", "topicSysFlag": 0, "writeQueueNums": 1 },
    "broker-a": { "order": false, "perm": 7, "readQueueNums": 1, "topicFilterType": "SINGLE_TAG", "topicName": "broker-a", "topicSysFlag": 0, "writeQueueNums": 1 },
    "TBW102": { "order": false, "perm": 7, "readQueueNums": 8, "topicFilterType": "SINGLE_TAG", "topicName": "TBW102", "topicSysFlag": 0, "writeQueueNums": 8 },
    "BenchmarkTest": { "order": false, "perm": 6, "readQueueNums": 1024, "topicFilterType": "SINGLE_TAG", "topicName": "BenchmarkTest", "topicSysFlag": 0, "writeQueueNums": 1024 },
    "OFFSET_MOVED_EVENT": { "order": false, "perm": 6, "readQueueNums": 1, "topicFilterType": "SINGLE_TAG", "topicName": "OFFSET_MOVED_EVENT", "topicSysFlag": 0, "writeQueueNums": 1 },
    "%RETRY%please_rename_unique_group_name_4": { "order": false, "perm": 6, "readQueueNums": 1, "topicFilterType": "SINGLE_TAG", "topicName": "%RETRY%please_rename_unique_group_name_4", "topicSysFlag": 0, "writeQueueNums": 1 }
  }
}
```
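Once loaded into memory, the topic configuration and the DataVersion are parsed into the topicConfigTable and dataVersion fields of topicConfigManager respectively.
1.2.2 doRegisterBrokerAll: registering broker information
The logic of doRegisterBrokerAll is also simple: it sends a registration request to every NameServer and then applies the first result it gets back (master HA address, master address, ordered-topic configuration).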
```java
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());
    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        if (registerBrokerResult != null) {
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }
            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }
}
```
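Internally this calls BrokerOuterAPI#registerBrokerAll. The BrokerOuterAPI class gathers the API methods that the broker uses to send requests to external components.
Because the broker has to register with every NameServer, registerBrokerAll uses multiple threads to improve performance: it submits one registration task per NameServer to the brokerOuterExecutor thread pool, so the requests run in parallel.
Using multiple threads means that thread safety and coordination have to be handled:
1. The results are written into a shared collection from several threads, so RocketMQ uses the concurrent collection CopyOnWriteArrayList to keep this safe. CopyOnWriteArrayList uses the copy-on-write (COW) mechanism: reads take no lock at all, while a write takes an exclusive lock, copies the underlying array, writes to the copy, and then replaces the old array with the copy.
2. Although the registration requests run asynchronously on the thread pool, the calling thread needs their results before it can continue. For this RocketMQ uses a CountDownLatch: the calling thread blocks in await until every task has counted down or timeoutMills has elapsed, whichever comes first, and then continues with the subsequent logic.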
```java
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {
    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);

        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }
    return registerBrokerResultList;
}
```
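The combination of a thread pool, a CopyOnWriteArrayList and a CountDownLatch used by registerBrokerAll can be reduced to the following self-contained sketch. FanOutSketch and callAll are hypothetical names, and the Function argument stands in for the real network call; this is a minimal illustration of the pattern under those assumptions, not RocketMQ code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of "call every address in parallel, wait with a timeout".
final class FanOutSketch {

    static List<String> callAll(List<String> addrs, Function<String, String> call, long timeoutMillis) {
        // Thread-safe result list: several worker threads append to it concurrently.
        final List<String> results = new CopyOnWriteArrayList<>();
        if (addrs.isEmpty()) {
            return results;
        }
        final ExecutorService pool = Executors.newFixedThreadPool(Math.min(4, addrs.size()));
        final CountDownLatch latch = new CountDownLatch(addrs.size());
        for (final String addr : addrs) {
            pool.execute(() -> {
                try {
                    String result = call.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // A failure for one address must not affect the others.
                } finally {
                    // Always count down, otherwise the caller would wait for the full timeout.
                    latch.countDown();
                }
            });
        }
        try {
            // Block until every task has finished or the timeout has elapsed.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Because await has a timeout, the returned list may be missing results from slow addresses. That matches how doRegisterBrokerAll only relies on the first result that is present.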
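1.2.2.1 registerBroker: registering the broker
In the multithreaded code above, each task calls another method, registerBroker, which actually sends the registration request to a single NameServer.
Unless oneway is true, the registration request is synchronous. Its request code is REGISTER_BROKER (103). The registered data consists of all of the broker's topic data, the dataVersion, the filterServerList, and the broker's own information such as the cluster name, broker address, broker name and brokerId.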
```java
private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
        }
        return null;
    }
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
```
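2 NameServer handles heartbeat registration requests
After the Broker sends a heartbeat packet, the NameServer processes it and stores or updates the data the broker reported.
2.1 Entry point for handling heartbeats
The NameServer's default network processor is DefaultRequestProcessor, so the entry point for heartbeat requests is DefaultRequestProcessor#processRequest.
processRequest is a general request entry point that dispatches on the request's requestCode. The requestCode of a heartbeat request is REGISTER_BROKER (103).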
```java
case RequestCode.REGISTER_BROKER:
    Version brokerVersion = MQVersion.value2Version(request.getVersion());
    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
        return this.registerBrokerWithFilterServer(ctx, request);
    } else {
        return this.registerBroker(ctx, request);
    }
case RequestCode.UNREGISTER_BROKER:
    return this.unregisterBroker(ctx, request);
```
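As shown, the NameServer calls registerBrokerWithFilterServer to handle registration requests from brokers of version 3.0.11 or later.
registerBrokerWithFilterServer first verifies the CRC32 checksum of the body, then parses the request header and body. The content includes the topic information, the dataVersion, the message filtering information filterServerList, and the broker's basic information such as its address and name. Finally it calls routeInfoManager#registerBroker to register the broker information.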
```java
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    final RegisterBrokerRequestHeader requestHeader =
        (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
    if (!checksum(ctx, request, requestHeader)) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("crc32 not match");
        return response;
    }
    RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
    if (request.getBody() != null) {
        try {
            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
        } catch (Exception e) {
            throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
        }
    } else {
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
    }
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        registerBrokerBody.getTopicConfigSerializeWrapper(),
        registerBrokerBody.getFilterServerList(),
        ctx.channel());
    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());
    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
```
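The broker puts a CRC32 of the encoded body into the request header (setBodyCrc32), and the NameServer's checksum step rejects a request whose body does not match. The sketch below shows that idea with the JDK's java.util.zip.CRC32. BodyChecksumSketch and both of its methods are hypothetical; masking the value to a non-negative int and treating a header value of 0 as "no checksum sent" are assumptions of this sketch, not something the code shown above confirms.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical sketch of a body checksum round trip (not RocketMQ's UtilAll).
final class BodyChecksumSketch {

    // Sender side: compute a non-negative CRC32 of the encoded body.
    static int crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        // Assumption: keep only the low 31 bits so the value fits a non-negative int.
        return (int) (crc.getValue() & 0x7FFFFFFF);
    }

    // Receiver side: recompute and compare with the value carried in the header.
    static boolean matches(byte[] body, int headerCrc32) {
        // Assumption: 0 means the sender did not provide a checksum.
        if (headerCrc32 == 0) {
            return true;
        }
        return crc32(body) == headerCrc32;
    }

    public static void main(String[] args) {
        byte[] body = "{\"topicConfigTable\":{}}".getBytes(StandardCharsets.UTF_8);
        int header = crc32(body);
        System.out.println(matches(body, header));     // body unchanged, header matches
        System.out.println(matches(body, header ^ 1)); // header no longer matches the body
    }
}
```

A mismatch corresponds to the branch above that answers with SYSTEM_ERROR and the remark "crc32 not match".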
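2.2 NameServer registers broker information
2.3.1 Introduction to RouteInfoManager
The NameServer registers broker information through routeInfoManager#registerBroker. Let us first look at the basic fields of RouteInfoManager. This class lives in the namesrv module and manages all routing information that the NameServer holds about the whole RocketMQ cluster. Since the NameServer is a lightweight registry, RouteInfoManager is one of its most important classes.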
```java
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String, List<QueueData>> topicQueueTable;
private final HashMap<String, BrokerData> brokerAddrTable;
private final HashMap<String, Set<String>> clusterAddrTable;
private final HashMap<String, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String, List<String>> filterServerTable;
```
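2.3.2 registerBroker: registering the broker
This method registers the broker, that is, it creates or updates the broker's entries in the various routing tables.
The main steps are:
1. Acquire the write lock to guarantee thread safety;
2. Add the brokerName to clusterAddrTable, the set of broker names per cluster;
3. Create or update the broker's entry in brokerAddrTable, the table of basic broker information, storing a BrokerData object;
4. If the current broker is a master, create or update topicQueueTable, the table of per-topic queue configuration;
5. Create or update the broker's entry in brokerLiveTable, the table of broker liveness information. The stored data includes the last update timestamp, set to the current time; the NameServer uses brokerLiveTable for heartbeat detection;
6. Create or update filterServerTable, the table of consumer filtering information used by ClassFilter mode;
7. If the current broker is not a master, set HaServerAddr and MasterAddr on the returned result;
8. Release the write lock.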
```java
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();

            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
    return result;
}
```
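Stripped of topics, filter servers and channels, the registration steps come down to updating a few plain HashMaps under a write lock. The sketch below keeps only that skeleton. RouteTableSketch and its methods are hypothetical, the liveness table stores just a timestamp, and MASTER_ID = 0 mirrors MixAll.MASTER_ID; treat it as an illustration of the locking and table layout, not as the NameServer's implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, heavily simplified route table guarded by a read-write lock.
final class RouteTableSketch {
    static final long MASTER_ID = 0L; // mirrors MixAll.MASTER_ID

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> broker names
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // broker name -> (id -> address)
    private final Map<String, Long> brokerLiveTable = new HashMap<>();              // address -> last heartbeat

    // Registers one broker; for a slave, returns the known master address (or null).
    String register(String cluster, String brokerName, long brokerId, String addr, long nowMillis) {
        lock.writeLock().lock();
        try {
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
            Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
            // The same address registering under a new id replaces its stale entry.
            addrs.entrySet().removeIf(e -> addr.equals(e.getValue()) && e.getKey() != brokerId);
            addrs.put(brokerId, addr);
            brokerLiveTable.put(addr, nowMillis);
            return brokerId == MASTER_ID ? null : addrs.get(MASTER_ID);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers only need the shared lock.
    int liveBrokerCount() {
        lock.readLock().lock();
        try {
            return brokerLiveTable.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

The plain HashMap fields are safe here only because every access goes through the lock, which is the same design choice RouteInfoManager makes.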
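3 NameServer heartbeat detection service
When the NameServer starts, it starts a periodic scheduled task. After an initial delay of 5 seconds, it runs every 10 seconds, scans for Brokers that are no longer valid, and removes their routing information.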
```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
```
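3.1 scanNotActiveBroker: scanning for and removing inactive brokers
This is the method the NameServer runs every 10 seconds to find Brokers that are no longer valid and to remove their connections and routing information.
The method is simple: it iterates over every BrokerLiveInfo in RouteInfoManager's brokerLiveTable. If the current timestamp is greater than the timestamp of the last received heartbeat plus the Broker expiry time, the Broker is evicted. The Broker expiry time is 120 seconds.
Evicting a Broker means two things:
- Calling RemotingUtil#closeChannel to close the channel to the broker, that is, closing the socket connection to it.
- Calling RouteInfoManager#onChannelDestroy to clear the broker's now invalid routing information.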
```java
public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
```
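The expiry rule can be exercised without a running NameServer by passing the clock in as a parameter. In this sketch LivenessScanSketch, heartbeat and scan are hypothetical names, and the liveness table stores only the last heartbeat timestamp; the comparison itself follows scanNotActiveBroker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "last heartbeat + expiry < now" eviction rule.
final class LivenessScanSketch {
    static final long EXPIRE_MILLIS = 1000 * 60 * 2; // same value as BROKER_CHANNEL_EXPIRED_TIME

    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Record a heartbeat from the broker at the given address.
    void heartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Remove every broker whose last heartbeat is older than the expiry time; return their addresses.
    List<String> scan(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MILLIS < nowMillis) {
                evicted.add(entry.getKey()); // read the key before removing the entry
                it.remove();
            }
        }
        return evicted;
    }
}
```

Since the scan runs every 10 seconds and the comparison is strict, a broker that silently stops sending heartbeats is evicted somewhere between 120 and 130 seconds after its last heartbeat, unless a channel close event removes it earlier.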
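3.1.1 onChannelDestroy: clearing routing information
This method clears the routing information that has become invalid for a broker once its connection is gone.
It first looks up the broker address that belongs to the channel in brokerLiveTable under the read lock. The removal itself is done under the write lock and deletes the broker's data from five routing tables: brokerLiveTable, filterServerTable, brokerAddrTable, clusterAddrTable and topicQueueTable. It can be seen as the inverse of the registerBroker method and is fairly straightforward.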
```java
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }
    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();
                            }
                            break;
                        }
                    }
                }
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();
                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
```
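If the long-lived connection between the broker and the NameServer closes abnormally, the previously bound BrokerHousekeepingService comes into play. BrokerHousekeepingService implements ChannelEventListener; when a channel close, exception or idle event fires, its callbacks also call RouteInfoManager#onChannelDestroy to clear the routing information.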
```java
public class BrokerHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}
```
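4 Summary
In this article we studied the source code of the heartbeat service between the Broker and the NameServer, in three parts: the Broker's heartbeat reporting, the NameServer's heartbeat handling, and the NameServer's heartbeat detection. From this source code we can conclude:
1. A broker reports its data, the heartbeat packet, to every node in the NameServer cluster. The reported data includes the broker's basic information (brokerAddr, brokerId, brokerName, clusterName and so on), the broker's topic configuration (topicName, perm, the number of read and write queues and so on), the DataVersion that identifies the version of the reported data, and the consumer filtering information filterServerList;
2. After receiving a heartbeat packet, the NameServer parses the data and stores it in the five map fields of RouteInfoManager: topicQueueTable, brokerAddrTable, clusterAddrTable, brokerLiveTable and filterServerTable;
3. NameServers do not communicate with each other and do not synchronize data. In addition, all of a NameServer's routing data is kept in memory with no persistence, which is why the NameServer is so lightweight;
4. Because the NameServer has no data synchronization or persistence, data may temporarily be inconsistent between NameServers, but high availability of the service is preserved. For a component like RocketMQ, temporary inconsistency is an acceptable price while unavailability is not; in other words, the NameServer chooses AP in CAP terms.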
__END__