1. 启动流程
nameSrv 启动的时候首先会创建 NamesrvController,创建 NamesrvController 的时候会解析配置参数填充 NamesrvConfig,解析配置参数过程如下:
- 设置系统参数 rocketmq.remoting.version
- 解析系统启动参数,如果启动参数有 h(表示打印帮助选项),则在控制台打印帮助选项并退出
- 创建 nettyServerConfig 实例,设置默认的监听端口为 9876
- 解析 -c 参数,形式为 -c configFile
- 另外一种新式的参数配置为 configKey=configValue
NamesrvController 构造方法中创建如下实例:
- KVConfigManager 管理 KV 配置信息
- KVConfigManager 核心属性如下:
- HashMap<String/ Namespace /, HashMap<String/ Key /, String/ Value />> configTable namespace 维度的配置信息
- namesrvController 实例
- KVConfigManager 主要方法:
- load() 默认从 /namesrv/kvConfig.json 路径加载 namesrv 的配置到 configTable
- putKVConfig(final String namespace, final String key, final String value) 往指定的 namespace 中增加配置(存在则更新,不存在则创建)
- void persist() KV 配置持久化到文件(/namesrv/kvConfig.json),流程如下:
- 将内存中的配置存储到 kvConfig.json.tmp 文件
- 将原来的配置文件内存存储到 kvConfig.json.bak 文件
- 删除 kvConfig.json 文件
- 将 kvConfig.json.tmp 重命名为 kvConfig.json
- void deleteKVConfig(final String namespace, final String key) 删除指定的配置
- byte[] getKVListByNamespace(final String namespace) 获取指定的 namespace 的所有配置
- getKVConfig(final String namespace, final String key) 获取指定 namespace,指定 key 的配置
- void printAllPeriodically() 打印所有的 KV 配置
- KVConfigManager 核心属性如下:
- RouteInfoManager 管理路由信息
- 主要属性如下:
- HashMap<String/ topic /, List
> topicQueueTable topic 消息队列路由信息,消息发送时根据消息路由表进行负债均衡 - HashMap<String/ brokerName /, BrokerData> brokerAddrTable 包含 broker 基础信息,包含 brokerName、所属集群名称、主备 broker 地址
- HashMap<String/ clusterName /, Set<String/ brokerName />> clusterAddrTable broker 集群信息,存储集群中所有的 broker 名称
- HashMap<String/ brokerAddr /, BrokerLiveInfo> brokerLiveTable broker 状态信息,broker 每次收到心跳包都会更新该信息
- HashMap<String/ brokerAddr /, List
/ Filter Server /> filterServerTable broker 上的 filterServer 列表,用于类模式的消息过滤
- HashMap<String/ topic /, List
- 主要属性如下:
- BrokerHousekeepingService NettyRemotingServer 的事件监听器,监听 NettyRemotingServer 的如下事件:
- channelActive 即 CONNECT 事件,调用 BrokerHousekeepingService#onChannelConnect 方法,空实现
- channelInactive 即 CLOSE 事件,调用 BrokerHousekeepingService#onChannelClose 方法,最终调用 RouteInfoManager#onChannelDestroy 方法,移除对应 broker 节点的路由信息,逻辑如下:
- 根据 channel 在 brokerLiveTable 查找对应的 broker 节点地址信息
- 移除 brokerLiveTable 中对应的 broker 节点信息
- 移除 filterServerTable 对应的broker 节点信息
- 在 brokerAddrTable 中查找对应的 broker 节点信息,并移除
- 如果在 brokerAddrTable 移除 broker 节点信息后对应 brokerName 下所有的节点为空,则移除对应的 broker 信息
- 如果 broker 被移除了,则在 clusterAddrTable 移除对应的 broker 信息,如果 broker 信息移除以后对应的集群信息为空了则移除对应的集群信息
- 如果 broker 被移除了,则在 topicQueueTable 移除对应的 broker 信息
- userEventTriggered 处理心跳超时事件,逻辑同 channelInactive
- exceptionCaught 处理异常事件,逻辑同 channelInactive
NamesrvController 创建完成以后调用 NamesrvStartup#start 启动 namesrvController
1 | public static NamesrvController start(final NamesrvController controller) throws Exception { |
start 方法首先调用 controller.initialize() 方法初始化 namesrv 的核心控制器(即:NamesrvController);然后注册钩子方法,在 nameSrv 关闭之前调用 NamesrvController 的 shutdown 方法,释放资源。NamesrvController#initialize 的执行逻辑如下:
加载 kv 配置
1
this.kvConfigManager.load();
创建 NettyRemotingServer 实例
1
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
给 this.remotingExecutor 赋值
- 注册 DefaultRequestProcessor 处理器
1
this.registerProcessor();
1 | private void registerProcessor() { |
启动扫描 brokerLiveTable 的定时任务,梅 10s 扫描一次,移除处于不激活状态的 broker。broker 启动的时候会启动心跳线程,定时向 namesrv 发送心跳包,namesrv 收到心跳包以后会更新 brokerLiveTable 中对应 broker 的信息,如果 namesrv 长时间没有收到 broker 的心跳包,则 broker 被判断为不激活状态,需要被移除(最终执行 RouteInfoManager#onChannelDestroy 方法,逻辑见 RouteInfoManager 的 CLOSE 事件)
1
2
3
4
5
6this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);启动每 10s 打印 KV 配置的定时任务
1
2
3
4
5
6this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);tls 模式的处理
2. 路由注册
RocketMQ 路由注册通过 Broker 与 nameSrv 的心跳功能实现。Broker 启动时向所有的 namesrv 发送心跳包,每隔 30s 向集群中所有的 nameSrv 定时发送心跳包;namesrv 收到 broker 心跳包以后会更新 brokerLiveTable 缓存中 brokerLiveInfo 的 lastTimestamp,然后 namesrv 每隔 10s 扫描 brokerLiveInfo,如果连续 120s 没有收到心跳包,namesrv 将移除该 broker 的路由信息并关闭 socket 连接
- broker 发送心跳包的代码(broker 端的心跳定时任务,broker 每隔 30s 向集群中所有的 namesrv 发送心跳包)
1
2
3
4
5
6
7
8
9
10this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
registerBrokerAll 实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { // broker 不可读或不可写
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
流程如下:
- 获取 broker 自身的 topic 配置信息
- 调用 BrokerController#needRegister 判断是否需要注册,流程如下:
- 此方法中,broker 会向所有的 namesrv 发送 QUERY_DATA_VERSION 请求(请求体中带上自身的 dataVersion),namesrv 收到 QUERY_DATA_VERSION 请求以后,会查询出 namesrv 缓存中对应 broker 的 dateVersion,并与请求中的 dataVersion 进行对比,并将比对的结果在响应中返回
- 只要有任意一个 namesrv 返回的 changed 为 true,则该方法返回 true
- forceRegister 或 needRegister 方法返回 true,则执行 BrokerController#doRegisterBrokerAll 方法,注册路由信息,具体流程如下:
- 取出 broker 中存储的所有的 namesrv 的地址,向所有的 namesrv 发送 REGISTER_BROKER 请求(请求中带上该 broker 的 topic 配置信息)
- namesrv 收到 REGISTER_BROKER 请求后的处理流程如下:
- 根据 broker 所在的集群名称尝试获取集群信息,如果不存在则创建新的集群并添加到 clusterAddrTable 当中
- 将当前 broker 加入到集群中
- 维护 brokerData 信息,根据 brokerName 从 brokerName 获取 brokerData,不存在(即为首次注册)则创建新的 BrokerData 并加入到 brokerAddrTable 当中
- 如果 broker 为 master,并且 broker topic 信息发生变化或是初次注册,则需要创建或更新 topic 的路由元数据信息
- 更新 brokerLiveTable 信息,brokerLiveTable 是执行路由删除的重要依据
- 注册 filterServerTable 地址列表
- slave 节点更新 masterAddr 信息
3. 路由发现
RocketMQ 路由发现是非实时的,当 topic 的路由信息发生改变以后并不主动推送给客户端。而是有客户端定时主动拉取,拉取的编码为 GET_ROUTEINTO_BY_TOPIC,路由结果由 TopicRouteData 表示,主要属性如下:
| name | 类型 | 含义 |
|---|---|---|
| orderTopicConf | String | 顺序消息内容配置 |
| queueDatas | List |
topic 队列元数据 |
| brokerDatas | List |
topic 分布的 broker 元数据 |
| filterServerTable | HashMap<String/ brokerAddr /, List |
broker 上过滤服务的地址列表 |
namesrc 处理路由发现的方法如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
- 调用 RouteInfoManager#pickupTopicRouteData 分别从 topicQueueTable、brokerAddrTable、filterServerTable 中取出 topic 相关的信息填充至 TopicRouteData 的 queueDatas、brokerDatas、filterServerTable 属性当中
- 如果找到 topic 对应的路由信息且该 topic 为顺序消息,则从 namesrv 的 KVConfig 中取出顺序消息相关的配置填充至路由信息当中
- 如果找不到对应 topic 的路由信息,则返回 TOPIC_NOT_EXIST 表示没有路由信息