irpas技术客

SpringBoot集成Kafka_拥有1024的蜡笔小新_springboot集成kafka

大大的周 6462

SpringBoot集成Kafka

知识索引

SpringBoot集成Kafka工程搭建SpringBoot集成Kafka配置SpringBoot集成Kafka生产消息SpringBoot集成Kafka消费消息

?

?

1 生产者

SpringBoot集成Kafka,无非就是生产者和消费者,但首先得实现SpringBoot集成,流程如下:

1:pom.xml引入依赖包 2:创建启动类 3:配置核心配置文件application.yml

?

1.1 pom.xml

在pom.xml中引入spring-kafka,代码如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://·mon.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer

参数说明:

bootstrap-servers:Kafka服务地址 retries:发送失败后,重试次数,0表示不重试 key-serializer:向Kafka发送数据,key采用的序列化方式 value-serializer:向Kafka发送数据,数据采用的序列化方式

?

1.4 生产者消息发送

消息发送使用KafkaTemplate实现消息发送,创建com.itmentu.controller.SendController,实现消息发送,代码如下:

@RestController @RequestMapping(value = "/producer") public class SendController { @Autowired private KafkaTemplate kafkaTemplate; /*** * 发送消息 * topic:要发送的队列 * msg:发送的消息 */ @GetMapping(value = "/send/{topic}/{msg}") public String send(@PathVariable(value = "topic")String topic,@PathVariable(value = "msg")String msg){ //消息发送 kafkaTemplate.send(topic,msg); return "SUCCESS"; } }

发送消息的时候,使用的是kafkaTemplate.send(topic,msg),第1个参数是队列名字或者叫主题名字,第2个参数是发送的消息。

?

我们可以请求http://localhost:18081/producer/send/itmentu/hello测试,eagle打开,可以看到效果如下:

?

?

2 消费者

SpringBoot集成Kafka实现消费者消费消息,流程如下:

1:pom.xml引入依赖包 2:创建启动类 3:配置核心配置文件application.yml

?

2.1 pom.xml

和生产者一样,都需要引入spring-kafka依赖,配置如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://·mon.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

参数说明:

bootstrap-servers:Kafka服务地址 group-id:消费者组默认名字 enable-auto-commit:每次消费数据后,需要提交数据的偏移量,这里true表示自动提交,false表示手动提交 key-deserializer:读Kafka数据,key采用的反序列化方式 value-deserializer:读Kafka数据,数据采用的反序列化方式

?

2.4 消费消息

Kafka消费消息采用@KafkaListener(topics="{}",groupId="")方式消费数据,我们可以创建一个消息监听类实现消息消费,创建com.itmentu.listener.MessageListener,代码如下:

@Component public class MessageListener { @KafkaListener(topics = {"itmentu"},groupId = "itmentuGroup") public void listener(ConsumerRecord<String,String> record){ //获取消息 String message = record.value(); //消息偏移量 long offset = record.offset(); System.out.println("读取的消息:"+message+"\n当前偏移量:"+offset); } }

?

测试效果如下:

?

?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。