一、什么时候发起注册的动作?
// 注册Broker到所有NameServer上
this.registerBrokerAll(true, false, true);
在BrokerController的start方法,我们可以找到上面的代码,就是Broker发起注册的动作,注册Broker到所有的NameServer上。
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// 获取Topic相关配置,TopicConfigSerializeWrapper 从SerializeWrapper大致可以猜出是对数据进行序列化包装
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// 执行注册操作
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
在registerBrokerAll方法中,首先获取Topic相关配置信息,然后进行序列化包装,最后调用doRegisterBrokerAll执行注册操作。
二、如何执行注册动作?
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
// 通过BrokerOuterAPI去执行注册请求
// 返回List是因为有多个NameServer
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway, /* oneway表示不等待响应 */
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}
在doRegisterBrokerAll方法中,我们可以看到,执行注册操作并不是BrokerController自己操作,而且委派brokerOuterAPI来执行,通过调用brokerOuterAPI.registerBrokerAll方法来执行Broker信息注册。
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
// 获取NameServer地址列表
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// 把Broker的基本信息放到请求头中
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
// Topic配置信息和过滤器信息放在请求体重
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// 通过CountDownLatch来控制主线程执行
// 因为向NameServer注册Broker信息是通过子线程来执行的
// 只有等子线程执行完毕,主线程才继续往下走,最后响应结果
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 向某一个NameServer执行注册操作
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
在BrokerOuterAPI类的registerBrokerAll方法中,我们可以看到消息的封装过程,Broker把消息分成两部分,其中Broker自身的信息比如BrokerAddr、BrokerId等放在请求头中,主题相关的信息放在请求体中,然后通过子线程的方式同时向所有NameServer发起注册请求,并同步等待执行结果。这里用到了CountDownLatch,就是用来控制线程的执行顺序,当子线程全部执行完毕,主线程才能继续往下执行。
三、网络请求如何发起的?
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
// oneway的方式处理,这里忽略,
// 通过remotingClient执行请求,并同步等待执行结果
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
// 处理响应结果
throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
我们继续跟进BrokerOuterAPI的registerBroker方法可以看到,真正执行网络请求的是remotingClient.invokeSync方法,其中remotingClient就是Netty客户端。
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
// 获取一个Channel
// 先从缓存中获取Channel,如果缓存不存在的话则创建Channel
// Broker连接NameServer采用长连接的方式,也就是Broker不会主动关闭Channel
// 关闭Channel由NameServer来操作,所以当Channel被关闭了,Broker中的缓存也会清除相关Channel
// RocketMQ采用这种方式我认为是为了控制NameServer上Broker信息的时效性
// 因此Broker不会主动关闭Channel,如果NameServer监听到Channel主动断开,
// 那么可以认为Broker宕机或者网络出现问题,这个时候NameServer可以移除Broker信息
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
在NettyRemotingClient的invokeSync方法中,我们可以看到,在执行请求之前首先获取一个Channel,可以理解成建立一个连接。NettyRemotingClient并不会直接创建一个Channel,而是先从缓存中获取Channel,如果缓存不存在的话则创建Channel,Broker采用长连接的方式来连接NameServer,也就是Broker不会主动关闭Channel,关闭Channel由NameServer来操作,所以当Channel被关闭了,Broker中的缓存也会清除相关Channel,RocketMQ采用这种方式我认为是为了控制NameServer上Broker信息的时效性,因此Broker不会主动关闭Channel,如果NameServer监听到Channel主动断开, 那么可以认为Broker宕机或者网络出现问题,这个时候NameServer可以移除Broker信息。
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// channel把请求写出去,并通过回调的方式等待NameServer的响应
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
System.out.println("send a request command to channel <" + addr + "> success.");
log.info("send a request command to channel <" + addr + "> success.");
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
最后在invokeSyncImpl方法中通过channel.writeAndFlush把请求写出去,并同步等待响应结果。
四、总结一下
Broker通过TCP 长连接的方式与NameServer进行交互。