irpas技术客

RabbitMQ 实战_Java 码农_rabbitmq 实战

网络投稿 1385

1:安装 RabbitMQ?

这里 我会先同时安装三台机器,为以后的高可用集群做准备

注意在进行以下操作之前可以先关闭防火墙 或者?开放防火墙端口

?开放防火墙端口 //永久的添加该端口。去掉--permanent则表示临时。 firewall-cmd --permanent --zone=public --add-port=5672/tcp firewall-cmd --permanent --zone=public --add-port=15672/tcp //重新加载配置,使得修改有效。 firewall-cmd --reload? //查看开启的端口,出现5672/15672这开启正确 firewall-cmd --permanent --zone=public --list-ports?

(推荐使用)

永久关闭防火墙

首先查看防火墙的状态 systemctl status firewalld.service

然后执行命令进行关闭 systemctl stop firewalld service? ? ? 临时关闭防火墙

systemctl disable firewalld.service ? 开机禁止防火墙服务器? 永久关闭 systemctl enable firewalld.service ?开机启动防火墙服务器

(自己学习的时候使用)

1.1:安装RabbitMQ 的依赖环境

安装常用的环境和工具包

yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel

erlang官网下载你需要的erlang的版本:

https://·/rabbitmq/rabbitmq-server/releases/tag/v3.8.26

注意官网上的这句话

Erlang/OTP Compatibility Notes

This release?requires Erlang 23.2?and?supports Erlang 24.

Erlang 和 rabbitMQ 是有版本对应关系的,版本不对应安装不成功

上传到 linux

rpm -ivh --nodeps rabbitmq-server-3.8.26-1.el7.noarch.rpm

使用 yum 安装时 会报错 ,因为yum 比较谨慎一般都安装比较旧的版本,

?rpm -ivh --nodeps rabbitmq-server-3.9.5-1.el7.noarch.rpm? ?添加参数?--nodeps? 忽略依赖校验

再次执行 可以发现安装成功了

#启动rabbitmq,-detached代表后台守护进程方式启动

# 启动服务:rabbitmq-server -detached # 查看状态:rabbitmqctl status # 关闭服务:rabbitmqctl stop # 列出角色:rabbitmqctl list_users

查看 状态如下 说明 安装启动成功

查看用户列表

#启用管理插件 rabbitmq-plugins enable rabbitmq_management

?# 端口 15672(网页管理) 5672 (AMQP端口): #在浏览器中输入服务器IP:15672 就可以看到RabbitMQ的WEB管理页面了,但是现在并不能登录

我们需要新建一个用户

?此时还无法登录,我们需要给管理控制台添加用户 并授予权限

rabbitmqctl add_user developer? ?添加用户?

?根据提示输入密码:? dev123456

给用户添加角色 并查看用户信息

rabbitmqctl set_user_tags developer dev123456?administrator

rabbitmqctl list_users? 查看用户列表

删除原有的 guest 用户

rabbitmqctl delete_user guest

?使用 新的用户名 developer 密码? dev123456 登录 后台管理界面 可以发现三个机器的管理后台都可以 正常进入了

?

?1.3:常用的用户管理的命令及界面操作

# 启动服务:rabbitmq-server -detached # 查看状态:rabbitmqctl status # 关闭服务:rabbitmqctl stop # 列出角色:rabbitmqctl list_users

?添加用户 rabbitmqctl? add_user developer #根据提示 添加密码

?删除用户 :rabbitmqctl delete_user guest

?修改密码 :?rabbitmqctl change_password developer dev123456

?

?RabbitMQ中主要有administrator,monitoring,policymaker,management,impersonator,none几种角色

修改角色:?rabbitmqctl set_user_tags developer administrator

?也可以给用户设置多个角色,如给用户developer设置administrator,monitoring

rabbitmqctl set_user_tags developer administrator monitoring

权限包含 读 写 配置

权限赋值 rabbitmqctl set_permissions -p / developer ".*" ".*" ".*"

查看(指定vhostpath)所有用户的权限 rabbitmqctl list_permissions

?查看virtual host为/的所有用户权限: rabbitmqctl list_permissions -p /

?

?查看指定用户的权限 rabbitmqctl list_user_permissions developer

?清除用户权限 rabbitmqctl clear_permissions developer

?

1.4:RabbitMQ用户角色及权限控制

RabbitMQ的用户角色分为5类: none、management、policymaker、monitoring、administrator

none 不能访问 management plugin

management 拥有这种角色的用户?通过AMQP做的任何事外 还可以?列出自己可以通过AMQP登入的virtual hosts? 查看自己的virtual hosts中的queues, exchanges 和 bindings 查看和关闭自己的channels 和 connections 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动

policymaker? 该角色可以拥有management可以做的任何事外加权限: 查看、创建和删除自己的virtual hosts所属的policies和parameters

monitoring ? management可以做的任何事外加: 列出所有virtual hosts,包括他们不能登录的virtual hosts 查看其他用户的connections和channels 查看节点级别的数据如clustering和memory使用情况 查看真正的关于所有virtual hosts的全局的统计信息

administrator ? 最高权限 policymaker和monitoring可以做的任何事外加: 创建和删除virtual hosts 查看、创建和删除users 查看创建和删除permissions 关闭其他用户的connections

创建用户并设置角色: 可以创建管理员用户,负责整个MQ的运维,例如: $sudo rabbitmqctl add_user ?admin(用户名)? admin(密码) 赋予其administrator角色: $sudo rabbitmqctl set_user_tags admin administrator? 添加?administrator 角色

创建和赋角色完成后查看并确认: $sudo rabbitmqctl list_users

添加权限

对何种资源具有配置、写、读的权限通过正则表达式来匹配,具体命令如下: set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

其中,<conf> <write> <read>的位置分别用正则表达式来匹配特定的资源,如'^(amq\.gen.*|amq\.default)$'可以匹配server生成的和默认的exchange,'^$'不匹配任何资源 需要注意的是RabbitMQ会缓存每个connection或channel的权限验证结果、因此权限发生变化后需要重连才能生效。 为用户赋权: $sudo rabbitmqctl ?set_permissions -p /TEST? admin '.*' '.*' '.*' 该命令使用户admin具有/TEST这个virtual host中所有资源的配置、写、读权限以便管理其中的资源 ?

