irpas技术客

如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息_森林迷了~鹿_rabbitmq多个消费者同时接收

irpas 5198

如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息

场景:用户登录,邀请其它用户进行视频会议,收到邀请的用户进入会议

rabbitmq实现思路:

选型:发布订阅模式(Publish/Subscribe)

一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。

这种情况下,我们有四种交换机可供选择,分别是:

DirectFanoutTopicHeader

由于消费者的数量不固定,所以要动态生成临时队列,无法指定routingkey因此选fanout模式

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用

代码实现: 1.pom文件引入rabbitmq依赖

<!-- rabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2.配置文件

server: port: 9091 spring: application: name: rabbitmq # rabbitmq配置 rabbitmq: host: 192.168.8.142 port: 5672 username: admin password: admin virtual-host: my_vhost

3.constant类

package com.anychat.rabbitmqtest.constant; /** * @author Liby * @date 2022-05-05 10:02 * @description: * @version: */ public class RabbitmqConstant { public static final String MEETING_FANOUT_EXCHANGE = "meeting_exchange"; }

4.用户实体类

package com.anychat.rabbitmqtest.entity; /** * @author Liby * @date 2022-05-06 09:39 * @description: * @version: */ public class User { private Integer userId; private String username; public Integer getUserId() { return userId; } public void setUserId(Integer userId) { this.userId = userId; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public User(Integer userId, String username) { this.userId = userId; this.username = username; } }

5.工具类

package com.anychat.rabbitmqtest.util; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; /** * @author Liby * @date 2022-04-28 10:27 * @description: * @version: */ public class RabbitmqUtil { @Autowired private static RabbitTemplate rabbitTemplate; public static Channel getChannel() { Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true); return channel; } }

6.消费者类

package com.anychat.rabbitmqtest.consumer; import cn.hutool.core.util.StrUtil; import com.anychat.rabbitmqtest.constant.RabbitmqConstant; import com.anychat.rabbitmqtest.entity.User; import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; /** * @author Liby * @date 2022-04-25 11:18 * @description:消费者,动态创建临时队列 * @version: */ @Slf4j @Component public class FanoutConsumer { @Autowired private RabbitTemplate rabbitTemplate; public void createQueue(User user) { //创建信道 Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true); try { //声明一个交换机与生产者相同 channel.exchangeDeclare(RabbitmqConstant.MEETING_FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true); //获取一个随机的队列名称,使用默认方式,产生的队列为临时队列,在没有消费者时将会自动删除 String queueName = channel.queueDeclare().getQueue(); //用户Id与队列名绑定 ConcurrentHashMap<String, Integer> userQueueMap = new ConcurrentHashMap<>(); userQueueMap.putIfAbsent(queueName, user.getUserId()); //关联 exchange 和 queue ,因为是广播无需指定routekey,routingKey设置为空字符串 // channel.queueBind(queue, exchange, routingKey) channel.queueBind(queueName, RabbitmqConstant.MEETING_FANOUT_EXCHANGE, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); //对信息进行操作 String message = new String(body, "UTF-8"); if (StrUtil.isNotBlank(message)) { String[] receiveIds = message.split(","); Integer userId = userQueueMap.get(queueName); for (String id : receiveIds) { if (userId.equals(Integer.valueOf(id))) { log.info("用户{}收到入会邀请", id); } } } } }; //true 自动回复ack channel.basicConsume(queueName, true, consumer); } catch (Exception ex) { } } }

7.controller类

package com.anychat.rabbitmqtest.controller; import com.anychat.rabbitmqtest.constant.RabbitmqConstant; import com.anychat.rabbitmqtest.consumer.FanoutConsumer; import com.anychat.rabbitmqtest.entity.User; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author Liby * @date 2022-04-24 16:34 * @description:生产者 * @version: */ @RestController @Slf4j @RequestMapping("/producer") public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private FanoutConsumer fanoutConsumer; /** * 模拟用户登录后,创建一个临时队列,与该用户绑定 */ @PostMapping("/login") public String login(){ //模拟三个用户登录 int userNum=3; for (int i = 0; i < userNum; i++) { //用户绑定临时队列,并监听队列 fanoutConsumer.createQueue(new User(i, "用户" + i)); log.info("用户{}登录成功",i); } return "用户登录成功"; } @PostMapping("/meeting") public String meeting(){ String message="1,2"; log.info("邀请用户{}进入会议",message); //发送消息,要求userId为2和3的用户进入会议 rabbitTemplate.convertAndSend(RabbitmqConstant.MEETING_FANOUT_EXCHANGE,"",message); return "发送成功"; } }

postman分别调用login和meeting两个接口 可以看到日志打印


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。