irpas技术客

Springboot整合RabbitMQ手动ACK_LoneWalker、_rabbitmq springboot 手动ack

大大的周 5185

消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务但是只完成了部分突然它挂掉了,会发生什么情况?RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。为了保证消息在发送过程中不丢失,引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答 Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。Channel.basicNack (用于否定确认)Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。 生产者代码 properties server.port=8081 #rabbitmq服务器ip spring.rabbitmq.host=localhost #rabbitmq的端口 spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=guest #密码 spring.rabbitmq.password=guest #配置虚拟机 spring.rabbitmq.virtual-host=demo #开启发送确认机制,消息到达交换机后会有回调 spring.rabbitmq.publisher-confirm-type=correlated #可以确保消息在未被队列接收时返回 spring.rabbitmq.publisher-returns=true ##发送重试配置 #启用发送重试 #spring.rabbitmq.template.retry.enabled=true #最大重试次数 #spring.rabbitmq.template.retry.max-attempts=5 #第一次和第二次尝试发布或传递消息之间的间隔 #spring.rabbitmq.template.retry.initial-interval=1000ms #应用于上一重试间隔的乘数 步长 #spring.rabbitmq.template.retry.multiplier=2 #最大重试时间间隔 #spring.rabbitmq.template.retry.max-interval=10000ms pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.6.3</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.78</version> </dependency> RabbitConfig @Configuration @Slf4j public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void initRabbitTemplate(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } /** * durable:是否持久化 * exclusive:是否独享、排外的 * autoDelete:是否自动删除 * @return */ @Bean Queue addUserQueue(){ return new Queue(RabbitConstant.QUEUE_ADD_USER,true,false,false); } /** * 消息成功到达交换机触发该方法 * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ //消息成功到达交换机 log.info("{}消息成功到达交换机",correlationData.getId()); }else{ log.error("{}消息未到达交换机,原因:{}",correlationData.getId(),cause); } } /** * 配置publisher-returns为true 消息未成功到达队列,会触发该方法 * @param returned */ @Override public void returnedMessage(ReturnedMessage returned) { log.error("{}消息未到达队列",returned.toString()); } }

这里我们直接用直连交换机,【DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息?routing key?相同的 Queue 上】

再写个常量类专门放队列名,交换机名啥的,写到配置文件也可以

RabbitConstant public class RabbitConstant { /** * 简单消息队列 */ public static final String QUEUE_HELLO_MSG = "hello_world_mq"; /** * 队列 */ public static final String QUEUE_ADD_USER = "queue.add.user"; } ProducerServiceImpl @Service public class ProducerServiceImpl implements ProducerService { @Autowired RabbitTemplate rabbitTemplate; @Override public Boolean addUser(User user) { //这里进行一些操作,然后把用户信息发送到消息队列 String userStr = JSON.toJSONString(user); rabbitTemplate.convertAndSend(RabbitConstant.QUEUE_ADD_USER, (Object) userStr,new CorrelationData(UUID.randomUUID().toString())); return true; } } 实体类User @Data @AllArgsConstructor @NoArgsConstructor public class User implements Serializable { private static final long serialVersionUID = 1809655848237434192L; private Integer id; private String userName; private String describe; } ProducerController @RestController public class ProducerController { @Autowired ProducerService producerService; @PostMapping("/addUser") public Boolean addUser(@RequestBody User user){ return producerService.addUser(user); } } 消费者代码 properties server.port=8082 #rabbitmq服务器ip spring.rabbitmq.host=localhost #rabbitmq的端口 spring.rabbitmq.port=5672 #用户名 spring.rabbitmq.username=guest #密码 spring.rabbitmq.password=guest #配置虚拟机 spring.rabbitmq.virtual-host=demo #设置消费端手动 ack none不确认 auto自动确认 manual手动确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual pom

pom就不贴了都一样的

ConsumerService @Service @Slf4j public class ConsumerService { public static final String QUEUE_ADD_USER = "queue.add.user"; @RabbitListener(queues =QUEUE_ADD_USER) @RabbitHandler public void addUser(String userStr,Message message, Channel channel){ long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //在这里做一些操作 User user = JSONObject.parseObject(userStr,User.class); log.info(user.toString()); //手动ack 第一个参数是消息的标记,第二个参数代表是false 代表仅仅确认当前消息,为true表示确认之前的所有消息 channel.basicAck(deliveryTag,false); } catch (Exception e) { //告诉mq本条消息消费失败 try { channel.basicNack(deliveryTag,false,true); } catch (IOException ex) { ex.printStackTrace(); } } } } 测试

?发送后我们看消费者这边已经拿到了

?来不及截图我又发送了一次,看一下RabbitMQ的控制台

?再修改一下消费端代码,直接除零异常,看是否会出现Nack

?好了我们再发送一次

?而且该消息一直在再投递

下一篇具体讲讲如何处理这种情况

基本概念可以参考此篇博文rabbitmq入门


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #rabbitmq #springboot #手动ack #rabbitmq手动ack