Broker启动的方式和过程跟NameServer(https://greaterway.cn/archives/rocketmqnameserver-qi-dong-guo-cheng)比较类似

一、入口

public static void main(String[] args) {
    start(createBrokerController(args));
}

BrokerStartup类的main方法为Broker的运行入口,在main方法中,通过createBrokerController创建BrokerControler对象实例,再通过start方法来启动BrokerController,类的命名风格和启动过程跟NameServer一致。

二、命令行参数

Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
    new PosixParser());
if (null == commandLine) {
    System.exit(-1);
}

Broker同样以POSIX风格去读取命令行数。

private static Options buildCommandlineOptions(final Options options) {
    Option opt = new Option("c", "configFile", true, "Broker config properties file");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("p", "printConfigItem", false, "Print all config item");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("m", "printImportantConfig", false, "Print important config item");
    opt.setRequired(false);
    options.addOption(opt);

    return options;
}

在buildCommandlineOptions方法中,可以看到Broker有3个命令行参数,分别如下:
-c configFile :指定Broker的配置文件
-p:打印全部配置项
-m:打印重要的配置项

三、加载配置文件

final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();

// 支持TLS加密认证
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
    String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
// broker默认端口为10911
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
    int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
    messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}

if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        configFile = file;
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);

        properties2SystemEnv(properties);
        MixAll.properties2Object(properties, brokerConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        MixAll.properties2Object(properties, nettyClientConfig);
        MixAll.properties2Object(properties, messageStoreConfig);

        BrokerPathConfigHelper.setBrokerConfigPath(file);
        in.close();
    }
}

在启动Broker的时候,可以通过命令行参数-c configFile来指定Broker配置文件,该配置文件包含4种配置信息,分别对应BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig,通过类名可以大概猜出它们的作用:
BrokerConfig:Broker本身相关配置
NettyServerConfig:netty服务端配置(对于MQ生产者和消费者来说,Broker就是服务端)
NettyClientConfig:netty客户端配置(对于NameServer来说,Broker就是客户端)
MessageStoreConfig:消息存储相关配置

storePathRootDir=C:/rocketmq/store
storePathCommitLog=C:/rocketmq/store/commitlog
storePathConsumeQueue=C:/rocketmq/store/consumequeue
storePathIndex=C:/rocketmq/store/index
storeCheckpoint=C:/rocketmq/store/checkpoint

这里提一下,我们在Broker配置文件broker.conf中,通常都要指定上面的配置项,用于指定数据存储的位置,如果不配置的话,其中storePathRootDir默认就是当前系统用户目录下的store目录,在MessageStoreConfig可以看到相关的默认配置信息。

//The root directory in which the log data is kept
@ImportantField
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

//The directory in which the commitlog is kept
@ImportantField
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
    + File.separator + "commitlog";


Broker必须指定rocketmqHome配置项,默认该配置是读取环境变量ROCKETMQ_HOME,如果配置文件broker.conf中有指定rocketmqHome配置,则以配置文件为准,从源码可以看出,如果既不配置环境变量ROCKETMQ_HOME,也不在配置文件broker.conf指定rocketmqHome配置,那么Broker将不能正常启动。

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

if (null == brokerConfig.getRocketmqHome()) {
    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
    System.exit(-2);
}

String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
    try {
        String[] addrArray = namesrvAddr.split(";");
        for (String addr : addrArray) {
            RemotingUtil.string2SocketAddress(addr);
        }
    } catch (Exception e) {
        System.out.printf(
            "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
            namesrvAddr);
        System.exit(-3);
    }
}

此外,从上面源码可以看到有多个NameServer地址时,通过“;”符号进行分割。

四、创建BrokerController

final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

boolean initResult = controller.initialize();
if (!initResult) {
    controller.shutdown();
    System.exit(-3);
}

加载完配置文件之后,通过new BrokerController的方式创建BrokerController对象实例,并执行其initialize方法进行初始化操作。其中BrokerController构造方法中创建各种BlockingQueue,initialize方法中初始化各种线程池。

this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);

在initialize方法中,我们注意到broker除了创建remotingServer之外还创建fastRemotingServer,端口号是默认的10911-2,也就是10909,为什么broker要创建这个呢?该端口为vip端口号,如果MQ生产者通过vip发送,则走的端口就是10909,而且该端口只处理消息生产请求,不处理消息消费请求。

五、启动

最后通过BrokerController的start方法来启动Netty以及其他各种线程。

打赏
支付宝 微信
上一篇 下一篇