irpas技术客

Spring Cloud Stream的配置及使用——以RabbitMQ为例_CaptHua_springcloud stream配置

未知 2072

1. 简介

https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html 英语好的可以直接看官方文档,文档里讲的更全面 By default, the RabbitMQ Binder implementation maps each destination to a TopicExchange. For each consumer group, a Queue is bound to that TopicExchange.

上图是RabbitMQ Binder(绑定器)。默认情况下,绑定器实现将每一个destination映射到一个TopicExchange。对于每一个消费者组,都有一个队列绑定到那个TopicExchange。

2. 依赖配置 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> 3. 生产者配置及消息发送 3.1 yaml配置 spring: cloud: stream: # 如果有一个binder的话,就不需要设置 default-binder: rabbit binders: rabbit1: type: rabbit environment: spring: rabbitmq: host: 192.168.70.224 port: 5672 username: admin password: 444944 virtual-host: GHost rabbit: type: rabbit defaultCandidate: false environment: spring: rabbitmq: host: 192.168.70.167 port: 5672 username: admin password: public virtual-host: / bindings: order: binder: rabbit destination: order producer: # 默认是true autoStartup: true cart: binder: rabbit destination: cart routingKeyExpression: han producer: # 默认是true autoStartup: true # rabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-<partition>

配置说明:

在Spring Cloud Stream中可以配置多个binder,也就是可以配置连接多个MQ服务器在RabbitMQ中,binding的名称对应的是output和input的名称,destination对应mq中的exchange名称。上述配置会生成order,cart两个exchangerabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-,分区内容这里不涉及,具体内容看官方文档 3.2 生产者声明 public interface CartSource { /** * Name of the output channel. * cart对应的是binding, * destination对应的是rabbitmq里的exchange,kafka中的topic. * 如果没有设置destination, rabbitmq会自动创建一个和binding同名的exchange */ String OUTPUT = "cart"; /** * @return output channel */ @Output(CartSource.OUTPUT) MessageChannel output(); }

cloud stream中output表示发送,这里的cart与配置中的binding名称是对应的

//必须添加,要不然无法注入。也可以加在启动类上,可以重复添加 @EnableBinding(CartSource.class) @Component public class CartSender { @Autowired private CartSource orderSource; private static final Logger logger= LoggerFactory.getLogger(CartSender.class); public void pushMsg(Order order){ logger.info("sending rabbitmq message:{}",order.toString()); orderSource.output().send(MessageBuilder.withPayload(order).build()); } 3.3 消息发送 @RestController @RequestMapping("mqTest1") public class MqTest1Controller { @Autowired CartSender cartSender; @GetMapping("streamPush") public String streamPush(){ cartSender.pushMsg(new Order()); return "hehe"; } } 4. 消费者配置 4.1 yaml配置 spring: cloud: stream: # 如果有一个binder的话,就不需要设置 default-binder: rabbit binders: rabbit1: type: rabbit environment: spring: rabbitmq: host: 192.168.70.224 port: 5672 username: admin password: 444944 virtual-host: / rabbit: type: rabbit defaultCandidate: false environment: spring: rabbitmq: host: 192.168.70.167 port: 5672 username: admin password: public virtual-host: / bindings: order: binder: rabbit destination: order group: myOrderQueue conumer: concurrency: 3 cart: binder: rabbit destination: cart group: myCartQueue conumer: concurrency: 3 # rabbit的扩展配置 RabbitExtendedBindingProperties rabbit: bindings: # order: # consumer: # bindingRoutingKey: order-key cart: consumer: # 如果没有指定routing key,会使用默认的 # bindingRoutingKey: cart-key

配置说明:

只有消费者才会创建队列,queue名称为:<bindingName><destination>.<group>routingKey设置 spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey 如果没有设置的话,默认为#

4.2 消费者声明与消息接收 public interface CartSink { String INPUT = "cart"; /** * @return input channel. */ @Input(CartSink.INPUT) SubscribableChannel input(); }

cloud stream中input表示接收,这里的cart与配置中的binding名称是对应的

@EnableBinding(CartSink.class) public class CartHandler { private static final Logger logger = LoggerFactory.getLogger(CartHandler.class); /** * 参数也可以是对象,会自动将消息转换为对象 * @param headers * @param payload */ @StreamListener(CartSink.INPUT) public void loggerSink(@Headers MessageHeaders headers, byte[] payload){ String cartChange=new String(payload); logger.info("cart change:{}",cartChange); } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #springcloud #stream配置 #Spring #Cloud