2: Java 操作RabbitMQ 2.1: 第一个Java RabbitMQ 程序

创建一个 maven 工程? ,添加如下依赖和配置

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://·piler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.30</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/commons-io/commons-io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

编写 一个 消息生产者

package org.rb; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * Hello world! * */ public class MsgProducer { private static final String QUEEN_NAME="hello"; public static void main( String[] args )throws Exception { String msg = "Hello World"; //创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置工厂配置信息 factory.setHost("192.168.217.128"); factory.setPassword("dev123456"); factory.setUsername("developer"); //创建链接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列 //参数说明 /*** 1:队列名称 2:是否持久化持久化会保存到磁盘,默认是保存到内存 3:是否多个消费者共享消费 4:是否自动删除 5:其他参数,延迟或者死信队列等 ***/ channel.queueDeclare(QUEEN_NAME,false,false, false,null); /** * 1: 发送到那个交换机 * 2:路由的key值 * 3: 其他参数信息 * 4:参数内容 * **/ channel.basicPublish("",QUEEN_NAME,null,msg.getBytes()); System.out.println("消息發送完畢"); } }

执行生产者代码 发送信息,并在 rabbitmq 的管理控制台检查对列中的消息是否发送成功,看到如下结果 说明信息发送成功

?创建消费者

package org.rb; import com.rabbitmq.client.*; public class MsgConsumer { private static final String QUEEN_NAME="hello"; public static void main(String[] args) throws Exception{ //创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置工厂配置信息 factory.setHost("192.168.217.128"); factory.setPassword("dev123456"); factory.setUsername("developer"); //创建链接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); /** * 1:被消费队列名 * 2:是否自动应答 true 是自动应答 false 手动应答 * 3:消费者未成功消费的回调函数 * 4: 消费者去掉消费的回调 * */ //正常获取消息 DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println(new String(message.getBody())); }; //消费消息被中断 CancelCallback cancelCallback=(consumerTag)->{ System.out.println("消费消息被中断"); }; channel.basicConsume(QUEEN_NAME,true,deliverCallback,cancelCallback); System.out.println("消息接受完毕"); } }

?可以看见消息被消费了,消息个数变成0 了

2.2: work queen 模式

一个生产者发送消息

可以有多个消费者 ,但是只有一个消费者可以获取到消息,使用轮询方式来处理消息,消息不可以重复被消费

编写 work01 工作线程,并在idea 中 设置使这个class 可以多个线程执行

?通过修改这里来表示 多个线程收到信息

DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("C02接受到的消息:"+ new String(message.getBody())); };

package org.rb.day01; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import org.rb.util.RabbitMqUtils; //消费类 public class Worker01 { private static final String QUEEN_NAME="hello"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); /** * 1:被消费队列名 * 2:是否自动应答 true 是自动应答 false 手动应答 * 3:消费者未成功消费的回调函数 * 4: 消费者去掉消费的回调 * */ DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("接受到的消息:"+ new String(message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消息被取消回调"); }; channel.basicConsume(QUEEN_NAME,true,deliverCallback,cancelCallback); } }

编写生产者代码

package org.rb.day01; import com.rabbitmq.client.Channel; import org.apache.commons.lang3.StringUtils; import org.rb.util.RabbitMqUtils; import java.util.Scanner; public class Producer01 { private static final String QUEEN_NAME="hello"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); /*** 1:队列名称 queue 2:是否持久化持久化会保存到磁盘,默认是保存到内存 durable 3:是否多个消费者共享消费 exclusive 4:是否自动删除 autoDelete 5:其他参数,延迟或者死信队列等 arguments ***/ channel.queueDeclare(QUEEN_NAME,false,false,false,null); //为了发消息比较明显,使用控制台发送 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEEN_NAME,null,message.getBytes()); System.out.println("消息"+message+"发送完成"); } } }

执行producer代码发送多个消息,会发现 消费端 会轮流的打印 发送的消息,并且不会重复消费,是轮询的的消费消息的?

?2.3:消息应答机制

在消费者消费完之后 进行应答,只有在获取到消费端消费完的应到之后才删除队列里的消息

避免消息丢失,自动应答并不靠谱 ,特别是在接受到大量消息的时候 ,如果后续处理消息的过程中 发生了异常,可能会导致大量消息丢失

消息应答的方式:

Channel.basicAck()? ?用于肯定确认,消息已经肯定处理成功了

Channel.basicNack()? ?用于否定确认,不能确定消息已经肯定处理成功了

Channel.basicReject()??用于拒绝确认,不能确定消息已经肯定处理成功了 比这个?Channel.basicNack() 对一个是否批量处理的参数mutilple

手动应答的好处 可以批量应答,你比那个且可以减少网络阻塞,如下图所示当批量应答时 只要channel 中的第一个被应答 ,信道中的其他消息就会一并被应答

而不批量应答只能一个一个的应答

?消息自动重新入队,开启手动应答

当有多个消费端时 ,如果一个消息已经发送,并且被一个消费端获取了,但是队列并未收到ack ,这时队列并不会删除消息,而是将消息从新入队并将消息发送个另外一个可达的 消费端消费

?消息手动应答的代码

package org.rb.day01; import com.rabbitmq.client.Channel; import org.rb.util.RabbitMqUtils; import java.util.Scanner; public class NotAutoAckProducer { private static final String NOTAUTOACK_QUEEN_NAME="ack_queen"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(NOTAUTOACK_QUEEN_NAME,false,false,false,null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",NOTAUTOACK_QUEEN_NAME,null,message.getBytes("UTF-8")); System.out.println("消息"+message+"发送完成"); } } } package org.rb.day01; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import org.rb.util.RabbitMqUtils; //手动应答,消息不丢失,返回队列个可达的消费者消费 public class NotAutoAckConsumer { private static final String NOTAUTOACK_QUEEN_NAME="ack_queen"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message)->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } /** * 手动应答 获取消息标签 * 和不批量应答 * **/ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); System.out.println("NotAutoAckConsumer1接受到的消息:"+ new String(message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消息被取消回调"); }; boolean autoAck = false; channel.basicConsume(NOTAUTOACK_QUEEN_NAME,autoAck,deliverCallback,cancelCallback); } } package org.rb.day01; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import org.rb.util.RabbitMqUtils; //手动应答,消息不丢失,返回队列个可达的消费者消费 public class NotAutoAckConsumer2 { private static final String NOTAUTOACK_QUEEN_NAME="ack_queen"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message)->{ try { Thread.sleep(1000*20); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(),false); System.out.println("NotAutoAckConsumer2接受到的消息:"+ new String(message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消息被取消回调"); }; boolean autoAck = false; channel.basicConsume(NOTAUTOACK_QUEEN_NAME,autoAck,deliverCallback,cancelCallback); } }

队列持久化:

在消息发送的时候,将durable 设置为true 表示 开启持久化?

?channel.queueDeclare(NOTAUTOACK_QUEEN_NAME,true,false,false,null);

package org.rb.day01; import com.rabbitmq.client.Channel; import org.rb.util.RabbitMqUtils; import java.util.Scanner; //持久化队列 public class PersistProducer { private static final String NOTAUTOACK_QUEEN_NAME="persist_queen"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); // durable true 表示 开启持久化 channel.queueDeclare(NOTAUTOACK_QUEEN_NAME,true,false,false,null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",NOTAUTOACK_QUEEN_NAME,null,message.getBytes("UTF-8")); System.out.println("消息"+message+"发送完成"); } } }

执行代码 去控制台 可以发现 queen 里面的 features 里面的只有个大写的D ,表示持久化了,此时关闭rabbbitmq 服务再重启,发现队列不会消失了

消息持久化:

只需要修改生产者在发送消息的时候 将第三个参数修改为

//开启消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN channel.basicPublish("",NOTAUTOACK_QUEEN_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

这里并不能保证绝对的消息不丢失 ,可能会在发送的某个时间点还没完全 处理完 但是 对服务挂了,也不能持久化

package org.rb.day01; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import org.rb.util.RabbitMqUtils; import java.util.Scanner; //持久化队列 public class PersistProducer2 { private static final String NOTAUTOACK_QUEEN_NAME="persist_queen"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); // durable true 表示 开启持队列久化 channel.queueDeclare(NOTAUTOACK_QUEEN_NAME,true,false,false,null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); //开启消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN channel.basicPublish("",NOTAUTOACK_QUEEN_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8")); System.out.println("消息"+message+"发送完成"); } } }

此时执行代码,关闭服务再次进入 控制台检查 发现 消息不会丢失了

不公平分发

在多个消费端的情况下 ,添加如下代码

channel.basicQos(1); ? // 设置不公平分发原则,体现一种能者多劳的方式 哪个线程处理的快,就会多处理消息而处理的慢的就会少处理

预取值

在上面的不公平分发的时候 传递给channel.basicQos(1)这个的参数为1 ,多个客户端里面的都是1 ,当 有一种情况我们需要发的数据的消息总数是已知的 ,这时候我们就可以通过改变这个参数来指定不同的客户端 消费 的消息数量, 比如 总共1000条消息,就可以指定 某个消费端 消费 200 条channel.basicQos(200) ,其他消费端消费 800 条 channel.basicQos(1), 这样就不管客户端 消费能力的问题,哪怕 消费端1 的消费能力很强 但是 它也只消费 200条消息, 消费端2 消费能力很差 但是 它也得消费800 条? ,这就是没有了能者多劳特性了

2.4:发布确认

在之前的讲解中,当生产者 发送消息给队列之后

虽然已经通过 durable 将队列持久化 和?MessageProperties.PERSISTENT_TEXT_PLAIN 来持久化 消息 ,但是?生产者是不知道 消息是否真的持久化了的,这就需要 RabbitMQ 的应答机制来处理这个场景

RabbitMQ 的应答机制主要有三种

单个应答 : 一个一个应答 ,准确性最高 消息不回丢失 但是效率比较低,同步操作

批量应答 : 效率比较高 ,但是 当消费端 批量消费的过程中 ,如果还没处理完就出现异常了,那么被取出的数据中 还未处理完的那部分消息就会丢失,同步操作

异步批量应答 :采用多线程的方式 使用监听器,在发送消息的同时也在监控没有发送成功的数据

package org.rb.day02; //发布确认 //1: 单个确认 //2:批量确认 //3:异步确认 import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import org.rb.util.RabbitMqUtils; import java.util.UUID; public class ConfirmProducer { public static int MESSAGE_COUNT = 800; public static void main(String[] args) throws Exception{ //ConfirmProducer.publishMsgSingle(); //发布800个消息耗时1575ms //ConfirmProducer.publishMsgBatch(); //发布800个消息耗时242ms ,当出现消息未确认时无法知道那个消息没有被确认 ConfirmProducer.publishMsgSynchBatch(); //发布800个消息耗时197ms 异步的时间 } //1: 单个确认 public static void publishMsgSingle() throws Exception{ Channel channel = RabbitMqUtils.getChannel(); String queenName = UUID.randomUUID().toString(); channel.queueDeclare(queenName,true,false,false,null); channel.confirmSelect(); // 开启发布确认 long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "msg"+i; channel.basicPublish("",queenName,null,message.getBytes()); boolean flag = channel.waitForConfirms(); //等候确认 if(flag){ System.out.println("第"+i+"个消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个消息耗时"+(end-start)+"ms"); } //2:批量确认 public static void publishMsgBatch() throws Exception{ Channel channel = RabbitMqUtils.getChannel(); String queenName = UUID.randomUUID().toString(); channel.queueDeclare(queenName,true,false,false,null); channel.confirmSelect(); // 开启发布确认 //批量确认的大小 假设 200条 int batchSize = 200; long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "msg"+i; channel.basicPublish("",queenName,null,message.getBytes()); if(i%batchSize == 0 ){ channel.waitForConfirms(); } } long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个消息耗时"+(end-start)+"ms"); } //3:异步确认 public static void publishMsgSynchBatch() throws Exception{ Channel channel = RabbitMqUtils.getChannel(); String queenName = UUID.randomUUID().toString(); channel.queueDeclare(queenName,true,false,false,null); channel.confirmSelect(); // 开启发布确认 long start = System.currentTimeMillis(); //消息成功发送回调函数 deliveryTag 消息标记 multiple 是否批量确认 ConfirmCallback ackCallback = (deliveryTag, multiple)->{ System.out.println("正确确认的消息:"+deliveryTag); }; //消息失败回调函数 deliveryTag 消息标记 multiple 是否批量确认 ConfirmCallback nackCallback = (deliveryTag, multiple)->{ //这里获取到未处理成功的消息 System.out.println("未确认的消息:"+deliveryTag); }; //创建监听器在发送消息之前,监听消息的成功和失败 channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "msg"+i; channel.basicPublish("",queenName,null,message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个消息耗时"+(end-start)+"ms"); } }

采用异步方式的时候 怎样去处理未被及时确认的信息呢 ?

最常用的一种方式是将为被确认的消息放到一个基于内存的可以被发布线程访问的队列

比如 ConcurrentLinkedQueen ,这个队列可以在 confirm callbacks 与发布线程之间进行消息的传递

异步确认处理未被确认消息的处理逻辑

public static void publishMsgSynchBatch() throws Exception{ Channel channel = RabbitMqUtils.getChannel(); String queenName = UUID.randomUUID().toString(); channel.queueDeclare(queenName,true,false,false,null); channel.confirmSelect(); // 开启发布确认 //方便批量删除 ,通过序号 //支持多线程并发 ConcurrentSkipListMap<Long,String> confirmMap = new ConcurrentSkipListMap<>(); long start = System.currentTimeMillis(); //消息成功发送回调函数 deliveryTag 消息标记 multiple 是否批量确认 ConfirmCallback ackCallback = (deliveryTag, multiple)->{ //步骤002 删除掉已经被消费的消息 if(multiple){ //可能会造成消息丢失 一般不用 ConcurrentNavigableMap<Long,String> confirmedMap = confirmMap.headMap(deliveryTag); confirmedMap.clear(); }else{ //推荐使用这种方式 confirmMap.remove(deliveryTag); } System.out.println("正确确认的消息:"+deliveryTag); }; //消息失败回调函数 deliveryTag 消息标记 multiple 是否批量确认 ConfirmCallback nackCallback = (deliveryTag, multiple)->{ //这里获取到未处理成功的消息 //步骤003 处理 步骤002 操作完成之后违未被确认的消息 String unConfirmMessage = confirmMap.get(deliveryTag); System.out.println("未确认的消息:"+deliveryTag+"内容是:"+unConfirmMessage); }; //创建监听器在发送消息之前,监听消息的成功和失败 channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "msg"+i; channel.basicPublish("",queenName,null,message.getBytes()); //步骤001 记录下所有的记录 confirmMap.put(channel.getNextPublishSeqNo(),message); } long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个消息耗时"+(end-start)+"ms"); } 2.5:交换机?

当需要消息被多个消费者消费的时候 ,就需要交换机,即 发布订阅模式?

交换机分为: 直接类型 direct? 主题类型 topic? 标题类型 header? 广播类型 fanout

队列和交换机之间需要使用绑定 来产生联系?

?

?以 fanout 模式为例演示 交换机 和 队列之间的绑定 关系 ,以及生产者发送消息 ,多个消费者获取消息消费

分别编写 消息生产者和两个消费者

package org.rb.day02; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import org.rb.util.RabbitMqUtils; import java.util.Scanner; public class ExProducer { private static final String EXCHANGE_NAME="EX"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME,"", null,message.getBytes("UTF-8")); System.out.println("消息"+message+"发送完成"); } } } package org.rb.day02; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import org.rb.util.RabbitMqUtils; public class ExConsumer { private static final String EXCHANGE_NAME="EX"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除 String queenName = channel.queueDeclare().getQueue(); //绑定交换机和队列 channel.queueBind(queenName,EXCHANGE_NAME,""); //接受消息 DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("ExConsumer01接受到消息"+new String(message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ //未处理 }; channel.basicConsume(queenName,true,deliverCallback,cancelCallback); } } package org.rb.day02; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import org.rb.util.RabbitMqUtils; public class ExConsumer02 { private static final String EXCHANGE_NAME="EX"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除 String queenName = channel.queueDeclare().getQueue(); //绑定交换机和队列 channel.queueBind(queenName,EXCHANGE_NAME,""); //接受消息 DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("ExConsumer02接受到消息"+new String(message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ //未处理 }; channel.basicConsume(queenName,true,deliverCallback,cancelCallback); } }

启动 生产者和消费者查看 管理控制台中的绑定关系

说明交换机 队列声明并绑定成功

在生产端执行 发送消息, 并在 两个消费端的控制台查看打印信息 ,发现两个消费端都消费了 消息

直接交换机:direct

如果RoutingKey 在生产者 绑定交换机得时候使用的是一样的,那么其实 和上面的 fanout 是一样的,但是 要发送的 时候?RoutingKey 不一样 ,那么就会发送消息到不同的 队列去

可以看到 当我们改变生产者中的 RoutingKey 的时候 ,不同Routingkey 对应的consumer 会接受到对应的消息

package org.rb.day02; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import org.rb.util.RabbitMqUtils; import java.util.Scanner; //演示直接交换机生产者 public class DirectExProducer { private static final String DIRECT_EXCHANGE_NAME="DIRECT_EX"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); //根据不同的RoutingKey 将消息发送的不同的队列 channel.basicPublish(DIRECT_EXCHANGE_NAME,"003", null,message.getBytes("UTF-8")); //channel.basicPublish(DIRECT_EXCHANGE_NAME,"002", null,message.getBytes("UTF-8")); //channel.basicPublish(DIRECT_EXCHANGE_NAME,"003", null,message.getBytes("UTF-8")); System.out.println("消息"+message+"发送完成"); } } }

package org.rb.day02; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import org.rb.util.RabbitMqUtils; //演示直接交换机消费者 public class DirectExConsumer { private static final String DIRECT_EXCHANGE_NAME="DIRECT_EX"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //声明交换机 channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除 String queenName = channel.queueDeclare().getQueue(); //绑定交换机和队列 可以多重绑定 channel.queueBind(queenName,DIRECT_EXCHANGE_NAME,"001"); channel.queueBind(queenName,DIRECT_EXCHANGE_NAME,"002"); //接受消息 DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("DirectExConsumer01接受到消息"+new String(message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ //未处理 }; channel.basicConsume(queenName,true,deliverCallback,cancelCallback); } } package org.rb.day02; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import org.rb.util.RabbitMqUtils; //演示直接交换机消费者 public class DirectExConsumer2 { private static final String DIRECT_EXCHANGE_NAME="DIRECT_EX"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //声明交换机 channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除 String queenName = channel.queueDeclare().getQueue(); //绑定交换机和队列 可以多重绑定 channel.queueBind(queenName,DIRECT_EXCHANGE_NAME,"003"); //接受消息 DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("DirectExConsumer2接受到消息"+new String(message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ //未处理 }; channel.basicConsume(queenName,true,deliverCallback,cancelCallback); } }

?

?

?

?Topic 主题交换机:

Topic: 所有符合routingKey表达式所绑定的队列可以接收消息

发送到topic类型交换机的消息的routing_key不能随便设置 它必须是多个用点(.)分割的单词组成,单词可以是任意的 但它们通常指定连接到该消息的某些功能 路由关键字可包含任意多的单词但最高限制是255字节。

绑定的关键字必须有相同的格式。topic交换机和direct交换的逻辑是相似的–拥有特定的路由关键字的消息将被发送到所有匹配关键字的队列,还可以有两种特殊的正则

(1)* (星号) 可以代替一个完整的单词. (2)# (井号) 可以代替零个或多个单词.

如: *.aaa.*? 表示 要发到有三个单词 但中间一个单词必须是 aaa 的

? ? ??#.aaa.*? 表示 要发到有多个单词, 但倒数第二个单词必须是 aaa 的

? ? ??#.aaa.#? 表示 要发到有多个单词, 单词包含aaa 的

? ? ? *.aaa.#? 表示 要发到有多个单词, 第二个单词是aaa 的? 等

代码演示:

package org.rb.day02; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import org.rb.util.RabbitMqUtils; import java.util.Scanner; public class TopicProducer { private static final String TOPIC_EXCHANGE_NAME="TOPIC_EX"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish(TOPIC_EXCHANGE_NAME,"*.aaa.*", null,message.getBytes("UTF-8")); // channel.basicPublish(TOPIC_EXCHANGE_NAME,"#.aaa.*", null,message.getBytes("UTF-8")); // channel.basicPublish(TOPIC_EXCHANGE_NAME,"*.aaa.*", null,message.getBytes("UTF-8")); // channel.basicPublish(TOPIC_EXCHANGE_NAME,"*.aaa.#", null,message.getBytes("UTF-8")); // channel.basicPublish(TOPIC_EXCHANGE_NAME,"#.aaa.#", null,message.getBytes("UTF-8")); System.out.println("消息"+message+"发送完成"); } } } package org.rb.day02; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import org.rb.util.RabbitMqUtils; public class TopicConsumer { private static final String TOPIC_EXCHANGE_NAME="TOPIC_EX"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); //声明交换机 channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明一个临时队列,队列名称是随机的,当消费者断开连接自动删除 String queenName = channel.queueDeclare().getQueue(); //绑定交换机和队列 可以多重绑定 channel.queueBind(queenName,TOPIC_EXCHANGE_NAME,"·piler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!-- 必须的依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/commons-io/commons-io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit-test --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <version>2.3.10</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> <scope>provided</scope> </dependency> <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency> <!--swagger图形化接口 開始--> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!--swagger图形化接口結束--> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

配置文件 application.properties

# 应用名 spring.application.name=springboot-rabbitmq # rabbitmq配置信息 # ip spring.rabbitmq.host=192.168.217.128 # 端口 spring.rabbitmq.port=5672 # 用户名 spring.rabbitmq.username=developer # 密码 spring.rabbitmq.password=dev123456 # 配置虚拟机 spring.rabbitmq.virtual-host=/ # 消息开启手动确认 spring.rabbitmq.listener.direct.acknowledge-mode=manual

编写生产者

package org.rmq.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.util.Date; @Slf4j @RestController @RequestMapping("/ttl") public class TestController { //普通交换机 private static final String X_EXCHANGE = "X"; //普通RoutingKey private static final String NOMAL_ROUTINGKEY_A = "XA"; private static final String NOMAL_ROUTINGKEY_B = "XB"; @Autowired private RabbitTemplate rabbitTemplate; @ResponseBody @GetMapping("/sendmsg/{message}") public void sendMsg(@PathVariable("message") String message){ log.info("当前时间:{},发送消息{}给两个ttl队列",new Date().toString(),message); rabbitTemplate.convertAndSend(X_EXCHANGE,NOMAL_ROUTINGKEY_A,"消息来自ttl为10秒的队列:"+message); rabbitTemplate.convertAndSend(X_EXCHANGE,NOMAL_ROUTINGKEY_B,"消息来自ttl为20秒的队列:"+message); } }

编写 交换机 队列 routingKey 的配置以及绑定配置类

package org.rmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class TTLQueueConfig { //普通交换机 private static final String X_EXCHANGE = "X"; //死信交换机 private static final String DEAD_LETTER_EXCHANGE = "Y"; //普通队列 private static final String QUEUE_A = "QA"; private static final String QUEUE_B = "QB"; //死信队列 private static final String DEAD_LETTER_QUEUE = "QD"; //普通RoutingKey private static final String NOMAL_ROUTINGKEY_A = "XA"; private static final String NOMAL_ROUTINGKEY_B = "XB"; //死信RoutingKey private static final String DEAD_ROUTINGKEY= "YD"; //声明普通交换机 @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } //声明死信交换机 @Bean("yExchange") public DirectExchange yExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } //声明两个普通队列 @Bean("queueA") public Queue queueA(){ //準備給正常隊列的參數 Map<String,Object> arguments = new HashMap<>(3); //设置不能正常消费时的私信交换机的参数 arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE); //设置死信交换机的RoutingKey arguments.put("x-dead-letter-routing-key",DEAD_ROUTINGKEY); arguments.put("x-message-ttl",10000); //设置过期时间 单位为ms (毫秒) return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } @Bean("queueB") public Queue queueB(){ //準備給正常隊列的參數 Map<String,Object> arguments = new HashMap<>(3); //设置不能正常消费时的私信交换机的参数 arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE); //设置死信交换机的RoutingKey arguments.put("x-dead-letter-routing-key",DEAD_ROUTINGKEY); arguments.put("x-message-ttl",20000); //设置过期时间 单位为ms (毫秒) return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } //声明死新队列 @Bean("deadQueueD") public Queue deadQueueD(){ return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } //绑定普通交换机 @Bean public Binding queueAbindingToxExchange(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with(NOMAL_ROUTINGKEY_A); } @Bean public Binding queueBbindingToxExchange(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with(NOMAL_ROUTINGKEY_B); } //绑定死信交换机 @Bean public Binding deadQueueDbindingToYExchange(@Qualifier("deadQueueD") Queue deadQueueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(deadQueueD).to(yExchange).with(DEAD_ROUTINGKEY); } }

编写消费者

package org.rmq.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; @Slf4j @Component public class TTLQueueConsumer { //死信队列 private static final String DEAD_LETTER_QUEUE = "QD"; @RabbitListener(queues = DEAD_LETTER_QUEUE) public void recieveTTLInfo(Message message, Channel channel){ String msg = new String(message.getBody()); log.info("当前时间{},收到死信队列的消息{}",new Date().toString(),msg); } }

?启动项目,

在浏览器访问

?

至此 使用 死信队列 + ttl 来处理延迟消息的例子完成,? 这里实际上就是工作中常用到的延迟队列

3.2: TTL 队列优化

在3.1 里面演示的延迟队列是存在问题的 ,当我们还需要定义更多的延迟时间的队列的时候 ,就还需要增加不同的 普通队列 和 RoutingKey? 这样就不得不修改代码,我们希望可以写一个通用的来替代

可以在之前的代码中添加一个新的 交换机和队列 ,不在消费端定义过期时间,而是在生产者端去指指定时间 ,这样就可以可解决问题

在3.1 的TTLQueueConfig代码中添加 变量

?private static final String QUEUE_C = "QC";??

private static final String NOMAL_ROUTINGKEY_C = "XC";

再新增一个队列的定义,并绑定死信队列,不指定过期时间参数

@Bean("queueC") public Queue queueC(){ //準備給正常隊列的參數 Map<String,Object> arguments = new HashMap<>(3); //设置不能正常消费时的私信交换机的参数 arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE); //设置死信交换机的RoutingKey arguments.put("x-dead-letter-routing-key",DEAD_ROUTINGKEY); //arguments.put("x-message-ttl",20000); //设置过期时间 单位为ms (毫秒) return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } @Bean public Binding queueCBindingToxExchange(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueC).to(xExchange).with(NOMAL_ROUTINGKEY_C); }

这样就加好了配置类中的内容

在生产端 添加一个新的测试代码

@ResponseBody @GetMapping("/sendmsgttl/{message}/{ttltime}") public void sendMsg(@PathVariable("message") String message,@PathVariable("ttltime") String ttltime){ log.info("当前时间:{},发送消息{}给一个ttl队列,过期时间是{}ms",new Date().toString(),message,ttltime); rabbitTemplate.convertAndSend(X_EXCHANGE,NOMAL_ROUTINGKEY_C,message,msg->{ msg.getMessageProperties().setExpiration(ttltime); return msg; }); }

执行结果

?发现当有多条消极被发送时, 发送时间 基于了第一条的时间来执行了,哪怕第二条消息时间比第一条端,还是按第一条的时间来处理了,所以这里还是有问题的,我们还需要更多的优化

3.3 TTL + 死信队列优化2

解决上面的问题 我们需要安装一个插件

官网地址 :Community Plugins — RabbitMQ? ? ?点击进去之后会发现,已经被托管到git 上了

进入? git??https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载对应的版本 注意 Erlang 支持的版本 ,由于我的 符合这个版本要求,就直接下载啦

?

?下载之后 将 该文件 放到 rabbitmq 的 plugins 目录下

我的机器目录? /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.26

?

?执行?rabbitmq-plugins enable rabbitmq_delayed_message_exchange? 安装插件

?安装玩插件后需要重启服务

rabbitmqctl ?rabbitmq-server stop ?关闭服务 rabbitmq-server -detached ? 启动服务 rabbitmqctl status ? ?查看服务状态

?此时 我我们延迟交换机的 延迟队列 就变更到交换机了

编写 一个新的 延迟对类的配置类

package org.rmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class DelayedQueueConfig { private static final String DELAYED_EXCHANGE = "DELAYED.EXCHANGE"; private static final String DELAYED_QUEUE = "DELAYED.QUEUE"; private static final String DELAYED_ROUTING_KEY = "DELAYED.ROUTINGKEY"; private static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message"; @Bean("delayedExchange") public CustomExchange delayedExchange(){ /** * name 交换机名称 * type 交换机类型 * durable 是否持久化交换机 * autoDelete 是否自动删除交换机 * arguments 其他参数 */ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type","direct"); //延迟类型 //x-delayed-message 延迟消息 return new CustomExchange(DELAYED_EXCHANGE,DELAYED_EXCHANGE_TYPE,true,false,arguments); } @Bean("delayedQueue") public Queue delayedQueue(){ return new Queue(DELAYED_QUEUE); } @Bean public Binding delayedQueueBindingTodelayedExchange( @Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }

编写生产者 延时时间多少 由生产多发消息的时候 指定

//############################################ private static final String DELAYED_EXCHANGE = "DELAYED.EXCHANGE"; private static final String DELAYED_ROUTING_KEY = "DELAYED.ROUTINGKEY"; //############################################ @ResponseBody @GetMapping("/sendDelayedmsg/{message}/{delaredTime}") public void sendMsg(@PathVariable("message") String message,@PathVariable("delaredTime") Integer delaredTime){ log.info("当前时间:{},发送消息{}给一个延迟队列,时间是{}ms",new Date().toString(),message,delaredTime); rabbitTemplate.convertAndSend(DELAYED_EXCHANGE,DELAYED_ROUTING_KEY,message,msg->{ msg.getMessageProperties().setDelay(delaredTime); return msg; }); }

编写消费者:

package org.rmq.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; @Component @Slf4j public class DelayedQueueConsumer { private static final String DELAYED_QUEUE = "DELAYED.QUEUE"; @RabbitListener(queues = DELAYED_QUEUE) public void recieveDelayedInfo(Message message){ String msg = new String(message.getBody()); log.info("当前时间{},收到延迟队列的消息{}",new Date().toString(),msg); } } 执行结果发现延迟的消息按照我们预想打印消费了,延迟时间长的后打印,延迟时间短的后打印,说明插件可用

4:发布确认的高级内容

在之前的内容里,都没有讨论交换机,队列由于某些原因收不到消息的情况,怎么在交换机和队列粗问题的时候 ,生产者能够知道消息发送失败了,这样就可以记录或者重发那些消息

4.1:交换机 确认回调

编写配置文件

package org.rmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { private static final String CONFIRM_EXCHANGE = "CONFIRM.EXCHANGE"; private static final String CONFIRM_QUEUE = "CONFIRM.QUEUE"; private static final String CONFIRM_ROUTING_KEY = "CONFIRM.ROUTINGKEY"; @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding confirmBind(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }

编写生产者

//############################################ private static final String CONFIRM_EXCHANGE = "CONFIRM.EXCHANGE"; private static final String CONFIRM_ROUTING_KEY = "CONFIRM.ROUTINGKEY"; //############################################ @ResponseBody @GetMapping("/sendConfirmmsg/{message}") public void sendConfirmMsg(@PathVariable("message") String message){ /** exchange routingKey message CorrelationData correlationData 放入确认回调的信息 **/ String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE+"1234",CONFIRM_ROUTING_KEY,message,correlationData); System.out.println("发送了编号为:"+id+"消息"+"内容是:"+message); }

这个?CONFIRM_EXCHANGE+"1234"? 是故意制造错误 用来验证数据发送不到交换机

编写消费者

package org.rmq.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class ConfirmConsumer { private static final String CONFIRM_QUEUE = "CONFIRM.QUEUE"; @RabbitListener(queues = CONFIRM_QUEUE) public void recieveConfirmMsg(Message message){ System.out.println("ConfirmConsumer 接受到消息:"+new String(message.getBody())); } }

测试:故意修改交换机的RoutingKey , 让生产端的消息 不能发送到交换机 ,检查 控制台信息,报错

?如果没有这个?spring.rabbitmq.publisher-confirms=true?配置 会发现我们得 fallbak 函数 不会被调用,发送了编号为:3fec94f3-c023-4653-badd-fbf9505be401消息内容是:www012 2021-11-26 20:00:12.881 ERROR 31628 --- [68.217.128:5672] o.s.a.r.c.CachingConnectionFactory ? ? ? : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CONFIRM.EXCHANGE1234' in vhost '/', class-id=60, method-id=40)

这个错误是因为我们还少了个配置需要配置到 properties 文件

# 应用名 spring.application.name=springboot-rabbitmq # rabbitmq配置信息 # ip spring.rabbitmq.host=192.168.217.128 # 端口 spring.rabbitmq.port=5672 # 用户名 spring.rabbitmq.username=developer # 密码 spring.rabbitmq.password=dev123456 # 配置虚拟机 spring.rabbitmq.virtual-host=/ # 消息开启手动确认 spring.rabbitmq.listener.direct.acknowledge-mode=manual # 开启发布确认回调 spring.rabbitmq.publisher-confirms=true

spring.rabbitmq.publisher-confirms=true? ? 需要将这个 配置 写入配置文件 有的版本是

spring.rabbitmq.publisher-confirm-type= simple 或者 none? 或者 corrected? 请根据自己的 版本来选用

none : 禁用发布确认模式

corrected : 会触发消息发布确认,并且不会关闭信道 channel

simple : 会调用 waitConfirms() 或?waitConfirmsOrDie() 等待broker 返回结果,根据返回结果判断调用下一步逻辑,需要注意的是 调用?waitConfirmsOrDie() 返回false 的时候 ,会关闭channel ,导致 、无法继续发消息到broker?

4.2:消息回退

当交换机可达 但是 交换机需要路由到对应的 队列的时候 无法进行路由,就可以通消息回退的方式来获取到那个消息不能发送成功,获取到这些消息之后,就可以进行重新发送

在配置文件中 添加

# 开启消息回退 spring.rabbitmq.publisher-returns=true

修改4.1 中的callback文件

package org.rmq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Slf4j @Component public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * correlationData 消息确认内容 * ack 是否收到确认 * cause 收到或未收到确认的原因 * **/ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ String id = correlationData.getId(); log.info("确认收到消息,"+"编号:"+id); }else{ String id = correlationData.getId(); log.info("未能确认收到编号为:"+id+"的消息,失败原因是:"+cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息{}无法路由被交换机{}回退,原因是{},路由key:{}", message.getBody(),exchange,replyText,routingKey); } }

修改生产者

@ResponseBody @GetMapping("/sendConfirmmsg/{message}") public void sendConfirmMsg(@PathVariable("message") String message){ /** exchange routingKey message CorrelationData correlationData 放入确认回调的信息 **/ String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,CONFIRM_ROUTING_KEY,message,correlationData); System.out.println("发送了编号为:"+id+"消息"+"内容是:"+message); String id2 = UUID.randomUUID().toString(); CorrelationData correlationData2 = new CorrelationData(id2); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,CONFIRM_ROUTING_KEY+"1234",message,correlationData2); System.out.println("发送了编号为:"+id2+"消息"+"内容是:"+message+"key是:"+CONFIRM_ROUTING_KEY+"1234"); }

?执行结果:

4.3:备份交换机

我们还可以对4.2中的 架构进行优化,但发送不到 queue 的时候 可以对交换机做备份,当无法路由时不直接 消息回退,而是通过交换机将消息发送给备份交换机

修改 配置类

package org.rmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { private static final String CONFIRM_EXCHANGE = "CONFIRM.EXCHANGE"; private static final String CONFIRM_QUEUE = "CONFIRM.QUEUE"; private static final String CONFIRM_ROUTING_KEY = "CONFIRM.ROUTINGKEY"; // 备份交换机 备份被队列 private static final String BACKUP_EXCHANGE = "BACKUP.EXCHANGE"; private static final String BACKUP_QUEUE = "BACKUP.QUEUE"; private static final String WARNING_QUEUE = "WARNING.QUEUE"; // @Bean("confirmExchange") // public DirectExchange confirmExchange(){ // return new DirectExchange(CONFIRM_EXCHANGE); // } //建立普通交换机在无法消费数据是将消息转发给备份交换机 @Bean("confirmExchange") public DirectExchange confirmExchange(){ return (DirectExchange) ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true) .withArgument("alternate-exchange",BACKUP_EXCHANGE).build(); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding confirmBind(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } //声明备份交换机 备份被队列 @Bean("backupExchange") public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE); } @Bean("backupQueue") public Queue backupQueue(){ return QueueBuilder.durable(BACKUP_QUEUE).build(); } @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE).build(); } //绑定 备份交换机和备份队列 @Bean public Binding bcackupQueueBindToBackupExchange(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding warningQueueBindToBackupExchange(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(warningQueue).to(backupExchange); } }

添加一个备份的消费者

package org.rmq.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class WarningConsumer { private static final String WARNING_QUEUE = "WARNING.QUEUE"; @RabbitListener (queues = WARNING_QUEUE) public void recieveWarningMsg(Message message){ log.info("warning get message"+new String(message.getBody())); } }

生产者 还是用 4.2中的代码逻辑

@ResponseBody @GetMapping("/sendConfirmmsg/{message}") public void sendConfirmMsg(@PathVariable("message") String message){ /** exchange routingKey message CorrelationData correlationData 放入确认回调的信息 **/ String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,CONFIRM_ROUTING_KEY,message,correlationData); System.out.println("发送了编号为:"+id+"消息"+"内容是:"+message); String id2 = UUID.randomUUID().toString(); CorrelationData correlationData2 = new CorrelationData(id2); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,CONFIRM_ROUTING_KEY+"1234",message,correlationData2); System.out.println("发送了编号为:"+id2+"消息"+"内容是:"+message+"key是:"+CONFIRM_ROUTING_KEY+"1234"); }

执行代码 , 报错,需要在管理控制台删除之前在4.2 中创建的队列和交换机,并重新启动程序,因为之前在普通交换机上并没有设置出现不能转发时的备份交换机

2021-11-27 10:14:48.997 ERROR 17092 --- [68.217.128:5672] o.s.a.r.c.CachingConnectionFactory ? ? ? : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'alternate-exchange' for exchange 'CONFIRM.EXCHANGE' in vhost '/': received the value 'BACKUP.EXCHANGE' of type 'longstr' but current is none, class-id=40, method-id=10) 2021-11-27 10:14:50.009 ERROR 17092 --- [68.217.128:5672] o.s.a.r.c.CachingConnectionFactory ? ? ? : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'alternate-exchange' for exchange 'CONFIRM.EXCHANGE' in vhost '/': received the value 'BACKUP.EXCHANGE' of type 'longstr' but current is none, class-id=40, method-id=10) 2021-11-27 10:14:52.016 ERROR 17092 --- [68.217.128:5672] o.s.a.r.c.CachingConnectionFactory ? ? ? : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'alternate-exchange' for exchange 'CONFIRM.EXCHANGE' in vhost '/': received the value 'BACKUP.EXCHANGE' of type 'longstr' but current is none, class-id=40, method-id=10) ?

重新 执行发送信息可以看到 不能发送到普通交换机的消息 已经被备份交换机获取到了

?4.4: 优先级队列

创建优先级消息的生产者

@ResponseBody @GetMapping("/sendprimsg/{message}") public void sendPriMsg(@PathVariable("message") String message){ for(int i=10;i>1;i--){ int finalI = i; rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,"queue:"+i, msg -> { msg.getMessageProperties().setPriority(finalI); //设置优先级 return msg; }); } }

设置配置类

package org.rmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; //优先级队列配置类 @Configuration public class priQueueConfig { private static final String EXCHANGE = "priority-exchange"; public static final String QUEUE = "priority-queue"; private static final String ROUTING_KEY = "priority.queue"; @Bean DirectExchange priExchange(){ return new DirectExchange(EXCHANGE); } @Bean Queue priQueue(){ Map<String,Object> map = new HashMap<>(); map.put("x-max-priority",10);//设置最大的优先级数量 return new Queue(QUEUE,true,false,false,map); } @Bean public Binding priQueueBindpriExchange( @Qualifier("priQueue") Queue priQueue, @Qualifier("priExchange") DirectExchange priExchange ){ return BindingBuilder.bind(priQueue).to(priExchange).with(ROUTING_KEY); } }

设置消费者

package org.rmq.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class PriConsumer { @RabbitListener(queues ="priority-queue") public void hand(String msg){ log.info("接受到了一个消息:"+msg); } }

以上几个章节代码完整路径

GitHub - wanglei111000/RabbitMqDemo

GitHub - wanglei111000/Springboot-rabbitmq

5:RabbitMQ 集群

请参考我的另一个博客 :?RabbitMQ HAProxy +Keepalived 高可用集群_Java 码农的博客-CSDN博客


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #rabbitmq #实战 #1安装 #RabbitMQ这里 #y #install