RocketMQ通过两阶段提交的方式实现事务消息,MQ生产者首先向RocketMQ发送half消息(prepare),RocketMQ响应之后生产者执行本地事务,然后根据本地事务执行情况再向RocketMQ发送Commit或者Rollback消息。
mq20

如果这个过程中,RocketMQ一直没有收到生产者发送的Commit或者Rollback消息,RocketMQ会回调生产者进行状态检查,那么生产者根据本地事务的执行情况,再向RocketMQ发送Commit或者Rollback消息。

只有RocketMQ接收到Commit消息之后,生产者发送的half消息才对消费者可见,才能被消费者消费,如果RocketMQ接收到Rollback消息,则直接删除half消息。

在生产者Commit消息之前,half消息存放在RMQ_SYS_TRANS_HALF_TOPIC主题下,等到Commit之后才会真正存放到指定的Topic下。

@Test
public void transactionMessageProduce() throws Exception {
    TransactionListener transactionListener = new TransactionListener() {
        /**
         * 当发送事务消息half成功后,调用该方法执行本地事务
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println("............此处模拟执行本地事务............");
            System.out.println("message: " + msg + ", arg: " + arg);

            try {
                // 模拟本地事务执行耗时
                Thread.sleep(20000);
            } catch (InterruptedException e) {}
            // 本地事务执行成功则响应Commit,否则响应Rollback
            // return LocalTransactionState.ROLLBACK_MESSAGE;
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        /**
         *  如果RocketMQ一直没有收到half消息的响应,RocketMQ会回调此方法回查本地事务执行结果
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            System.out.println("............此处检查本地事务执行结果............");
            // 本地事务执行成功则响应Commit,否则响应Rollback
            // return LocalTransactionState.ROLLBACK_MESSAGE;
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    };

    TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");
    producer.setNamesrvAddr("192.168.253.100:9876");
    producer.setSendLatencyFaultEnable(true);
    producer.setTransactionListener(transactionListener);
    producer.start();

    Message message = new Message("test-topic", "测试事务消息...".getBytes(RemotingHelper.DEFAULT_CHARSET));
    producer.sendMessageInTransaction(message, null);

    Thread.sleep(1000000000);
}

@Test
void consume() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
    consumer.setNamesrvAddr("192.168.253.100:9876");
    consumer.subscribe("test-topic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            list.forEach(me -> System.out.println(new String(me.getBody())) );
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();

    Thread.sleep(1000000000);
}
打赏
支付宝 微信
上一篇 下一篇