一、Producer路由信息
从NameServer章节分析得知,路由信息存储在NameServer,生产端和消费端定时向NameServer获取topic相关的路由信息;
从生产者启动流程得知:
路由信息的动态更新源码在MQClientInstance#startScheduledTask定时任务里面
具体方法:
updateTopicRouteInfoFromNameServer下图为路由更新流程
添加图片注释,不超过 140 字(可选)
接下来我们着重解析此段源码:
1 定时任务:频率-30s
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//从nameServer更新路由信息 -定时任务:30s一次
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
2 updateTopicRouteInfoFromNameServer
public void updateTopicRouteInfoFromNameServer() {
Set topicList = new HashSet();
{ // Consumer 消费端,后续再分析
...省略....
}
{ // Producer 生产端
Iterator<>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
Set lst = impl.getPublishTopicList();//1>获取所有 topic-list
topicList.addAll(lst);
}
}
}
//2>更新路由信息
for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
}
}
分析如下:
1.1 getPublishTopicListgetPublishTopicList 方法分析:
public Set getPublishTopicList() {
Set topicList = new HashSet();
for (String key : this.topicPublishInfoTable.keySet()) {
topicList.add(key);
}
return topicList;
}
备注:
细心的你可能发现从启动流程中得知:
topicPublishInfoTable(ConcurrentHashMap)只会默认注册topic=TBW102的信息,那正常业务发送的topic是如何注册进去的呢,建议直接观看理解以下代码,在发送流程中会体现出如何注册到topicPublishInfoTable中;
topicPublishInfoTable数据的初始化(value:第一次默认都是new TopicPublishInfo())
//查找主题的路由信息的方法
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo 关键词:路由动态更新(动态路由更新算法)