一、什么时候发起注册的动作?

// 注册Broker到所有NameServer上
this.registerBrokerAll(true, false, true);

在BrokerController的start方法,我们可以找到上面的代码,就是Broker发起注册的动作,注册Broker到所有的NameServer上。

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    // 获取Topic相关配置,TopicConfigSerializeWrapper 从SerializeWrapper大致可以猜出是对数据进行序列化包装
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }

    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        // 执行注册操作
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

在registerBrokerAll方法中,首先获取Topic相关配置信息,然后进行序列化包装,最后调用doRegisterBrokerAll执行注册操作。

二、如何执行注册动作?

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {

    // 通过BrokerOuterAPI去执行注册请求
    // 返回List是因为有多个NameServer
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway, /* oneway表示不等待响应 */
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());

    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        if (registerBrokerResult != null) {
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }

            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }
}

在doRegisterBrokerAll方法中,我们可以看到,执行注册操作并不是BrokerController自己操作,而且委派brokerOuterAPI来执行,通过调用brokerOuterAPI.registerBrokerAll方法来执行Broker信息注册。

public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {

    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    // 获取NameServer地址列表
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

        // 把Broker的基本信息放到请求头中
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        // Topic配置信息和过滤器信息放在请求体重
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);

        // 通过CountDownLatch来控制主线程执行
        // 因为向NameServer注册Broker信息是通过子线程来执行的
        // 只有等子线程执行完毕,主线程才继续往下走,最后响应结果
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 向某一个NameServer执行注册操作
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    return registerBrokerResultList;
}

在BrokerOuterAPI类的registerBrokerAll方法中,我们可以看到消息的封装过程,Broker把消息分成两部分,其中Broker自身的信息比如BrokerAddr、BrokerId等放在请求头中,主题相关的信息放在请求体中,然后通过子线程的方式同时向所有NameServer发起注册请求,并同步等待执行结果。这里用到了CountDownLatch,就是用来控制线程的执行顺序,当子线程全部执行完毕,主线程才能继续往下执行。

三、网络请求如何发起的?

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
   // oneway的方式处理,这里忽略,

    // 通过remotingClient执行请求,并同步等待执行结果
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    
    // 处理响应结果

    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}


我们继续跟进BrokerOuterAPI的registerBroker方法可以看到,真正执行网络请求的是remotingClient.invokeSync方法,其中remotingClient就是Netty客户端。

@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    // 获取一个Channel
    // 先从缓存中获取Channel,如果缓存不存在的话则创建Channel
    // Broker连接NameServer采用长连接的方式,也就是Broker不会主动关闭Channel
    // 关闭Channel由NameServer来操作,所以当Channel被关闭了,Broker中的缓存也会清除相关Channel
    // RocketMQ采用这种方式我认为是为了控制NameServer上Broker信息的时效性
    // 因此Broker不会主动关闭Channel,如果NameServer监听到Channel主动断开,
    // 那么可以认为Broker宕机或者网络出现问题,这个时候NameServer可以移除Broker信息
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}


在NettyRemotingClient的invokeSync方法中,我们可以看到,在执行请求之前首先获取一个Channel,可以理解成建立一个连接。NettyRemotingClient并不会直接创建一个Channel,而是先从缓存中获取Channel,如果缓存不存在的话则创建Channel,Broker采用长连接的方式来连接NameServer,也就是Broker不会主动关闭Channel,关闭Channel由NameServer来操作,所以当Channel被关闭了,Broker中的缓存也会清除相关Channel,RocketMQ采用这种方式我认为是为了控制NameServer上Broker信息的时效性,因此Broker不会主动关闭Channel,如果NameServer监听到Channel主动断开, 那么可以认为Broker宕机或者网络出现问题,这个时候NameServer可以移除Broker信息。

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        // channel把请求写出去,并通过回调的方式等待NameServer的响应
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    System.out.println("send a request command to channel <" + addr + "> success.");
                    log.info("send a request command to channel <" + addr + "> success.");
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
最后在invokeSyncImpl方法中通过channel.writeAndFlush把请求写出去,并同步等待响应结果。

四、总结一下

Broker通过TCP 长连接的方式与NameServer进行交互。

打赏
支付宝 微信
上一篇 下一篇