irpas技术客

Springboot整合RabbitMQ_Buckletime_springboot整合rabbitmq

未知 3081

Springboot整合RabbitMQ docker 安装RabbitMQ配置文件RabbitMQ 基本使用1. Fanout模式(发布订阅)使用2. Direct模式(路由RoutingKey)使用3. Topic模式(模糊路由RoutingKey)使用4. Headers模式(参数匹配)使用5. Work模式使用 RabbitMQ 高级使用1. ttl消息过期时间2. 死信队列

docker 安装RabbitMQ docker run -it -d -p 5672:5672 -p15672:15672 --hostname my-rabbit --name my-rabbit \ -v /home/docker_volume/rabbit/:/var/lib/rabbitmq \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management

启动成功后,浏览器访问 http://192.168.0.105:15672/,进入rabbitmq管理界面。

配置文件

1. pom文件,添加amqp依赖

<!-- rabbit-mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2. application.yml文件,配置RabbitMQ服务

server: port: 8080 # RabbitMQ配置 spring: rabbitmq: host: 192.168.0.105 port: 5672 username: admin password: admin virtual-host: / RabbitMQ 基本使用 1. Fanout模式(发布订阅)使用

1.1 Fanout模式配置文件,配置交换机、消息队列,并进行绑定

@Configuration public class RabbitMQConfigFanout { // 1.声明注册Fanout模式的交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanout-exchange"); } // 2.声明队列 参数durable: true表示消息队列持久化 @Bean public Queue smsQueue(){ return new Queue("fanout-queue-sms", true); } @Bean public Queue emailQueue(){ return new Queue("fanout-queue-email", true); } @Bean public Queue duanxinQueue(){ return new Queue("fanout-queue-duanxin", true); } // 3.消息队列和交换机进行绑定 @Bean public Binding smsBinding(){ return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); } @Bean public Binding emailBinding(){ return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); } @Bean public Binding duanxinBinding(){ return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange()); } }

1.2 生产者,发送消息到交换机,交换机会分发到所绑定消息队列

@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; public void makeOrder() { // 1.根据商品id查询库存是否充足 // 2.产生订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单id: " + orderId); // 3.RabbitMQ完成消息分发 /** * exchange: 交换机名称 * routingKey: 路由key/queue队列名称 * object: 消息内容 */ String exchange = "fanout-exchange"; String routingKey = ""; rabbitTemplate.convertAndSend(exchange, routingKey, orderId); } }

1.3 消费者,三个消费者分别消费三个队列的消息

@Service public class MQFanoutConsumer { @RabbitListener(queues = "fanout-queue-sms") public void receiveSmsMsg(String message){ System.out.println("sms fanout 接收到了订单信息 >> : " + message); } @RabbitListener(queues = "fanout-queue-email") public void receiveEmailMsg(String message){ System.out.println("email fanout 接收到了订单信息 >> : " + message); } @RabbitListener(queues = "fanout-queue-duanxin") public void receiveDuanxinMsg(String message){ System.out.println("duanxin fanout 接收到了订单信息 >> : " + message); } }

1.4 调用生产者方法进行测试,查看消费者控制台输出,三个消费者全部收到了消息

duanxin fanout 接收到了订单信息 >> : ff311aa8-37d0-4bb3-99f9-edb776799d69 email fanout 接收到了订单信息 >> : ff311aa8-37d0-4bb3-99f9-edb776799d69 sms fanout 接收到了订单信息 >> : ff311aa8-37d0-4bb3-99f9-edb776799d69 2. Direct模式(路由RoutingKey)使用

2.1 Direct模式配置文件,配置交换机、消息队列,并进行绑定,并绑定 routingKey

@Configuration public class RabbitMQConfigDirect { // 1.声明注册Direct模式的交换机 @Bean public DirectExchange directExchange(){ return new DirectExchange("direct-exchange"); } // 2.声明队列 参数durable: true表示消息队列持久化 @Bean public Queue smsQueueDirect(){ return new Queue("direct-queue-sms", true); } @Bean public Queue emailQueueDirect(){ return new Queue("direct-queue-email", true); } @Bean public Queue duanxinQueueDirect(){ return new Queue("direct-queue-duanxin", true); } // 3.消息队列和交换机进行绑定,指定routingKey @Bean public Binding smsBindingDirect(){ return BindingBuilder.bind(smsQueueDirect()).to(directExchange()).with("sms"); } @Bean public Binding emailBindingDirect(){ return BindingBuilder.bind(emailQueueDirect()).to(directExchange()).with("email"); } @Bean public Binding duanxinBindingDirect(){ return BindingBuilder.bind(duanxinQueueDirect()).to(directExchange()).with("duanxin"); } }

2.2 生产者,发送消息到交换机,并指定routingKey,交换机会分发到所绑定消息队列

public void makeOrderDirect() { // 1.根据商品id查询库存是否充足 // 2.产生订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单id: " + orderId); // 3.RabbitMQ完成消息分发 String exchange = "direct-exchange"; rabbitTemplate.convertAndSend(exchange, "email", orderId); rabbitTemplate.convertAndSend(exchange, "sms", orderId); }

2.3 消费者,三个消费者分别消费三个队列的消息

@Service public class MQDirectConsumer { @RabbitListener(queues = "direct-queue-sms") public void receiveSmsMsg(String message){ System.out.println("sms direct 接收到了订单信息 >> : " + message); } @RabbitListener(queues = "direct-queue-email") public void receiveEmailMsg(String message){ System.out.println("email direct 接收到了订单信息 >> : " + message); } @RabbitListener(queues = "direct-queue-duanxin") public void receiveDuanxinMsg(String message){ System.out.println("duanxin direct 接收到了订单信息 >> : " + message); } }

2.4 调用生产者方法进行测试,查看消费者控制台输出,只有匹配的routingKey才收到了消息

email direct 接收到了订单信息 >> : fd718eb4-060b-451a-b600-8f8f4f7a826d sms direct 接收到了订单信息 >> : fd718eb4-060b-451a-b600-8f8f4f7a826d 3. Topic模式(模糊路由RoutingKey)使用

3.1 Topic模式配置文件,配置交换机、消息队列,并进行绑定,并绑定 动态routingKey(模糊匹配)

@Configuration public class RabbitMQConfigTopic { // 1.声明注册Topic模式的交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange("topic-exchange"); } // 2.声明队列 参数durable: true表示消息队列持久化 @Bean public Queue smsQueueTopic(){ return new Queue("topic-queue-sms", true); } @Bean public Queue emailQueueTopic(){ return new Queue("topic-queue-email", true); } @Bean public Queue duanxinQueueTopic(){ return new Queue("topic-queue-duanxin", true); } // 3.消息队列和交换机进行绑定,模糊匹配routingKey *表示任意一个字符,#表示0个或任意多个字符 @Bean public Binding smsBindingTopic(){ return BindingBuilder.bind(smsQueueTopic()).to(topicExchange()).with("com.#"); } @Bean public Binding emailBindingTopic(){ return BindingBuilder.bind(emailQueueTopic()).to(topicExchange()).with("*.email.#"); } @Bean public Binding duanxinBindingTopic(){ return BindingBuilder.bind(duanxinQueueTopic()).to(topicExchange()).with("#.duanxin.#"); } }

3.2 生产者,发送消息到交换机,并指定动态routingKey,交换机会分发到所匹配的消息队列

public void makeOrderTopic() { // 1.根据商品id查询库存是否充足 // 2.产生订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单id: " + orderId); // 3.RabbitMQ完成消息分发 String exchange = "topic-exchange"; String routingKey = "com.duanxin"; // sms动态匹配规则 com.# // email动态匹配规则 *.email.# // duanxin动态匹配规则 #.duanxin.# // *表示任意一个字符,#表示0个或任意多个字符 rabbitTemplate.convertAndSend(exchange, routingKey, orderId); }

3.3 消费者,三个消费者分别消费三个队列的消息

@Service public class MQTopicConsumer { @RabbitListener(queues = "topic-queue-sms") public void receiveSmsMsg(String message){ System.out.println("sms topic 接收到了订单信息 >> : " + message); } @RabbitListener(queues = "topic-queue-email") public void receiveEmailMsg(String message){ System.out.println("email topic 接收到了订单信息 >> : " + message); } @RabbitListener(queues = "topic-queue-duanxin") public void receiveDuanxinMsg(String message){ System.out.println("duanxin topic 接收到了订单信息 >> : " + message); } }

3.4 调用生产者方法进行测试,查看消费者控制台输出,只有匹配的routingKey才收到了消息

sms topic 接收到了订单信息 >> : 079bc7d9-0984-4040-8d33-63a7324e428c duanxin topic 接收到了订单信息 >> : 079bc7d9-0984-4040-8d33-63a7324e428c 4. Headers模式(参数匹配)使用

4.1 Headers模式配置文件,配置交换机、消息队列,并进行绑定,并设置匹配参数和匹配模式(whereAll全部匹配,whereAny部分匹配)

@Configuration public class RabbitMQConfigHeaders { // 1.声明注册Headers模式的交换机 @Bean public HeadersExchange headersExchange(){ return new HeadersExchange("headers-exchange"); } // 2.声明队列 参数durable: true表示消息队列持久化 @Bean public Queue smsQueueHeaders(){ return new Queue("headers-queue-sms", true); } @Bean public Queue emailQueueHeaders(){ return new Queue("headers-queue-email", true); } @Bean public Queue duanxinQueueHeaders(){ return new Queue("headers-queue-duanxin", true); } // 3.消息队列和交换机进行绑定,设置匹配模式,全部匹配和部分匹配 @Bean public Binding smsBindingHeaders(){ Map<String,Object> map = new HashMap<>(); map.put("queueName","headers-queue-sms"); map.put("bindType","whereAll"); return BindingBuilder.bind(smsQueueHeaders()).to(headersExchange()).whereAll(map).match(); } @Bean public Binding emailBindingHeaders(){ Map<String,Object> map = new HashMap<>(); map.put("queueName","headers-queue-email"); map.put("bindType","whereAny"); return BindingBuilder.bind(emailQueueHeaders()).to(headersExchange()).whereAny(map).match(); } @Bean public Binding duanxinBindingHeaders(){ Map<String,Object> map = new HashMap<>(); map.put("queueName","headers-queue-duanxin"); map.put("bindType","whereAny"); return BindingBuilder.bind(emailQueueHeaders()).to(headersExchange()).whereAny(map).match(); } }

4.2 生产者

public void whereAllMatch() { System.out.println("===================whereAll 全部匹配======================="); // 1.根据商品id查询库存是否充足 // 2.产生订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单id: " + orderId); // 3.RabbitMQ完成消息分发 MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("queueName", "headers-queue-sms"); messageProperties.setHeader("bindType", "whereAll"); Message message = new Message(orderId.getBytes(), messageProperties); rabbitTemplate.convertAndSend("headers-exchange", "", message); } public void whereAnyMatch() { System.out.println("===================whereAny 部分匹配======================="); // 1.根据商品id查询库存是否充足 // 2.产生订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单id: " + orderId); // 3.RabbitMQ完成消息分发 MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("queueName", "headers-queue"); messageProperties.setHeader("bindType", "whereAny"); Message message = new Message(orderId.getBytes(), messageProperties); rabbitTemplate.convertAndSend("headers-exchange", "", message); }

4.3 消费者

@Service public class MQHeadersConsumer { @RabbitListener(queues = "headers-queue-sms") public void receiveSmsMsg(String message){ System.out.println("sms headers 接收到了订单信息 >> : " + message); } @RabbitListener(queues = "headers-queue-email") public void receiveEmailMsg(String message){ System.out.println("email headers 接收到了订单信息 >> : " + message); } @RabbitListener(queues = "headers-queue-duanxin") public void receiveDuanxinMsg(String message){ System.out.println("duanxin headers 接收到了订单信息 >> : " + message); } }

4.4 测试结果

===================whereAll 全部匹配======================= 订单id: 733f880d-5135-49fd-a454-781a36cc3d8e sms headers 接收到了订单信息 >> : 733f880d-5135-49fd-a454-781a36cc3d8e ===================whereAny 部分匹配======================= 订单id: 4c5f25f3-d5d4-470e-b27e-11b405e11c51 email headers 接收到了订单信息 >> : 4c5f25f3-d5d4-470e-b27e-11b405e11c51 duanxin headers 接收到了订单信息 >> : 4c5f25f3-d5d4-470e-b27e-11b405e11c51 5. Work模式使用 轮询模式的分发:一个消费者一条,按均分配;公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配; RabbitMQ 高级使用 1. ttl消息过期时间

1.1 给消息队列设置TTL过期时间x-message-ttl

@Configuration public class RabbitMQConfigDirectTTL { // 1.声明注册Direct模式的交换机 @Bean public DirectExchange directTTLExchange(){ return new DirectExchange("direct-ttl-exchange"); } // 2.设置消息队列的过期时间 x-message-ttl @Bean public Queue ttlQueueDirect(){ Map<String, Object> args = new HashMap<>(); // 设置消息队列的过期时间,单位为 ms args.put("x-message-ttl", 5000); // 设置死信交换机 和 routingKey args.put("x-dead-letter-exchange", "direct-dead-exchange"); args.put("x-dead-letter-routing-key", "dead"); // fanout模式不用配置routingKey return new Queue("direct-queue-ttl", true, false, false, args); } // 3.消息队列和交换机进行绑定 @Bean public Binding ttlBindingDirect(){ return BindingBuilder.bind(ttlQueueDirect()).to(directTTLExchange()).with("ttl"); } }

生产者产生消息:

public void makeOrderTTL() { // 1.根据商品id查询库存是否充足 // 2.产生订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单id: " + orderId); // 3.RabbitMQ完成消息分发 rabbitTemplate.convertAndSend("direct-ttl-exchange", "ttl", orderId); }

此种方式是给队列设置TTL,生产者产生的消息放入此队列中,5s后消息均会过期。

1.2 给消息设置TTL过期时间

public void makeOrderTTL() { // 1.根据商品id查询库存是否充足 // 2.产生订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单id: " + orderId); // 3.RabbitMQ完成消息分发 // 1)给消息队列设置TTL过期时间 rabbitTemplate.convertAndSend("direct-ttl-exchange", "ttl", orderId); // 2)给消息设置TTL过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); //5s message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; rabbitTemplate.convertAndSend("direct-ttl-exchange", "ttlMessage", orderId, messagePostProcessor); }

两种消息过期时间的区别

给队列设置TTL,此队列是个过期队列(TTL标识),过期的消息会转移到死信队列中去,需要绑定死信交换机(DLX)给消息设置过期时间,消息过期后会直接移除。

2. 死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机(DLX)。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。消息变成死信,可能是由于以下的原因:

消息被拒绝消息过期队列达到最大长度

定义死信队列,绑定死信交换机

@Configuration public class RabbitMQConfigDirectDead { // 1.声明注册Direct模式的交换机 @Bean public DirectExchange directDeadExchange(){ return new DirectExchange("direct-dead-exchange"); } // 2.声明队列 参数durable: true表示消息队列持久化 @Bean public Queue deadQueueDirect(){ return new Queue("direct-queue-dead", true); } // 3.消息队列和交换机进行绑定 @Bean public Binding deadBindingDirect(){ return BindingBuilder.bind(deadQueueDirect()).to(directDeadExchange()).with("dead"); } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #使用1 #Fanout模式使用2 #Direct模式使用3 #Topic模式使用docker