irpas技术客

Day8-SpringCloud消息驱动Stream与链路追踪Sleuth_欢迎来到风离的个人空间

irpas 7963

SpringCloud Stream 消息驱动

屏蔽底层消息中间件的差异 统一消息的编程模型(没有什么是套一层接口解决不了的~)

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,

像RabbitMQ有exchange,kafka有Topic和Partitions分区,

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

Stream总体架构图

发送和接收消息流程 生产者

pom.xml 引入依赖

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>

yaml配置

server: port: 8801 spring: application: name: cloud-stream-provider rabbitmq: host: 106.14.154.114 port: 5672 username: admin password: 123456 cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) # instance-id: send-8801.com # 在信息列表时显示主机名称 # prefer-ip-address: true # 访问的路径变为IP地址

发送消息

Controller

@GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); }

messageProvider.send()

package com.atguigu.springcloud.service.impl; import com.atguigu.springcloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.messaging.MessageChannel; import org.springframework.integration.support.MessageBuilder; import javax.annotation.Resource; import org.springframework.cloud.stream.messaging.Source; import javax.annotation.Resource; import java.util.UUID; @EnableBinding(Source.class) //定义消息的推送管道 Source 是推送方 Sink 是接收方 public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 消息发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("*****serial: "+serial); return null; } }

调用接口 向RabbitMQ中发送消息

http://localhost:8801/sendMessage

消费者

pom.xml

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>

yaml

server: port: 8802 spring: application: name: cloud-stream-consumer rabbitmq: host: 106.14.154.114 port: 5672 username: admin password: 123456 cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 # environment: # 设置rabbitmq的相关的环境配置 # spring: # rabbitmq: # host: localhost # port: 5672 # username: guest # password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置

Controller

package com.atguigu.springcloud.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component @EnableBinding(Sink.class) //表明绑定接收方 public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t port: "+serverPort); } } 分组消费与持久化

先给出结论 不同组是订阅模式 会重复消费 同一组中所有单位只有一个能消费

设置分组

bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: atguiguA # 设置消费者的分组

验证: 设置统一分组 每个消费者消费不同的消息

设置不同分组 每个消费者消费相同消息:

持久化

给出结论: 当消费者宕机之后 设置分组的消费者可消费 还在交换机中的消息

验证

都设置不同分组

每个消费者消费同一消息

设置相同分组:

后启动的没抢到消息~~


SpringCloud Sleuth 链路追踪

引出: 在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的的服务节点调用来协同产生最后的请求结果,每一个前段请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引起整个请求最后的失败。

挑明说吧: 就是查看每条路线和在该路线下所花的时间

使用步骤

【zipkin】下载安装启动 端口: http://localhost:9411

客户端服务端分别导入依赖

<!--包含了sleuth+zipkin--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> </dependency>

Yaml配置

server: port: 8001 spring: application: name: cloud-payment-service zipkin: base-url: http://localhost:9411 sleuth: sampler: #采样率值介于 0 到 1 之间,1 则表示全部采集 probability: 1 测试即可


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #springcloud #stream #与Sleuth