irpas技术客

Spring Cloud Stream 消息驱动快速入门_学然后知不足!

未知 3735

简单介绍

Spring Cloud Stream是一个用于构建与共享消息系统连接的高度可扩展的事件驱动微服务的框架。该框架提供了一个灵活的编程模型,模型建立在已经建立和熟悉的Spring习惯用法和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持

Spring Cloud Stream为一些提供商的消息中间件提供了个性化的自动化配置,本质上整合了Spring Boot和 Sping Integration实现了一套轻量级的小东西驱动和微服务框架,有效的简化了开发人员对消息中间件的使用复杂度。

从上图可以看到Spirng Cloud Stream构建的应用程序与消息中间件之间是通过绑定器Binder实现的,绑定器作为中间层实现了应用程序与消息中间件细节之间的隔离。当需要升级消息中间件或者更换其他消息中间件的时候,我们只需要更换它对应的绑定器Binder 即可,不需要修改Spring Boot 中的应用逻辑。

RabbitMQ 绑定器

Spring Cloud Stream支持多种绑定器实现包括 RabbitMQ, Apache Kafka, Amazon Kinesis, AWS SQS ,Apache RocketMQ等等

当使用RabbitMQ的时候, 可以向Spring Cloud Stream应用中添加下面的的Maven依赖

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>

官网同时说明也可以使用Spring Cloud Stream RabbitMQ starter. 例如

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>

下图显示了RabbitMQ Binder的工作原理图

默认情况下,消息将会发送给RabbitMQ中的主题交换器(TopicExchange),主题交换器通过路由键将消息发送给消费者, RabbitMQ主题交换器基本概念可以查看下面这篇文章,这里不再多说

链接:RabbitMQ教程主题交换器Topics

快速入门 1. 设置POM.XML

创建一个SpringBoot应用,pom.xml文件中增加 spring-cloud-starter-stream-rabbit 引用,参考如下信息、

这里需要注意 spring-cloud-starter-stream-rabbit 默认引用了org.springframework.amqp:spring-rabbit ,如果工程中有其他版本的同名包,可能会引起依赖问题

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.2.RELEASE</version> <relativePath/> </parent> <groupId>com.stpringcloud.test</groupId> <artifactId>stream-rabbitmq</artifactId> <version>1.0.0</version> <name>stream-rabbitmq</name> <description>Demo project for Spring Cloud Stream</description> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> 2. 创建消息接收对象

SinkReceiver对象用于接收消息

@EnableBinding(Sink.class) public class SinkReceiver { private static Logger Log = LoggerFactory.getLogger(SinkReceiver.class); /** * @param payload 有效载荷,一般指对于接收者有用的数据 */ @StreamListener(Sink.INPUT) private void receive(Object payload) { Log.info("SinkReceiver:"+ payload); } }

此对象中

@EnableBinding注解用来指定一个或者多个定了了@Input或者@Output注解的接口,一次实现对消息通过Channel的绑定处理?,例如Sink接口中定义信息如下,接口是SpringCloud Stream中默认实现的对输入消息通道绑定的定义,例如下文中也会提到启动的时候会将一个对垒绑定到input上

public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }

@StreamListener:它定义到方法上后会将被修饰的方法注册到消息中间件上数据流的时间监听器,注解中的属性对应监听的消息通道名, 上面@StreamListener(Sink.INPUT)注解将receive方法注册未input消息通道的监听处理

3. 启动主类

运行刚创建的应用从SpringBoot应用的启动日志中可以发现一下信息

使用guest用户创建了一个指向本机IP:5672的RabbitMQ连接,在RabbitMQ里声明了一个匿名队列anonymous.Qpe1sgP2ReiLQky8DkWZCA,并将其绑定到input 的交换器上 [ ? ? ? ? ? main] c.s.stream.StreamRabbitmqApplication ? ? : Started StreamRabbitmqApplication in 0.113 seconds (JVM running for 4.423) [ ? ? ? ? ? main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.w8Da-I6HQPa_E9VsbNOZ1w, bound to: input [ ? ? ? ? ? main] o.s.a.r.c.CachingConnectionFactory ? ? ? : Created new connection: SimpleConnection@379f9555 [delegate=amqp://guest@192.168.20.134:5672/, localPort= 61735] [ ? ? ? ? ? main] o.s.i.a.i.AmqpInboundChannelAdapter ? ? ?: started inbound.input.anonymous.Qpe1sgP2ReiLQky8DkWZCA [ ? ? ? ? ? main] o.s.i.endpoint.EventDrivenConsumer ? ? ? : Adding {message-handler:inbound.input.default} as a subscriber to the 'bridge.input' channel [ ? ? ? ? ? main] o.s.i.endpoint.EventDrivenConsumer ? ? ? : started inbound.input.default

以上信息都可以从RabbitMQ Server管理页面看到,

?4. 测试验证

上面提到,启动的时候RabbitMQ里声明了一个匿名队列input.anonymous.w8Da-I6HQPa_E9VsbNOZ1w,并将其绑定到input 的交换器上,而注解@StreamListener(Sink.INPUT)注解将receive方法注册未input消息通道的监听处理,因此我们可以通过管理界面发送1条消息,主题交换器创建1条消息,消息将会被发送到队列input.anonymous.w8Da-I6HQPa_E9VsbNOZ1w中,然后被SinkReceiver#receive接收到并打印到控制台上。

下面做这个测试 通过?RabbitMQ管理页面的publish message发送一个段消息,,

???

发送消息后从控制台中的日志可以看到收到了消息,但是只是得到了消息的引用

[Pa_E9VsbNOZ1w-1] c.s.stream.receiver.SinkReceiver : SinkReceiver:[B@570ffa0d

常见问题

1.ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN

Caused by: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile. at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:342) ~[amqp-client-4.0.2.jar:4.0.2] at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:909) ~[amqp-client-4.0.2.jar:4.0.2] at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:859) ~[amqp-client-4.0.2.jar:4.0.2] at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:799) ~[amqp-client-4.0.2.jar:4.0.2]

断点调试会发现没有配置RabbitMq信息时候,Spring Cloud Stream会使用guest创建一个指向? 本地IP:5672位置的RabbitMQ链接,由于RabbitMq使用guest登录被拒绝,引起了上述报错,

处理方法: Rabbitmq从3.3.0开始禁止使用guest/guest权限通过除localhost外的访问, guest使用IP访问会被拒绝,因此需要禁用访问限制, 先找到\rabbitmq_server-3.7.14\ebin\rabbit.app 文件, 然后将{loopback_users, [<<"guest">>]},配置修改为{loopback_users, []}, 此问题详细信息可参考链接

链接:RabbitMQ登录时guest用户提示User can only log in via localhost


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Spring #Cloud #stream #消息驱动快速入门