irpas技术客

RabbitMQ (三)消息重试_fmi110_rabbitmq 重试

未知 5684

1 RabbitMQ自带的重试机制 1 示例代码

rabbitMQ为自带了消息重试机制:当消费者消费消息失败时,可以选择将消息重新“推送”给消费者,直至消息消费成功为止。

开启自带的重试机制,需要如下几个配置:

1 开启消费者手动应答机制,对应的springboot配置项:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

2 消费异常时,设置消息重新入列

boolean multiple = false; // 单条确认 boolean requeue ?= true; // 重新进入队列,谨慎设置!!!很容易导致死循环,cpu 100% channel.basicNack(tag, multiple, requeue);

以下是运行例子:

消费者代码如下:

package com.fmi110.rabbitmq; ? import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; ? import java.util.concurrent.atomic.AtomicInteger; ? /** * @author fmi110 * @description 消息消费者 * @date 2021/7/1 16:08 */ @Component @Slf4j public class RabbitConsumer { ? ? ?AtomicInteger count = new AtomicInteger(); ? ? ?@RabbitListener(queues="my-queue") ? ?public void consumer1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{ ? ? ? ? ?log.info(">>>> consumer1 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data); ? ? ? ? ? ?try { ? ? ? ? ? ?Thread.currentThread().sleep(1000); ? ? ? ? ? ?int i = 1/0; ? ? ? ? ? ?channel.basicAck(tag,true); // 确认消息消费成功 ? ? ? } catch (Exception e) { ? ? ? ? ? ?log.error(">>>>消费异常,消息重新进入队列并消费"); ? ? ? ? ? ?boolean multiple = false; // 单条确认 ? ? ? ? ? ?boolean requeue ?= true; // 重新进入队列,谨慎设置!!! ? ? ? ? ? ?channel.basicNack(tag, multiple, requeue); ? ? ? } ? } ? ? ?@RabbitListener(queues="my-queue") ? ?public void consumer2(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{ ? ? ? ?log.info(">>>> consumer2 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data); ? ? ? ? ?try { ? ? ? ? ? ?Thread.currentThread().sleep(1000); ? ? ? ? ? ?int i = 1/0; ? ? ? ? ? ?channel.basicAck(tag,true); // 确认消息消费成功 ? ? ? } catch (Exception e) { ? ? ? ? ? ?log.error(">>>>消费异常,消息重新进入队列并消费"); ? ? ? ? ? ?boolean multiple = false; // 单条确认 ? ? ? ? ? ?boolean requeue ?= true; ? ? ? ? ? ?channel.basicNack(tag, multiple, requeue); ? ? ? } ? } } ?

这里模拟了两个消费者 consumer1、consumer2 ,并在逻辑中人为设置异常 int 1/0 , 在异常捕获中通过

channel.basicNack(tag, false, true);

设置消息重新进入队列,最终推给消费者再次消费。运行结果如下:

日志里包含了几个信息点:

消费者每次只消费一条消息,因为我设置了 spring.rabbitmq.listener.simple.prefetch=1

消息推送使用的 round-robin 算法

rabbitMQ的消费方式有推和拉两种方式,springboot创建的消费者模式使用的推的方式消费 this.channel.basicConsume()

2 潜在问题

如运行日志所示,重进进入队列的消息,会在队列头部,直接再次推送给消费者消费,如果是因为代码逻辑问题,将会导致消息一直消费失败,导致死循环!!!

比较合理的做法是,重试一定次数消费后,如果仍然失败,则终止重试,将消费异常的消息保存,并上报异常,由人工介入处理。

2 结合spring-retry和死信队列实现消息重试

一个比较合理的重试机制如下:

消息消费出现异常时,借助springboot提供的重试机制进行重试

因为使用的spring-retry,所以方法中必须抛出异常,否则spring-retry不会被触发!!!

重试仍然失败时,消息转发到死信队列,死信队列的消费者记录并上报异常信息

要实现消息消费失败自动转发到死信队列,则rabbitmq在创建消息队列时,需要指定与之绑定的死信队列

完整的实例代码如下:

1 配置文件 application.properties:

这里注释掉了 spring.rabbitmq.listener.simple.acknowledge-mode=manual ,这样在消息消费失败时,会自动转到死信队列,如果开启手动确认机制,必须调用 chanel.basicNack(tag,false,false) 消息才会进入死信队列!!!

# 应用名称 spring.application.name=rabbitmq server.port=8080 server.servlet.context-path=/ ? spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 # 指定连接的虚拟主机,可以在rabbitMQ控制台查看对应的虚拟主机的名字 spring.rabbitmq.virtual-host=my_vhost spring.rabbitmq.username=admin spring.rabbitmq.password=admin ? spring.rabbitmq.listener.simple.prefetch=1 ? # 开启 publish-comfirm 机制和消息路由匹配失败退回机制 spring.rabbitmq.publisher-returns=true spring.rabbitmq.publisher-confirm-type=correlated # 开启消费者应答 ack 机制 # spring.rabbitmq.listener.simple.acknowledge-mode=manual # 开启spring提供的retry spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.retry.max-attempts=3 spring.rabbitmq.listener.simple.retry.initial-interval=3000 2 RabbitConfig

主要在程序启动时,做如下设置:

创建死信队列和死信交换器,并将死信队列绑定到死信交换器。

创建普通队列和普通交换器,并将普通队列绑定到普通交换器,同时将死信队列与普通队列关联,这样当消息消费失败时,消息会进入死信队列(使用了自动 ack模式)。

package com.fmi110.rabbitmq.config; ? import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; ? import java.util.HashMap; ? ? /** * @author fmi110 * @description rabbitMQ 配置类 * @date 2021/7/1 15:08 */ @Configuration @Slf4j public class RabbitConfig { ? ? ?String dlQueueName ?= "my-queue-dl"; // 普通队列名称 ? ?String dlExchangeName = "my-exchange-dl"; // 死信交换器名称 ? ?String dlRoutingKey ? = "rabbit.test"; ? ? ?String queueName = "retry-queue"; ? ?String exchangeName = "my-exchange"; // 普通交换器名称 ? ? ?/** ? ? * 创建死信队列 ? ? * ? ? * @return ? ? */ ? ?@Bean ? ?public Queue queueDL() { ? ? ? ? ?return QueueBuilder ? ? ? ? ? ? ? .durable(dlQueueName) // 持久化队列 ? ? ? ? ? ? ? .build(); ? } ? ? ?/** ? ? * 创建死信交换机 ? ? * ? ? * @return ? ? */ ? ?@Bean ? ?public TopicExchange exchangeDL() { ? ? ? ?return new TopicExchange(dlExchangeName, true, false); ? } ? ? ?/** ? ? * 绑定操作 ? ? */ ? ?@Bean ? ?public Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) { ? ? ? ?log.info(">>>> 队列与交换器绑定"); ? ? ? ?return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey); ? } ? ? ?/** ? ? * 创建持久化队列,同时绑定死信交换器 ? ? * ? ? * @return ? ? */ ? ?@Bean ? ?public Queue queue() { ? ? ? ?log.info(">>>> 创建队列 retry-queue"); ? ? ? ?HashMap<String, Object> params = new HashMap<>(); ? ? ? ?params.put("x-dead-letter-exchange", dlExchangeName); ? ? ? ?params.put("x-dead-letter-routing-key", dlRoutingKey); ? ? ? ? ?return QueueBuilder ? ? ? ? ? ? ? .durable(queueName) // 持久化队列 ? ? ? ? ? ? ? .withArguments(params) // 关联死信交换器 ? ? ? ? ? ? ? .build(); ? } ? ? ? ?/** ? ? * 创建交换机 ? ? * ? ? * @return ? ? */ ? ?@Bean ? ?public TopicExchange exchange() { ? ? ? ?log.info(">>>> 创建交换器 my-exchange"); ? ? ? ?boolean durable ? ?= true; // 持久化 ? ? ? ?boolean autoDelete = false; // 消费者全部解绑时不自动删除 ? ? ? ?return new TopicExchange(exchangeName, durable, autoDelete); ? } ? ? ?/** ? ? * 绑定队列到交换机 ? ? * ? ? * @param queue ? ? * @param exchange ? ? * @return ? ? */ ? ?@Bean ? ?public Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) { ? ? ? ?log.info(">>>> 队列与交换器绑定"); ? ? ? ?return BindingBuilder.bind(queue).to(exchange).with("rabbit.test"); ? } ? // ? /** // ? ? * spring-retry重试机制:当重试次数达到最大,消息仍然消费失败时回调。 // ? ? * 如果开启这个类,则死信队列失效,消息消费失败,即使配置了死信队列,消息也不会进入死信队列。 // ? ? * 重试失败回调和死信队列只能二选一!!!spring 提供回调实现类有如下几个: // ? ? * RejectAndDontRequeueRecoverer :消费失败,并且消息不再入列,spring默认使用。 // ? ? * ImmediateRequeueMessageRecoverer :将消息重新入列 // ? ? * RepublishMessageRecoverer:转发消息到指定的队列, // ? ? * @return // ? ? */ // ? @Bean // ? public MessageRecoverer messageRecoverer(){ // ? ? ? return new MessageRecoverer() { // ? ? ? ? ? @Override // ? ? ? ? ? public void recover(Message message, Throwable cause) { // ? ? ? ? ? ? ? log.info(message.toString()); // ? ? ? ? ? ? ? log.info("spring-retry重试次数达到最大,消息仍然失败的回调"); // ? ? ? ? ? ? ? // TODO: 记录错误信息并上报 // ? ? ? ? ? } // ? ? ? }; // ? } } ? 3 消息生产者 RabbitProducer

这里为了保证消息能确保消息发送,配置了 confirm 确认机制

package com.fmi110.rabbitmq; ? import com.rabbitmq.client.AMQP; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; ? import javax.annotation.PostConstruct; ? /** * @author fmi110 * @description 消息生产者 * @date 2021/7/1 15:08 */ @Component @Slf4j public class RabbitProducer { ? ?@Autowired ? ?RabbitTemplate rabbitTemplate; ? ? ?/** ? ? * 1 设置 confirm 回调,消息发送到 exchange 时回调 ? ? * 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调 ? ? * ? ? * correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用 ? ? */ ? ?@PostConstruct ? ?public void enableConfirmCallback(){ ? ? ? ?// #1 ? ? ? ?/** ? ? ? ? * 连接不上 exchange或exchange不存在时回调 ? ? ? ? */ ? ? ? ?rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{ ? ? ? ? ? ?if (!ack) { ? ? ? ? ? ? ? ?log.error("消息发送失败"); ? ? ? ? ? ? ? ?// TODO 记录日志,发送通知等逻辑 ? ? ? ? ? } ? ? ? }); ? ? ? ? ?// #2 ? ? ? ?/** ? ? ? ? * 消息投递到队列失败时,才会回调该方法 ? ? ? ? * message:发送的消息 ? ? ? ? * exchange:消息发往的交换器的名称 ? ? ? ? * routingKey:消息携带的路由关键字信息 ? ? ? ? */ ? ? ? ?rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) ->{ ? ? ? ? ? ?log.error("消息路由失败"); ? ? ? ? ? ?// TODO 路由失败后续处理逻辑 ? ? ? }); ? } ? ? ?public void send(String msg){ ? ? ? ?String exchangeName = "my-exchange"; ? ? ? ?// String routingKey ? = "aaa.xxx"; ? ? ? ?String routingKey ? = "rabbit.test"; ? ? ? ?rabbitTemplate.convertAndSend(exchangeName, routingKey, msg); ? } } ? 4 消息消费者 RabbitConsumer package com.fmi110.rabbitmq; ? import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; ? import java.util.concurrent.atomic.AtomicInteger; ? ? /** * @author fmi110 * @description 消息消费者 * @date 2021/7/1 16:08 */ @Component @Slf4j public class RabbitConsumer { ? ? ?AtomicInteger count = new AtomicInteger(); ? ? ?/** ? ? * 普通队列消费者 ? ? * @param data ? ? * @param channel ? ? * @param tag ? ? * @throws Exception ? ? */ ? ?@RabbitListener(queues="retry-queue") ? ?public void consumer(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{ ? ? ? ? ?log.info(">>>> consumer 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data); ? ? ? ?// TODO 消息处理逻辑 ? ? ? ?throw new RuntimeException("抛出异常,模拟消费失败,触发spring-retry"); ? } ? ? ?/** ? ? * 死信队列消费者 ? ? * @param data ? ? * @param channel ? ? * @param tag ? ? * @throws Exception ? ? */ ? ?@RabbitListener(queues="my-queue-dl") ? ?public void consumeDL(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{ ? ? ? ?log.info(">>>> 死信队列消费 tag = {},消息内容 : {}",tag,data); // ? ? ? channel.basicNack(tag, false, false); ? } } ? 5 Controller

用于触发发送消息

package com.fmi110.rabbitmq.controller; ? ? import com.fmi110.rabbitmq.RabbitProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; ? import java.util.HashMap; ? @RestController public class TestController { ? ?@Autowired ? ?RabbitProducer rabbitProducer; ? ? ?@GetMapping("/test") ? ?public Object test() { ? ? ? ? ?rabbitProducer.send("this is a message"); ? ? ? ? ?HashMap<String, Object> result = new HashMap<>(); ? ? ? ?result.put("code", 0); ? ? ? ?result.put("msg", "success"); ? ? ? ?return result; ? } } 6 运行结果

运行日志如下:

: >>>> consumer 消费 tag = 1,次数count=1,消息内容 : this is a message : >>>> consumer 消费 tag = 1,次数count=2,消息内容 : this is a message : >>>> consumer 消费 tag = 1,次数count=3,消息内容 : this is a message o.s.a.r.r.RejectAndDontRequeueRecoverer ?: Retries exhausted for message (Body:'this is a message' MessageProperties [headers={spring_listener_return_correlation=2840e95b-8544-4ed8-b3ed-8ba02aee2729}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=my-exchange, receivedRoutingKey=rabbit.test, deliveryTag=1, consumerTag=amq.ctag-a5AZEb9AYpOzL6mQJQIvaQ, consumerQueue=retry-queue]) ? ... Caused by: java.lang.RuntimeException: 抛出异常,模拟消费失败,触发spring-retry at com.fmi110.rabbitmq.RabbitConsumer.consumer(RabbitConsumer.java:36) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181] .... ? : >>>> 死信队列消费 tag = 1,消息内容 : this is a message

从日志可看出,普通队列的消费者一共消费了三次仍然失败,最后回调spring提供 RejectAndDontRequeueRecoverer ,然后消息进入死信队列被消费。

7 pom依赖 <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>compile</scope> </dependency> </dependencies>


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #rabbitmq #重试 #1 #消费异常时设置消息重新入列