一、Broker通过心跳的方式向NameServer注册信息

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

在BrokerControler的start方法中,可以看到上面的代码,BrokerController启动了一个定时任务,每隔30秒向NameServer注册信息。

二、NameServer如何进行故障感知?

BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
    new BrokerLiveInfo(
        System.currentTimeMillis(),
        topicConfigWrapper.getDataVersion(),
        channel,
        haServerAddr));

在RouteInfoManager类的registerBroker方法中,可以看到上面的代码,NameServer把Broker的信息存入到brokerLiveTable中。

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

然后通过定时任务每隔10秒去扫描不活跃的Broker。

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

NameServer定义了Broker的失效时间为2分钟,如果Broker2分钟内没有向NameServer发送注册请求,那么NameServer就认为Broker可能宕机了,就会主动关闭Broker连接并移除brokerLiveTable中对应的broker信息。

打赏
支付宝 微信
上一篇 下一篇