irpas技术客

SpringBoot集成RabbitMq_Blueeyedboy521_springboot集成rabbitmq

未知 4955

一、初识MQ

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且是基于AMQP协议的。

AMQP:Advanced Message Queuing Protocol,高级消息队列协议。是具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

1、同步调用的问题

2、异步调用方案

优势一:服务解耦

优势二:性能提升,吞吐量提高 优势三:服务没有强依赖,不担心级联失败问题 优势四:流量消峰 异步通讯的缺点 :

依赖于Broker的可靠性,安全性,吞吐能力架构复杂了,业务没有明显的流程线,不好追踪管理 3、各种MQ的对比

二、RabbitMQ入门

官网https://rabbitmq.com/

1、基本概念

核心概念

1.1、Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key (路由键)、priority (相对于其他消息的优先权)、delivery-mode (指出该消息可能需要持久性存储)等。

1.2、Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

1.3、Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型: direct(默认), fanout, topic,和headers,不同类型的Exchange转发消息的策略有所区别

1.4、Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

1.5、Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基 于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Exchange和Queue的绑定可以是多对多的关系。

1.6、Connection

网络连接,比如一个TCP连接。

1.7、Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

1.8、Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

1.9、Virtual Host

虚拟主机,表示-一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是AMQP概念的基础,必须在连接时指定,   RabbitMQ默认的vhost是/。

1.10、Broker

表示消息队列服务器实体

2、安装

安装参考:https://blog.csdn.net/Blueeyedboy521/article/details/124001883

2、常见消息模型

2.1、基本消息类型

学习视频https://·/video/BV1LQ4y127n4?p=67&spm_id_from=pageDriver

三、SpringAMQP 1、什么是SpringAMQP

官方文档:https://spring.io/projects/spring-amqp

2、简单示例 2.1、引入AMQP依赖 <!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 2.2、配置文件 spring: rabbitmq: addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672 username: admin password: admin #开启消息确认模式,新版本已经弃用 #publisher-confirms: true #开启消息送达提示 publisher-returns: true # springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果 publisher-confirm-type: correlated virtual-host: / listener: type: simple simple: acknowledge-mode: auto #确认模式 prefetch: 1 #限制每次发送一条数据。 concurrency: 3 #同一个队列启动几个消费者 max-concurrency: 3 #启动消费者最大数量 #重试策略相关配置 retry: # 开启消费者(程序出现异常)重试机制,默认开启并一直重试 enabled: true # 最大重试次数 max-attempts: 5 # 重试间隔时间(毫秒) initial-interval: 3000

spring.rabbitmq.publisher-confirm-type新版发布确认属性有三种确认类型

/** * The type of publisher confirms to use. */ public enum ConfirmType { /** * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()} * within scoped operations. */ SIMPLE, /** * Use with {@code CorrelationData} to correlate confirmations with sent * messsages. */ CORRELATED, /** * Publisher confirms are disabled (default). */ NONE }

NONE值是禁用发布确认模式,是默认值 CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例 SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker; ———————————————— 版权声明:本文为CSDN博主「OkidoGreen」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/z69183787/article/details/109371628

2.3、发送消息

在publisher服务中新建一个测试类,编写测试方法

2.4、消费消息

3、WorkQueue工作队列 3.1、模拟WorkQueue实现一个队列绑定多个消费者

3.2、消息发送

3.3、消息接受

3.4、消费预期机制prefetch

配置一次只能取一条,处理完才能取下一条

spring: rabbitmq: host: 192.168.100.120 port: 5672 username: admin password: admin #开启消息确认模式,新版本已经弃用 #publisher-confirms: true #开启消息送达提示 publisher-returns: true # springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果 publisher-confirm-type: correlated virtual-host: / listener: simple: prefetch: 1 #限制每次发送一条数据。 4、发布和订阅模型 4.1、概念

4.2、Fanout广播类型

4.2.1、示例

4.2.2、声明绑定

4.2.3、消费者绑定

4.2.4、生产者发送消息

4.3、DriectExchage路由类型

通过routeKey可以实现Fanout广播类型

4.3.1、示例

4.3.2、申明Exchange、Queue @Configuration public class RabbitDirectConfig { @Bean public Queue directQueue(){ // 参数介绍 // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数 return new Queue("directQueue-One",true,false,false,null); } @Bean public Queue directQueue2(){ // 参数介绍 // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数 return new Queue("directQueue-Two",true,false,false,null); } @Bean public DirectExchange directExchange(){ // 参数介绍 // 1.交换器名 2.是否持久化 3.自动删除 4.其他参数 return new DirectExchange("MqSendService-One",true,false,null); } @Bean public Binding bingExchange(){ // 绑定队列 return BindingBuilder.bind(directQueue2()) // 队列绑定到哪个交换器 .to(directExchange()) // 绑定路由key,必须指定 .with("One"); } @Bean public Binding bingExchange2(){ // 绑定队列 return BindingBuilder.bind(directQueue2()) // 队列绑定到哪个交换器 .to(directExchange()) // 绑定路由key,必须指定 .with("Two"); } } 4.3.3、在consumer消费者服务监听

4.3.4、总结 4.4、TopicExchange话题广播类型

4.4.1、示例

4.4.2、申明 @Configuration public class RabbitTopicConfig { @Bean public Queue queue(){ // 参数介绍 // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数 return new Queue("simple.queue",true,false,false,null); } @Bean public TopicExchange topicExchange(){ // 参数介绍 // 1.交换器名 2.是否持久化 3.自动删除 4.其他参数 return new TopicExchange("amq.topic",true,false,null); } @Bean public Binding bingExchange(){ // 绑定队列 return BindingBuilder.bind(queue()) // 队列绑定到哪个交换器 .to(topicExchange()) // 绑定路由key,必须指定 .with("simple.#"); } }

4.4.3、生产者发送

5、消息转换器

spring的消息对象处理是有org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SpringMessageConverter,基于JDK的ObjectOutputStream完成序列化。 如果需要修改只需要定义一个MessageConverter类型的bean即可,推荐用JSON方式序列化,步骤如下

5.1、引入jackson的依赖 <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency> 5.2、申明Bean @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #AMQPAdvanced #message #Queuing