irpas技术客

RabbitMQ之死信队列_Sofiax_mq死信队列

大大的周 4324

一、死信队列概念

顾名思义,(死去的消息)即无法被消费的消息,指的是消费者在消费生产者生产的消息时发生了某些特殊情况(下文会说),导致消息无法被正常消费,存放这些未被消费的消息的队列即为死信队列。

二、死信队列应用场景 为了保证消息不被丢失,专门存放消息做延时,将过了过期时间的消息存放在该队列中重新消费 三、造成消息成为死信的三种原因(下文详述) 消息过了过期时间TTL(time to live)消息队列达到了最大长度消息被消费者拒绝(basic.reject或者basic.nack)且requeue=false 四、死信队列的用法

基本需要: 两个个交换机:生产者通过交换机发送给普通队列、普通队列通过交换机将死信转发给死信队列 两个队列:普通队列、死信队列(注:队列模式为direct) 两个router key: 生产者绑定交换机之间、死信交换机到死信队列之间

五、普通队列和死信队列之间的绑定(Java) //声明普通、死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.Direct); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.Direct); //声明普通队列,通过参数设置死信交换机,死信RoutingKey Map<String, Object> paramMap = new HashMap<>(); paramMap.put("x-dead-letter-exchange", DEAD_EXCHANGE); paramMap.put("x-dead-letter-routing-key", "dead_routingkey"); channel.qunueDeclare(NORMAL_QUEUE, false,false,false,paramMap); //声明死信队列 channel.qunueDeclare(DEAD_QUEUE, false,false,false,null); //绑定普通交换机与普通队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE,"normal_routingkey"); //绑定死信交换机与死信队列 channel.qunueBind(DEAD_QUEUE, DEAD_EXCHANGE,"dead_routingkey");

注:为普通队列设置死信队列关键在于参数的设置(paramMap)

六、消息成为死信的三种原因示例

(1)消息过了过期时间TTL(time to live) 只需要设置消息的过期时间,普通队列在过期时间之内没有收到消息,则消息会成为死信并被转发到死信队列中。关于过期时间我所了解到的有两种设置方式:

在消费者(普通队列)上设置,即在上文代码所提paramMap 中设置普通队列参数: paramMap.put("x-message-ttl", 100000);//10秒后过期 在生产者设置生产时设置,若消费者未在过期时间内消费消息,则消息被转发到死信队列中 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000"); //过期时间为1秒 //发布消息--------

(2)消息队列达到了最大长度 只需要在消费者(普通队列)设置死信队列时设置参数(即上文代码所提paramMap)

paramMap.put("x-max-length", maxLength);

当普通队列中消息堆积数量长度达到了maxLength,则会将新接收的消息转发到死信队列中去,从而避免消息丢失 (3)消息被消费者拒绝 生产者将消息发送给消费者后,消费者对消息进行了拒绝,具体操作是在普通队列处操作的

channel.basicRegect(message.getnvelop().getDeliveryTag, requeue:false);

若参数requeue为true,则表示还可以将此跳消息重新塞回普通队列,若为false则消息被拒绝后直接进入死信队列。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #mq死信队列 #To