DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
consumer.setNamesrvAddr("192.168.253.100:9876");
consumer.subscribe("test-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            // 此处执行业务逻辑处理...
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});
consumer.start();

上面是一段MQ消费者消费test-topic主题的一段简单演示代码,该消费组的名字为test-consumer-group,如果消息消费成功,返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,如果消息消费失败,比如执行业务逻辑处理的时候报错了,那么则返回ConsumeConcurrentlyStatus.RECONSUME_LATER,那么对应的消费将进入test-consumer-group组的重试队列,队列名为:%RETRY%test-consumer-group,RocketMQ默认有16次重试机会,RocketMQ重推消息的时间间隔为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h(可以通过messageDelayLevel来执行,比如messageDelayLevel=1表示1s,messageDelayLevel=2表示5s以此类推,此处的s=second,m=minute,h=hour),那么如果RocketMQ重试了16次之后消费者还是响应ConsumeConcurrentlyStatus.RECONSUME_LATER的话,那么消息将进入死信队列,队列名为:%DLQ%test-consumer-group,表示该消息已经死掉,RocketMQ不再推送,如果消费者还想消费死信队列中的消息的话,可以单独监听%DLQ%test-consumer-group,然后不断进行重试。

打赏
支付宝 微信
上一篇 下一篇