目录 1. 单记录消费listener.type=single1.1 单记录消费 - 自动确认1.2 单记录消费 - 手动确认 2. 批量消费listener.type=batch2.1 批量消费 - 自动确认2.2 批量消费 - 手动确认 3. 手动模式下的acknowledge和nack方法
Spring Kafka消费消息的模式分为2种模式(对应spring.kafka.listener.type配置):
single - 每次消费单条记录batch - 批量消费消息列表且每种模式都分为2种提交已消费消息offset的ack模式:
自动确认手动确认接下来依次讲解这两种消费模式及其对应的ack模式的示例配置及代码。
1. 单记录消费listener.type=single本章节先来讲讲record模式 - 单记录消费,且分为自动确认和手动确认2种方式来提交已消费消息offset。
1.1 单记录消费 - 自动确认即由Spring Kafak框架按照配置规则自动提交已消费消息offset,无需程序手动编码控制。
需注意如下对应配置:
# ============ 方式1:定时自动提交[不推荐] ===================== # 开启自动提交(按周期)已消费offset spring.kafka.consumer.enable-auto-commit: true # 自动提交已消费offset时间价格(配置enable-auto-commit=true时使用) spring.kafka.consumer.auto-commit-interval: 1s # ========= 方式2:通过ack-mode设置自动提交[推荐] ============= # 禁用自动提交(按周期)已消费offset spring.kafka.consumer.enable-auto-commit: false # listener类型为单条记录single类型(默认为single单条消费模式) spring.kafka.listener.type: single # offset提交模式为record spring.kafka.listener.ack-mode: record
注: 关于消费者提交已消费消息offset的相关配置说明:
spring.kafka.consumer.enbable-auto-commit true 自动提交已消费消息offset auto-commit-interval 设置自动提交间隔 fasle 由程序控制已消费消息offset提交 spring.kafka.listener.ack-mode 已消费offset提交模式spring.kafka.listener.ack-mode列表(详细说明参见:SpringKafka - Committing Offsets)
消费端配置示例1 - 定时自动提交[不推荐]
spring: kafka: # 逗号分隔的集群broker列表 bootstrap-servers: localhost:9092 # ==================================================== # ================== 消费者配置 ======================== # ==================================================== consumer: # 自动提交(按周期)已消费offset enable-auto-commit: true # 自动提交已消费offset时间价格(配置enable-auto-commit=true时使用) auto-commit-interval: 1s ... # ==================================================== # ============= 消费者监听器(及线程池)配置 ============== # ==================================================== listener: # listener类型 # single | batch type: single # ==================================================== # ============= 具体业务Kafka定义======================= # ==================================================== biz1: topic: topic1 consumer: group: group1消费端配置示例2 - 通过ack-mode设置自动提交[推荐] listener自动提交offsetd的ack-mode模式包括:RECORD | BATCH | TIME | COUNT | COUNT_TIME , 且使用相关自动模式不可在@KafkaListener标注方法中使用Acknowledgment参数。
spring: kafka: # 逗号分隔的集群broker列表 bootstrap-servers: localhost:9092 # ==================================================== # ================== 消费者配置 ======================== # ==================================================== consumer: # 禁用自动提交(按周期)已消费offset enable-auto-commit: false ... # ==================================================== # ============= 消费者监听器(及线程池)配置 ============== # ==================================================== listener: # listener类型 # single | batch type: single # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定) # 单记录 | 批量 | 超时 | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交 # RECORD | BATCH(默认) | TIME | COUNT | COUNT_TIME | MANUAL | MANUAL_IMMEDIATE # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets # 注:listener自动提交offset模式包括:RECORD | BATCH | TIME | COUNT | COUNT_TIME , # 且使用相关自动模式不可在@KafkaListener标注方法中使用Acknowledgment参数 ack-mode: record # ==================================================== # ============= 具体业务Kafka定义======================= # ==================================================== biz1: topic: topic1 consumer: group: group1消费端代码示例
/** * 定义biz1消息接收者 * * @param message * @rabbit.exhange exchange1 * @rabbit.aueue queue1 * @rabbit.bindingKey # */ @KafkaListener( id = "biz1-${spring.kafka.biz1.consumer.group}", groupId = "${spring.kafka.biz1.consumer.group}", topics = "${spring.kafka.biz1.topic}") public void biz1Consumer(String message) { log.info("[biz1Consumer] RECV MSG: {}", message); } 1.2 单记录消费 - 手动确认需注意如下对应配置:
# 禁用自动提交(按周期)已消费offset spring.kafka.consumer.enable-auto-commit: false # listener类型为单条记录single类型(默认为single单条消费模式) spring.kafka.listener.type: single # offset提交模式为manual_immediate spring.kafka.listener.ack-mode: manual_immediate
消费端配置示例 手动提交offset的ack-mode模式包括:MANUAL | MANUAL_IMMEDIATE, 且使用相关手动模式需在@KafkaListener标注方法中使用Acknowledgment参数。
spring: # profiles: # active: kafka-origin kafka: # 逗号分隔的集群broker列表 bootstrap-servers: localhost:9092 # ==================================================== # ================== 消费者配置 ======================== # ==================================================== consumer: # 禁用自动提交(按周期)已消费offset enable-auto-commit: false ... # ==================================================== # ============= 消费者监听器(及线程池)配置 ============== # ==================================================== listener: # listener类型 # single | batch type: single # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定) # 单记录 | 批量 | 超时 | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交 # RECORD | BATCH | TIME | COUNT | COUNT_TIME | MANUAL | MANUAL_IMMEDIATE # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets # 注:手动提交offset模式包括:MANUAL | MANUAL_IMMEDIATE # 且使用相关手动模式需在@KafkaListener标注方法中使用Acknowledgment参数 ack-mode: manual_immediate消费端代码示例 参考:SpringKafka - Manual Acknowledgment
/** * 定义biz1消息接收者 * * @param message * @kafka.topic topic1 * @kafka.group group1 */ @KafkaListener( id = "biz1-${spring.kafka.biz1.consumer.group}", groupId = "${spring.kafka.biz1.consumer.group}", topics = "${spring.kafka.biz1.topic}") public void biz1Consumer(String message, Acknowledgment ack) { log.info("[biz1Consumer] RECV MSG: {}", message); //确认单当前消息(及之前的消息)offset均已被消费完成 ack.acknowledge(); //拒绝当前消息(此方法仅适用于listener.type=single) //当前poll查询出的剩余消息记录均被抛弃, //且当前消费线程在阻塞指定sleep(如下3000毫秒)后重新调用poll获取待消费消息(包括之前poll被抛弃的消息) //ack.nack(3000) } 2. 批量消费listener.type=batch在一些需要通过批量处理消息的场景中,SpringKafka支持使用Batch Listeners,即批量处理消息列表。 本章节主要讲解batch模式 - 批量消费,且同样分为自动确认和手动确认2种方式来提交已消费消息offset。
2.1 批量消费 - 自动确认需注意如下对应配置:
# 禁用自动提交(按周期)已消费offset spring.kafka.consumer.enable-auto-commit: false # 批量消费的单次最大消费记录数 spring.kafka.consumer.max-poll-reocrds: 50 # listener类型为批量batch类型(默认为single单条消费模式) spring.kafka.listener.type: batch # offset提交模式为batch(不可使用record - 启动报错) spring.kafka.listener.ack-mode: batch
配置示例如下
spring: kafka: # 逗号分隔的集群broker列表 bootstrap-servers: localhost:9092 # ==================================================== # ================== 消费者配置 ======================== # ==================================================== consumer: # 禁用自动提交(按周期)已消费offset enable-auto-commit: false # 单次poll()调用返回的记录数 max-poll-records: 50 # ==================================================== # ============= 消费者监听器(及线程池)配置 ============== # ==================================================== listener: # listener类型 # single | batch type: batch # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定) # 单记录 | 批量 | 超时 | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交 # RECORD | BATCH | TIME | COUNT | COUNT_TIME | MANUAL | MANUAL_IMMEDIATE # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets ack-mode: batch # ==================================================== # ============= 具体业务Kafka定义======================= # ==================================================== biz1: topic: topic1 consumer: group: group1 biz2: topic: topic2 consumer: group: group2 # 分区格式示例:0 | 0,1,2 | 0-3 partitions: 0消费端代码示例
/** * 定义biz1消息接收者 * 自动模式(无需手动ack): * 1. listener.type=batch * 2. ack-mode=batch * * @param messages * @kafka.topic topic1 * @kafka.group group1 */ @KafkaListener( id = "biz1-${spring.kafka.biz1.consumer.group}", groupId = "${spring.kafka.biz1.consumer.group}", topics = "${spring.kafka.biz1.topic}") public void biz1Consumer(List<String> messages) { log.info("[biz1Consumer] RECV MSG COUNT: {}", messages.size()); log.info("[biz1Consumer] RECV MSG[0]: {}", messages.get(0)); } /** * 定义biz2消息接收者 * 自动模式(无需手动ack): * 1. listener.type=batch * 2. ack-mode=batch * * @param messages * @kafka.topic topic2 * @kafka.group group2 */ @KafkaListener( id = "biz2-${spring.kafka.biz2.consumer.group}", groupId = "${spring.kafka.biz2.consumer.group}", //消费指定分区 topicPartitions = { @TopicPartition(topic = "${spring.kafka.biz2.topic}", partitions = "${spring.kafka.biz2.consumer.partitions}") }) public void biz2Consumer(List<Message> messages) { log.info("[biz2Consumer] RECV MSG COUNT: {}", messages.size()); log.info("[biz2Consumer] RECV MSG[0]: {}", messages.get(0)); } 2.2 批量消费 - 手动确认需注意如下对应配置:
# 禁用自动提交(按周期)已消费offset spring.kafka.consumer.enable-auto-commit: false # 批量消费的单次最大消费记录数 spring.kafka.consumer.max-poll-reocrds: 50 # listener类型为批量batch类型(默认为single单条消费模式) spring.kafka.listener.type: batch # offset提交模式为batch(不可使用record - 启动报错) spring.kafka.listener.ack-mode: manual
配置示例如下
spring: kafka: # 逗号分隔的集群broker列表 bootstrap-servers: localhost:9092 # ==================================================== # ================== 消费者配置 ======================== # ==================================================== consumer: # 禁用自动提交(按周期)已消费offset enable-auto-commit: false # 单次poll()调用返回的记录数 max-poll-records: 50 # ==================================================== # ============= 消费者监听器(及线程池)配置 ============== # ==================================================== listener: # listener类型 # single | batch type: batch # 已消费offset提交模式(仅在enable-auto-commit=false时才需明确指定) # 单记录 | 批量 | 超时 | 超过消费数量 | 超时或超过数量 | 手动提交(ack)后同BATCH | 手动立即提交 # RECORD | BATCH | TIME | COUNT | COUNT_TIME | MANUAL | MANUAL_IMMEDIATE # https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets ack-mode: manual # ==================================================== # ============= 具体业务Kafka定义======================= # ==================================================== biz1: topic: topic1 consumer: group: group1 biz2: topic: topic2 consumer: group: group2 partitions: 0消费端代码示例
/** * 定义biz1消息接收者 * 手动模式(需手动ack): * 1. listener.type=batch * 2. ack-mode=manual * * @param messages * @kafka.topic topic1 * @kafka.group group1 */ @KafkaListener( id = "biz1-${spring.kafka.biz1.consumer.group}", groupId = "${spring.kafka.biz1.consumer.group}", //仅在多partition单个消费者时,用于多线程消费消息(concurrency <= partition数量) //当存在多个消费者时,即便设置concurrency > 1也仅有唯一消费线程生效 concurrency = "${spring.kafka.biz1.consumer.concurrency}", topics = "${spring.kafka.biz1.topic}") public void biz1Consumer(List<String> messages, Acknowledgment ack) { log.info("[biz1Consumer] RECV MSG COUNT: {}", messages.size()); log.info("[biz1Consumer] RECV MSG[0]: {}", messages.get(0)); //确认单当前消息(及之前的消息)offset均已被消费完成 ack.acknowledge(); //拒绝消息列表中指定index(发生错误的消息index)对应的消息(此方法仅适用于listener.type=batch), //当前指定index之前的消息会被成功提交, //当前poll查询出的剩余消息记录(包括当前指定的index)均被抛弃, //且当前消费线程在阻塞指定sleep(如下3000毫秒)后重新调用poll获取待消费消息(包括当前index及之前poll抛弃的消息) //如下即确认当前list中前5条消息(0-4),抛弃当前list中后续消息,3秒后再次poll查询未消费消息 //ack.nack(5, 3000); } /** * 定义biz2消息接收者 * 手动模式(需手动ack): * 1. listener.type=batch * 2. ack-mode=manual * * @param messages * @kafka.topic topic2 * @kafka.group group2 */ @KafkaListener( id = "biz2-${spring.kafka.biz2.consumer.group}", groupId = "${spring.kafka.biz2.consumer.group}", //消费指定分区 topicPartitions = { @TopicPartition(topic = "${spring.kafka.biz2.topic}", partitions = "${spring.kafka.biz2.consumer.partitions}") }) public void biz2Consumer(List<Message> messages, Acknowledgment ack) { log.info("[biz2Consumer] RECV MSG COUNT: {}", messages.size()); log.info("[biz2Consumer] RECV MSG[0]: {}", messages.get(0)); //确认单当前消息(及之前的消息)offset均已被消费完成 ack.acknowledge(); } 3. 手动模式下的acknowledge和nack方法在手动确认模式下,除了支持ack.acknowledge()方法用于确认单条记录(对应record模式)或者批次记录(对应batch模式), 还支持nack方法用于拒绝消息,关于acknowledge和nack方法的详细使用见下表:
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。 |