Broker启动的方式和过程跟NameServer(https://greaterway.cn/archives/rocketmqnameserver-qi-dong-guo-cheng)比较类似
一、入口
public static void main(String[] args) {
start(createBrokerController(args));
}
BrokerStartup类的main方法为Broker的运行入口,在main方法中,通过createBrokerController创建BrokerControler对象实例,再通过start方法来启动BrokerController,类的命名风格和启动过程跟NameServer一致。
二、命令行参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
Broker同样以POSIX风格去读取命令行数。
private static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "printConfigItem", false, "Print all config item");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "printImportantConfig", false, "Print important config item");
opt.setRequired(false);
options.addOption(opt);
return options;
}
在buildCommandlineOptions方法中,可以看到Broker有3个命令行参数,分别如下:
-c configFile :指定Broker的配置文件
-p:打印全部配置项
-m:打印重要的配置项
三、加载配置文件
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
// 支持TLS加密认证
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
// broker默认端口为10911
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
在启动Broker的时候,可以通过命令行参数-c configFile来指定Broker配置文件,该配置文件包含4种配置信息,分别对应BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig,通过类名可以大概猜出它们的作用:
BrokerConfig:Broker本身相关配置
NettyServerConfig:netty服务端配置(对于MQ生产者和消费者来说,Broker就是服务端)
NettyClientConfig:netty客户端配置(对于NameServer来说,Broker就是客户端)
MessageStoreConfig:消息存储相关配置
storePathRootDir=C:/rocketmq/store
storePathCommitLog=C:/rocketmq/store/commitlog
storePathConsumeQueue=C:/rocketmq/store/consumequeue
storePathIndex=C:/rocketmq/store/index
storeCheckpoint=C:/rocketmq/store/checkpoint
这里提一下,我们在Broker配置文件broker.conf中,通常都要指定上面的配置项,用于指定数据存储的位置,如果不配置的话,其中storePathRootDir默认就是当前系统用户目录下的store目录,在MessageStoreConfig可以看到相关的默认配置信息。
//The root directory in which the log data is kept
@ImportantField
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
//The directory in which the commitlog is kept
@ImportantField
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
Broker必须指定rocketmqHome配置项,默认该配置是读取环境变量ROCKETMQ_HOME,如果配置文件broker.conf中有指定rocketmqHome配置,则以配置文件为准,从源码可以看出,如果既不配置环境变量ROCKETMQ_HOME,也不在配置文件broker.conf指定rocketmqHome配置,那么Broker将不能正常启动。
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}
此外,从上面源码可以看到有多个NameServer地址时,通过“;”符号进行分割。
四、创建BrokerController
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
加载完配置文件之后,通过new BrokerController的方式创建BrokerController对象实例,并执行其initialize方法进行初始化操作。其中BrokerController构造方法中创建各种BlockingQueue,initialize方法中初始化各种线程池。
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
在initialize方法中,我们注意到broker除了创建remotingServer之外还创建fastRemotingServer,端口号是默认的10911-2,也就是10909,为什么broker要创建这个呢?该端口为vip端口号,如果MQ生产者通过vip发送,则走的端口就是10909,而且该端口只处理消息生产请求,不处理消息消费请求。
五、启动
最后通过BrokerController的start方法来启动Netty以及其他各种线程。