irpas技术客

SpringBoot+RabbitMQ实现发布订阅模式消息队列_什么都干的派森_mq springboot 订阅

网络 7863

文章目录

- 一、适用场景
- 二、前提条件
- 三、执行流程
- 四、实现方法
  - 1.依赖包
  - 2.application配置文件
  - 3.创建一个接收消息的对象
  - 4.创建三个消息队列绑定到一个发布订阅交换机上
  - 5.创建三个队列的消费者
  - 6.controller中创建一个生产者
- 五、测试结果


一、适用场景

高并发场景下,多个耗时业务顺序执行会浪费大量时间,而多线程又可能导致过高的 CPU 占用,较好的方式是采用消息队列。


二、前提条件

需要先部署一个RabbitMQ,可以参考这个教程 https://blog.csdn.net/weixin_43721000/article/details/124587795


三、执行流程

```mermaid
flowchart LR
    P[生产者] -->|发布| X[交换机]
    X -->|订阅| QA[消息队列A]
    X -->|订阅| QB[消息队列B]
    X -->|订阅| QC[消息队列C]
    QA --> CA[消费者A]
    QB --> CB[消费者B]
    QC --> CC[消费者C]
```

四、实现方法

1.依赖包

```xml
<!-- RabbitMQ dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- Other dependencies (optional, only used to demonstrate the result) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <!-- 1.2.9 is affected by the known fastjson autoType deserialization
         vulnerabilities; 1.2.83 is the patched 1.x release. -->
    <version>1.2.83</version>
</dependency>
<!-- Other dependencies (optional, only used to demonstrate the result) -->
```

2.application配置文件

```yaml
server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.0.231      # RabbitMQ host IP
    port: 5672               # RabbitMQ port
    username: admin          # user name
    password: 123456         # password (demo value; do not commit real credentials)
    virtual-host: my_vhost   # virtual host name (isolates queues created by different users)
```

3.创建一个接收消息的对象

```java
package com.cxstar.bean;

import lombok.Data;

/**
 * Message payload exchanged between the producer and the consumers.
 *
 * <p>It is serialized to JSON by the producer and parsed back by each consumer.
 */
@Data
public class MsgBean {

    private String searchKey;
    private Integer curPage;

    /** No-arg constructor; fastjson needs it to turn JSON back into an object. */
    public MsgBean() {
    }

    /**
     * Creates a fully populated message.
     *
     * @param searchKey value stored in {@code searchKey}
     * @param curPage   value stored in {@code curPage}
     */
    public MsgBean(String searchKey, Integer curPage) {
        this.searchKey = searchKey;
        this.curPage = curPage;
    }
}
```

4.创建三个消息队列绑定到一个发布订阅交换机上

```java
package com.cxstar.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares one fanout (publish/subscribe) exchange and binds queues A, B and C to it.
 */
@Configuration
public class FanoutExchangeConfig {

    // Names ---------------------------------------------------------------------

    /** Exchange name. */
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    /** Queue names. */
    public static final String FANOUT_EXCHANGE_QUEUE_A = "fanout_exchange_queue_a";
    public static final String FANOUT_EXCHANGE_QUEUE_B = "fanout_exchange_queue_b";
    public static final String FANOUT_EXCHANGE_QUEUE_C = "fanout_exchange_queue_c";

    // Declarations --------------------------------------------------------------

    /** Creates the fanout exchange. */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /** Creates queue A. */
    @Bean
    public Queue queueA() {
        return new Queue(FANOUT_EXCHANGE_QUEUE_A);
    }

    /** Creates queue B. */
    @Bean
    public Queue queueB() {
        return new Queue(FANOUT_EXCHANGE_QUEUE_B);
    }

    /** Creates queue C. */
    @Bean
    public Queue queueC() {
        return new Queue(FANOUT_EXCHANGE_QUEUE_C);
    }

    // Bindings: each queue subscribes to the exchange ---------------------------

    /** Binds queue A to the fanout exchange. */
    @Bean
    public Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    /** Binds queue B to the fanout exchange. */
    @Bean
    public Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    /** Binds queue C to the fanout exchange. */
    @Bean
    public Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}
```

5.创建三个队列的消费者

```java
package com.cxstar.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.cxstar.bean.MsgBean;
import com.cxstar.config.FanoutExchangeConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * One listener per queue; every listener parses the JSON payload into a
 * {@link MsgBean} and logs it.
 */
@Slf4j
@Component
public class Consumer {

    /**
     * Consumer A.
     *
     * @param msg     JSON text of the message body
     * @param channel channel the message arrived on (not used here)
     * @param message raw AMQP message (not used here)
     */
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_EXCHANGE_QUEUE_A)
    public void receiverQueueA(String msg, Channel channel, Message message) throws IOException {
        consume("receiverQueueA", msg);
    }

    /**
     * Consumer B.
     *
     * @param msg     JSON text of the message body
     * @param channel channel the message arrived on (not used here)
     * @param message raw AMQP message (not used here)
     */
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_EXCHANGE_QUEUE_B)
    public void receiverQueueB(String msg, Channel channel, Message message) throws IOException {
        consume("receiverQueueB", msg);
    }

    /**
     * Consumer C.
     *
     * @param msg     JSON text of the message body
     * @param channel channel the message arrived on (not used here)
     * @param message raw AMQP message (not used here)
     */
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_EXCHANGE_QUEUE_C)
    public void receiverQueueC(String msg, Channel channel, Message message) throws IOException {
        consume("receiverQueueC", msg);
    }

    /**
     * Parses the payload and logs it; shared by all three listeners.
     *
     * @param listenerName prefix written to the log line
     * @param msg          JSON text of the message body
     * @throws AmqpRejectAndDontRequeueException if {@code msg} is not valid JSON
     */
    private void consume(String listenerName, String msg) {
        final MsgBean msgBean;
        try {
            // Message -> object
            msgBean = JSON.parseObject(msg, MsgBean.class);
        } catch (JSONException e) {
            // A malformed payload will never parse; reject it without requeue so it
            // is not redelivered over and over.
            throw new AmqpRejectAndDontRequeueException(
                    listenerName + " received a payload that is not valid JSON", e);
        }
        // Consume. Parameterized logging also copes with a null bean (empty payload),
        // where calling toString() on it would throw.
        log.info("{} 消费对象:{}", listenerName, msgBean);
    }
}
```

6.controller中创建一个生产者

```java
package com.cxstar.controller;

import com.alibaba.fastjson.JSON;
import com.cxstar.bean.MsgBean;
import com.cxstar.config.FanoutExchangeConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * Demo producer: an HTTP endpoint that publishes one message to the fanout exchange.
 */
@RestController
public class Controller {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds a sample {@link MsgBean}, serializes it to JSON and publishes it to
     * {@link FanoutExchangeConfig#FANOUT_EXCHANGE}.
     */
    @GetMapping("/send_msg")
    public void sendExchange() {
        // Build the message
        MsgBean msgBean = new MsgBean("python", 1);
        String msgBeanJsonString = JSON.toJSONString(msgBean);

        // Publish to the exchange. A fanout exchange ignores the routing key,
        // so an explicit empty string is passed instead of null.
        rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, "", msgBeanJsonString);
    }
}
```
五、测试结果

启动 SpringBoot 后,访问 http://127.0.0.1:8080/send_msg ,生产者会向交换机发布一条消息;三个消费者(A、B、C)会各自收到该消息,并在控制台各打印一条消费日志。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #MQ #springboot #订阅