irpas技术客

MQ消息中间件之RabbitMQ以及整合SpringBoot2.x实战教程_祁某某呀

irpas 8188

文章目录 一、MQ引言1.1 什么是MQ1.2 MQ有哪些1.3 不同MQ特点 二、RabbitMQ的引言2.1 RabbitMQ2.2 RabbitMQ的安装2.2.1 下载2.2.2 下载的安装包2.2.3 安装 三、RabbitMQ配置3.1 RabbitMQ管理命令行3.2 web管理界面介绍3.2.1 overview概览3.2.2 Admin用户和虚拟主机管理 四、RabbitMQ的第一个程序4.1 AMQP协议的回顾4.2 RabbitMQ支持的消息模型4.3 引入依赖4.4 第一种模型(直连)4.2 第二种模型(work quene)4.3 第三种模型(fanout)4.4 第四种模型(Routing)4.4.1 Routing 之订阅模型-Direct(直连)4.4.2 Routing 之订阅模型-Topic 五、SpringBoot中使用RabbitMQ5.1 搭建初试环节5.2 hello world模型使用5.3 work模型使用5.4 Fanout 模型使用5.5 Route模型使用5.6 Topic模型使用 六、SpringBoot中使用RabbitMQ6.1 异步处理6.2 应用解耦6.3 流量削峰

提示:以下是本篇文章正文内容,不足之处欢迎指出


一、MQ引言 1.1 什么是MQ

MQ (Message Quene):翻译为 消息队列 ,通过典型的 生产者 和 消费者 模型生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入轻松的实现系统间解耦。别名为消息中间件—通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2 MQ有哪些

当今市面上有很多主流的消息中间件,如老牌的 ActiveMQ、 RabbitMQ,炙手可热的 Kafka,阿里巴巴自主开发 RocketMQ 等。

1.3 不同MQ特点 ActiveMQ ActiveMQ是Apache出品,最流行的。能力强劲的开源消息总线。它是一个完全支持JMS规范的消息中间件。丰富的APT,多种集群架构模式让ActiveMQ流在业界成为老牌的消息中间件,在中小型企业颇受欢迎!Kafka Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。RocketMQ RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。RabbitMQ RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由〈包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

总结:RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性〈少量延迟),可靠性〔少量丢数据)要求稍低的场景使用,比如ELK日志收集。

二、RabbitMQ的引言 2.1 RabbitMQ

RabbitMQ,基于 AMQP 协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

2.2 RabbitMQ的安装 2.2.1 下载

官网下载地址:https://·/download.html 我们直接选择Linux环境下进行安装,官网可能出现下载网速慢的情况,这里已经准备好了安装包和相关依赖

2.2.2 下载的安装包

2.2.3 安装 将rabbitMQ安装包以及依赖上传到Linux系统中 安装Erlang依赖包 rpm -ivh erlang-22.0.7-1.e17.x86_64.rpm (若安装过程中报错,显示libcrypto.so.10(OPENSSL_1.0.2)(64bit) is needed by erlang-22.0.7-1.el7.x86_64错误,则先安装openssl-libs-1.0.2k-19.el7.x86_64.rpm依赖,在进行erlang安装) rpm -ivh openssl-libs-1.0.2k-19.el7.x86_64.rpm --force 安装RabbitMQ socket依赖和安装包 rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm rpm -ivh --nodeps rabbitmq-server-3.7.18-1.el7.noarch.rpm

复制rabbitMQ配置文件 find / -name rabbitmq.config.example cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config 修改配置文件 vim /etc/rabbitmq/rabbitmq.config

修改为如下图所示

启动rabbitmq中的插件管理 rabbitmq-plugins enable rabbitmq_management 出现如下说明: Enabling plugins on node rabbit@localhost: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@localhost... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch set 3 plugins. Offline change; changes will take effect at broker restart. 启动RabbitMQ的服务 启动:systemctl start rabbitmq-server 重启:systemctl restart rabbitmq-server 停止:systemctl stop rabbitmq-server 查看服务状态 systemctl status rabbitmq-server 关闭防火墙服务 systemctl stop firewalld 访问web管理页面:虚拟机 IP地址+端口号(首次登录时账号密码均为guest) 三、RabbitMQ配置 3.1 RabbitMQ管理命令行 1. 服务启动相关 systemctl start|restart|stop status rabbitmq-server 2. 管理命令行 用来在不使用web管理界面情况下命令操作RabbitMQ rabbitmqctl help 可以查看更多命令 3. 插件管理命令行 rabbitmqplugins enable|list|disable 3.2 web管理界面介绍 3.2.1 overview概览

connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才 可以完成消息的生产和消费,在这里可以查看连接情况` channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。 Exchanges:交换机,用来实现消息的路由 Queues:队列,即消息队列,消息存放在队列中,等待消费, 消费后被移除队列。 3.2.2 Admin用户和虚拟主机管理 添加用户 上面的Tags选项,其实是指定用户的角色,可选的有以下几个: 超级管理员(administrator) 可登陆管理控制台,可查看所有的信息,并且可以对用户, 策略(policy)进行操作。 监控者(monitoring) 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息 (进程数,内存使用情况,磁盘使用情况等) 策略制定者(policymaker) 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的 相关信息(上图红框标识的部分)。 普通管理者(management) 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 其他 无法登陆管理控制台,通常就是普通的生产者和消费者。 创建虚拟主机 虚拟主机 为了让各个用户可以互不干扰的工作,RabbitMQ添加了 虚拟主机(Virtual Hosts)的概念。其实就是一个独立的 访问路径,不同用户使用不同路径,各自有自己的 队列、交换机,互相不会影响。相当于关系型中的数据库

虚拟主机绑定用户 四、RabbitMQ的第一个程序 4.1 AMQP协议的回顾

4.2 RabbitMQ支持的消息模型

可上官网查看具体详情

4.3 引入依赖 <!--引入rabbitmq的相关依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.2</version> </dependency> 4.4 第一种模型(直连)

在上图的模型中,有以下概念: P:生产者,也就是要发送消息的程序 C:消费者:消息的接受者,会一直等待消息到来。 queue:消息队列,图中红色部分。类似一个邮箱, 可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。 开发生产者 public class Provider { // 生产消息 @Test public void testSendMessage() throws IOException, TimeoutException { // 创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置连接rabbitmq主机 connectionFactory.setHost("192.168.88.100"); // 设置端口号 connectionFactory.setPort(5672); // 设置连接哪个虚拟机 connectionFactory.setVirtualHost("/emsVirtual"); // 设置访问虚拟主机的用户名和密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); // 获取连接对象 Connection connection = connectionFactory.newConnection(); // 获取连接中的通道 Channel channel = connection.createChannel(); // 通道绑定对应消息队列 // 参数1:队列名称 如果队列不存在则自动创建 // 参数2:用来定义队列特性是否需要持久化 true 持久化队列 false 不持久化 // 参数3:exclusive:是否独占队列 true 独占队列 false 不独占 // 参数4:autoDelete:是否在消费完成后自动删除队列 true 自动删除 false 不自动删除 // 参数5:额外附加参数 channel.queueDeclare("hello", false, false, false, null); // 发布消息 // 参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置 参数4:消息的具体内容 channel.basicPublish("","hello",null,"hello rabbitmq".getBytes()); // 关闭通道 channel.close(); // 关闭连接 connection.close(); } }

开发消费者 // 消费者 public class Customer { public static void main(String[] args) throws IOException, TimeoutException { // 创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置连接rabbitmq主机 connectionFactory.setHost("192.168.88.100"); // 设置端口号 connectionFactory.setPort(5672); // 设置连接哪个虚拟机 connectionFactory.setVirtualHost("/emsVirtual"); // 设置访问虚拟主机的用户名和密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); // 获取连接对象 Connection connection = connectionFactory.newConnection(); // 获取连接中的通道 Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); // 消费消息 // 参数1:消费哪个队列的消息 队列名称 // 参数2:开始消息的自动确认机制 // 参数3:消费时的回调接口 channel.basicConsume("hello", true, new DefaultConsumer(channel) { // 最后一个参数:消息队列中取出的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }

封装工具类 public class RabbitMQUtils { private static ConnectionFactory connectionFactory; // 静态代码块,在类加载时执行,只执行一次 static { connectionFactory = new ConnectionFactory(); // 设置连接rabbitmq主机 connectionFactory.setHost("192.168.88.100"); // 设置端口号 connectionFactory.setPort(5672); // 设置连接哪个虚拟机 connectionFactory.setVirtualHost("/emsVirtual"); // 设置访问虚拟主机的用户名和密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); } // 定义提供连接对象的方法 public static Connection getConnection() { try { // 获取连接对象 return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; } // 关闭通道和连接 public static void closeConnectionAndChanel(Channel channel, Connection connection) { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } 4.2 第二种模型(work quene)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消耗速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

角色: P:生产者:任务的发布者 C1:消费者1,领取任务并且完成任务,假设完成速度较慢 C2:消费者2,领取任务并且完成任务,假设完成速度快 开发生产者 public class Provider { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 通过通道声明队列 channel.queueDeclare("work", true, false, false, null); for (int i = 0; i < 10; i++) { // 生产消息 channel.basicPublish("", "work", null, (i + "hello work quene").getBytes()); } // 关闭资源 RabbitMQUtils.closeConnectionAndChanel(channel, connection); } } 开发消费者1 public class Customer1 { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1-" + new String(body)); } }); } } 开发消费者2 public class Customer2 { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2-" + new String(body)); } }); } }

测试结果 总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

消息确认机制: 完成一项任务可能需要几秒钟。您可能想知道,如果其中一个消费者开始了一项长期任务,但只完成了一部分就死了,会发生什么情况。在我们当前的代码中,一旦RabbitMQ将消息传递给使用者,它就会立即将其标记为删除。在这种情况下,如果您杀死一个worker,我们将丢失它刚刚处理的消息。我们还将丢失发送给该特定工作进程但尚未处理的所有消息。但我们不想失去任何任务。如果一个worker死了,我们希望把任务交给另一个工人。

// 开发消费者1 public class Customer1 { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 保证消息队列中每次只能处理一个 channel.basicQos(1); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } System.out.println("消费者1-" + new String(body)); // 参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确实 channel.basicAck(envelope.getDeliveryTag(),false); } }); } } // 开发消费者2 public class Customer2 { public static void main(String[] args) throws IOException { // 获取连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 保证消息队列中每次只能处理一个 channel.basicQos(1); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2-" + new String(body)); //手动确认 参数1:手动确认消息标识 参数2:false 每次确认一个 channel.basicAck(envelope.getDeliveryTag(), false); } }); } } 设置通道一次只能消费一个消息 关闭消息的自动确认,开启手动确认消息

4.3 第三种模型(fanout)

fanout 扇出 也称为广播

在广播模式下,消息发送流程是这样的: 1.可以有多个消费者 2.每个消费者有自己的queue(队列) 3.每个队列都要绑定到Exchange(交换机) 4.生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定 5.交换机把消息发送给绑定过的所有队列 6.队列的消费者都能拿到消息。实现一条消息被多个消费者消费 开发生产者 public class Provider { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 将通道声明指定交换机 // 参数1:交换机名称 参数2:交换机类型 fanout 广播类型 channel.exchangeDeclare("logs", "fanout"); // 发送消息 channel.basicPublish("logs", "", null, "fanout type message".getBytes()); // 关闭资源 RabbitMQUtils.closeConnectionAndChanel(channel, connection); } } 开发消费者1 public class Customer1 { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("logs", "fanout"); // 临时队列 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queueName, "logs", ""); // 消费消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } } 开发消费者2 public class Customer2 { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("logs", "fanout"); // 临时队列 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queueName, "logs", ""); // 消费消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:" + new String(body)); } }); } } 测试结果 4.4 第四种模型(Routing) 4.4.1 Routing 之订阅模型-Direct(直连) 在Fanout模式中。一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。 这时就要用到 Direct 类型的 Exchange 在Direct模型下: 队列与交换机的绑定,不能在是任意绑定了,而是要指定一个 RoutingKey (路由key); 消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey; Exchange不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey 进行判断,只有队列的 RoutingKey 与消息的RoutingKey也完全一致,才会接收到消息。

图解: P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。 X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列 C1:消费者,其所在队列指定了需要routing key 为 error 的消息 C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息 开发生产者 public class Provider { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 通过通道声明交换机 参数1:交换机名称 参数2:direct 路由模式 channel.exchangeDeclare("logs_direct", "direct"); // 发送消息 String routingkey = "info"; channel.basicPublish("logs_direct", routingkey, null, ("这是direct模型发布的基于route key:[" + routingkey + "]发送的消息").getBytes()); // 关闭资源 RabbitMQUtils.closeConnectionAndChanel(channel, connection); } } 开发消费者1 public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明交换机名称 String exchangeName = "logs_direct"; // 通道声明交换机以及交换的类型 channel.exchangeDeclare("logs_direct", "direct"); // 创建一个临时队列 String queue = channel.queueDeclare().getQueue(); // 基于route key绑定队列和交换机 channel.queueBind(queue, exchangeName, "error"); // 获取消费的消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } } 开发消费者2 public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明交换机名称 String exchangeName = "logs_direct"; // 通道声明交换机以及交换的类型 channel.exchangeDeclare("logs_direct", "direct"); // 创建一个临时队列 String queue = channel.queueDeclare().getQueue(); // 基于route key绑定队列和交换机 channel.queueBind(queue, exchangeName, "info"); channel.queueBind(queue, exchangeName, "error"); channel.queueBind(queue, exchangeName, "warning"); // 获取消费的消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } }

测试生产者发送Route key为error的消息时

测试生产者发送Route key为info的消息时

4.4.2 Routing 之订阅模型-Topic

Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 RoutingKey 的时候使用通配符!这种模型 RoutingKey 一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如:‘item.insert’

# 统配符 * (star) can substitute for exactly one word. 匹配不多不少恰好1个词 # (hash) can substitute for zero or more words. 匹配零个、一个或多个词 # 如: audit.# 匹配audit、audit.irs 、或者audit.irs.corporate等 audit.* 只能匹配 audit.irs 开发生产者 public class Provider { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = RabbitMQUtils.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 通过通道声明交换机以及交换机类型 参数1:交换机名称 参数2:topic channel.exchangeDeclare("topics", "topic"); // 发送消息 String routingkey = "user.save"; channel.basicPublish("topics", routingkey, null, ("这是topic模型发布的基于route key:[" + routingkey + "]发送的消息").getBytes()); // 关闭资源 RabbitMQUtils.closeConnectionAndChanel(channel, connection); } } 开发消费者1 (Routing Key中使用*通配符方式) // Routing Key中使用*通配符方式 public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道声明交换机以及交换的类型 channel.exchangeDeclare("topics", "topic"); // 创建一个临时队列 String queue = channel.queueDeclare().getQueue(); // 基于route key绑定队列和交换机 动态通配符形式 channel.queueBind(queue, "topics", "user.*"); // 获取消费的消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:" + new String(body)); } }); } } 开发消费者2 (Routing Key中使用#通配符方式) // Routing Key中使用#通配符方式 public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 通道声明交换机以及交换的类型 channel.exchangeDeclare("topics", "topic"); // 创建一个临时队列 String queue = channel.queueDeclare().getQueue(); // 基于route key绑定队列和交换机 动态通配符形式 channel.queueBind(queue, "topics", "user.#"); // 获取消费的消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:" + new String(body)); } }); } } 测试结果 总结:Routing Key中使用 user.# 通配符方式时,代表包含user(如:user.save.delete,save.user)的都可以匹配,且长度不限;Routing Key中使用 user.* 通配符方式时,代表长度为2(如:user.save)的routingkey,且第一位只能为user。 五、SpringBoot中使用RabbitMQ 5.1 搭建初试环节 引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 配置配置文件 spring: application: name: springboot_rabbitmq rabbitmq: host: 192.168.88.100 port: 5672 username: ems password: 123456 virtual-host: /emsVirtual 5.2 hello world模型使用 开发生产者 @SpringBootTest(classes = RabbitMqDemo01Application.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { // 注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // hello world @Test public void test01() { rabbitTemplate.convertAndSend("hello","hello world"); } // 生产端没有指定交换机只有routingKey和Object。 //消费方产生hello队列,放在默认的交换机(AMQP default)上。 //而默认的交换机有一个特点,只要你的routerKey的名字与这个 //交换机的队列有相同的名字,他就会自动路由上。 //生产端routingKey 叫hello ,消费端生产hello队列。 //他们就路由上了 } 开发消费者 @Component @RabbitListener(queuesToDeclare = @Queue("hello")) public class HelloCustomer { @RabbitHandler public void receive(String message) { System.out.println("message = " + message); } } 测试结果 5.3 work模型使用 开发生产者 @SpringBootTest(classes = RabbitMqDemo01Application.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { // 注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // work @Test public void testWork() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work", i + " work模型"); } } } 开发消费者 @Component public class WorkCustomer { // 消费者1 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message) { System.out.println("messge1 = " + message); } // 消费者1 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message) { System.out.println("messge2 = " + message); } } 测试结果 说明:默认在Spring AMQP实现中Work这种方式就是公平调度,如果需要实现能者多劳需要外配置 5.4 Fanout 模型使用 开发生产者 @SpringBootTest(classes = RabbitMqDemo01Application.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { // 注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // fanout 广播 @Test public void testFanout() { rabbitTemplate.convertAndSend("logs","","Fanout的模型发送的消息"); } } 开发消费者 @Component public class FanoutCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "logs" , type = "fanout") // 绑定的交换机 ) }) public void receive1(String message) { System.out.println("message1 = " +message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "logs" , type = "fanout") // 绑定的交换机 ) }) public void receive2(String message) { System.out.println("message2 = " +message); } } 测试结果 5.5 Route模型使用 开发生产者 @SpringBootTest(classes = RabbitMqDemo01Application.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { // 注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; //route 路由模式 @Test public void testRoute() { rabbitTemplate.convertAndSend("directs", "info", "发送info的key的路由信息"); } } 开发消费者 @Component public class RouteCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "directs", type = "direct"), //自定义交换机名称和类型 key = {"info", "error", "warning"} ) }) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, // 创建临时队列 exchange = @Exchange(value = "directs", type = "direct"), //自定义交换机名称和类型 key = {"info"} ) }) public void receive2(String message) { System.out.println("message2 = " + message); } }

测试结果 发送info的key的路由信息

发送error的key的路由信息

5.6 Topic模型使用 开发生产者 @SpringBootTest(classes = RabbitMqDemo01Application.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { // 注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; //topic 动态路由 订阅模式 public void testTopic() { rabbitTemplate.convertAndSend("topics", "suer.save", "user.save 路由消息"); } } 开发消费者 @Component public class TopicCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(name = "topics", type = "topic"), key = {"user.save", "user.*"} ) }) public void receive1(String message) { System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(name = "topics", type = "topic"), key = {"order.#", "product.#", "user.*"} ) }) public void receive2(String message) { System.out.println("message2 = " + message); } }

测试结果 user.save 路由消息

order.save 路由消息

六、SpringBoot中使用RabbitMQ 6.1 异步处理

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种。1、串行的方式 2、并行的方式

串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件、短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。

并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式可能提高处理的时间。

消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高了处理时间,但是,邮件和短信对我们正常使用没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回,消息队列:引入消息队列后,把发送邮件、短信不是必须的业务逻辑异步处理。

6.2 应用解耦

场景:双11是购物狂阶,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。 这种做法有一个缺点:当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合,引入消息队列

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。库存系统:订阅下单的消息,获取下单消息,进行库操作。就算库存系统出现故障,消息队列也能保证消息的可靠传递,不会导致消息丢失。 6.3 流量削峰

场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 作用:

可以控制活动人数,超过一定阈值的订单直接丢弃可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单) 用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。 秒杀业务根据消息队列中的请求信息,在做后续处理。

至此,RabbitMQ的学习以及整合SpringBoot已经完成!


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Python #什么是MQ12 #MQ有哪些13 #不同MQ特点1