irpas技术客

第 6 篇 : SpringBoot整合Kafka_哼唧兽0921_kafka springboot整合

网络投稿 5044

1. 在redis项目中,增加kafka依赖,刷新maven <!-- Kafka For Spring --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.1</version> </dependency> 2. 在application.yml文件中添加kafka配置

放在spring下

kafka: bootstrap-servers: 192.168.109.160:9092,192.168.109.161:9092,192.168.109.162:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 3. 创建两个主题,两组消费者(组1有2个成员) package com.hahashou.test.redis.service; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @description: kafka消费者 * @author: 哼唧兽 * @date: 9999/9/21 **/ @Component @Slf4j public class KafkaConsumer { public static final String FIRST_TOPIC = "first-topic"; public static final String SECOND_TOPIC = "second-topic"; @KafkaListener(topics = FIRST_TOPIC, groupId = "first") public void firstTopicFirst1 (ConsumerRecord<?, ?> record) { log.info("First主题第1个消费者 : "+record.topic()+" : "+record.partition()+" : "+record.value()); } @KafkaListener(topics = FIRST_TOPIC, groupId = "first") public void firstTopicFirst2 (ConsumerRecord<?, ?> record) { log.info("First主题第2个消费者 : "+record.topic()+" : "+record.partition()+" : "+record.value()); } @KafkaListener(topics = SECOND_TOPIC, groupId = "first") public void secondTopicFirst1 (ConsumerRecord<?, ?> record) { log.info("Second主题第1个消费者 : "+record.topic()+" : "+record.partition()+" : "+record.value()); } } 4. TestKafkaController,并启动项目 package com.hahashou.test.redis.controller; import com.hahashou.test.redis.service.KafkaConsumer; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; /** * @description: 测试Kafka * @author: 哼唧兽 * @date: 9999/9/21 **/ @RestController @RequestMapping("/kafka") @Api(tags = "测试Kafka") @Slf4j public class TestKafkaController { @Resource private KafkaTemplate<String, String> kafkaTemplate; @GetMapping("/send") @ApiOperation(value = "发送消息") public String send(@RequestParam Integer count) throws InterruptedException { for (int i = 0; i < count; i++) { kafkaTemplate.send(KafkaConsumer.FIRST_TOPIC, i+" for first-topic"); Thread.sleep(500); kafkaTemplate.send(KafkaConsumer.SECOND_TOPIC, i+"for second-topic"); } return "成功"; } } 5. 测试 5.1 先执行5次

消费日志 根据我看到的文档,以及对kafka的了解,推断一下 :

消费记录按时间有序first-topic的第一个消费者监听的是分区 0 和 1, 第二个消费者监听的分区是 2 和 3多个分区数据由一组消费者消费,只有一个成员肯定自己全消费咯确实是按照配置文件创建了 4 个分区,官方示例是8官方给出的生产配置示例,不建议自动创建topic,是因为分区数量和消费者数量最好是比例差不多,对等应该是比较好的。手动创建topic,至少有那么一瞬间是知道自己需要创建topic,以及topic的分区数量、备份数量,以便后续开发人员创建合适数量的消费者。消费者数量肯定是不能多余分区数量的(官方文档应该有说吗,我没看到),下面增加3个消费者,测试一下 5.2 删除掉second-topic,对first-topic再增加3个消费者,重启项目

消费日志 第3个消费者没有抢到任何分区,就是个摆设,项目重启,消费者对应分区会重新分配


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Kafka #springboot整合 #1 #在redis项目中 #增加kafka依赖 #刷新mavenamplt #for