irpas技术客

【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议保存)_石臻臻的杂货铺

irpas 2258

超强!!! Kafka学习大全!!!(快收藏)

文章目录 1.TopicCommand1.1.Topic创建1.2.删除Topic1.3.Topic分区扩容1.4.查询Topic描述5.查询Topic列表 2.ConfigCommand2.1 查询配置Topic配置查询其他配置/clients/users/brokers/broker-loggers 的查询查询kafka版本信息 2.2 增删改 配置 `--alter`Topic添加/修改动态配置Topic删除动态配置添加/删除配置同时执行其他配置同理,只需要类型改下`--entity-type` 默认配置 3.副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions4.Topic的发送kafka-console-producer.sh5. Topic的消费kafka-console-consumer.sh6.kafka-leader-election Leader重新选举7. 持续批量推送消息kafka-verifiable-producer.sh8. 持续批量拉取消息kafka-verifiable-consumer9.生产者压力测试kafka-producer-perf-test.sh10.消费者压力测试kafka-consumer-perf-test.sh11.删除指定分区的消息kafka-delete-records.sh12. 查看Broker磁盘信息kafka-log-dirs.sh12. 消费者组管理 kafka-consumer-groups.sh1. 查看消费者列表`--list`2. 查看消费者组详情`--describe`3. 删除消费者组`--delete`4. 重置消费组的偏移量 `--reset-offsets`5. 删除偏移量`delete-offsets` 13.查看日志文件 kafka-dump-log.sh附件More

日常运维 问题排查 怎么能够少了滴滴开源的 滴滴开源LogiKM一站式Kafka监控与管控平台

本文所有命令,博主均全部操作验证过,保证准确性; 非复制粘贴拼凑文章; 如果想了解更多工具命令,可在评论区留下评论,博主会择期加上;

以下大部分运维操作,都可以使用 LogI-Kafka-Manager 在平台上可视化操作;

1.TopicCommand 1.1.Topic创建

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test


相关可选参数

参数描述例子--bootstrap-server 指定kafka服务指定连接到的kafka服务; 如果有这个参数,则 --zookeeper可以不需要–bootstrap-server localhost:9092--zookeeper弃用, 通过zk的连接方式连接到kafka集群;–zookeeper localhost:2181 或者localhost:2181/kafka--replication-factor副本数量,注意不能大于broker数量;如果不提供,则会用集群中默认配置–replication-factor 3--partitions分区数量,当创建或者修改topic的时候,用这个来指定分区数;如果创建的时候没有提供参数,则用集群中默认值; 注意如果是修改的时候,分区比之前小会有问题–partitions 3--replica-assignment副本分区分配方式;创建topic的时候可以自己指定副本分配情况;--replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本--config<String: name=value>用来设置topic级别的配置以覆盖默认配置;只在–create 和–bootstrap-server 同时使用时候生效; 可以配置的参数列表请看文末附件例如覆盖两个配置 --config retention.bytes=123455 --config retention.ms=600001--command-config <String: command 文件路径>用来配置客户端Admin Client启动配置,只在–bootstrap-server 同时使用时候生效;例如:设置请求的超时时间 --command-config config/producer.proterties; 然后在文件中配置 request.timeout.ms=300000
1.2.删除Topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test


支持正则表达式匹配Topic来进行删除,只需要将topic 用双引号包裹起来 例如: 删除以create_topic_byhand_zk为开头的topic;

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “create_topic_byhand_zk.*” .表示任意匹配除换行符 \n 之外的任何单字符。要匹配 . ,请使用 . 。 ·*·:匹配前面的子表达式零次或多次。要匹配 * 字符,请使用 *。 .* : 任意字符

删除任意Topic (慎用)

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “.*?”

更多的用法请参考正则表达式

相关配置

配置描述默认file.delete.delay.mstopic删除被标记为–delete文件之后延迟多长时间删除正在的Log文件60000delete.topic.enabletrue是否能够删除topic
1.3.Topic分区扩容

zk方式(不推荐)

>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

kafka版本 >= 2.2 支持下面方式(推荐)

单个Topic扩容

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4

批量扩容 (将所有正则表达式匹配到的Topic分区扩容到4个)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

".*?" 正则表达式的意思是匹配所有; 您可按需匹配

PS: 当某个Topic的分区少于指定的分区数时候,他会抛出异常;但是不会影响其他Topic正常进行;


相关可选参数

参数描述例子--replica-assignment副本分区分配方式;创建topic的时候可以自己指定副本分配情况;--replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本

PS: 虽然这里配置的是全部的分区副本分配配置,但是正在生效的是新增的分区; 比如: 以前3分区1副本是这样的

Broker-1Broker-2Broker-3Broker-4012

现在新增一个分区,--replica-assignment 2,1,3,4 ; 看这个意思好像是把0,1号分区互相换个Broker

Broker-1Broker-2Broker-3Broker-41023

但是实际上不会这样做,Controller在处理的时候会把前面3个截掉; 只取新增的分区分配方式,原来的还是不会变

Broker-1Broker-2Broker-3Broker-40123
1.4.查询Topic描述

1.查询单个Topic

sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal

2.批量查询Topic(正则表达式匹配,下面是查询所有Topic)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal

支持正则表达式匹配Topic,只需要将topic 用双引号包裹起来


相关可选参数

参数描述例子--bootstrap-server 指定kafka服务指定连接到的kafka服务; 如果有这个参数,则 --zookeeper可以不需要–bootstrap-server localhost:9092--at-min-isr-partitions查询的时候省略一些计数和配置信息--at-min-isr-partitions--exclude-internal排除kafka内部topic,比如__consumer_offsets-*--exclude-internal--topics-with-overrides仅显示已覆盖配置的主题,也就是单独针对Topic设置的配置覆盖默认配置;不展示分区信息--topics-with-overrides
5.查询Topic列表

1.查询所有Topic列表

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal

2.查询匹配Topic列表(正则表达式)

查询test_create_开头的所有Topic列表 sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"


相关可选参数

参数描述例子--exclude-internal排除kafka内部topic,比如__consumer_offsets-*--exclude-internal--topic可以正则表达式进行匹配,展示topic名称--topic
2.ConfigCommand

Config相关操作; 动态配置可以覆盖默认的静态配置;

2.1 查询配置 Topic配置查询

展示关于Topic的动静态配置

1.查询单个Topic配置(只列举动态配置)

sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic 或者 sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic

2.查询所有Topic配置(包括内部Topic)(只列举动态配置)

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics

3.查询Topic的详细配置(动态+静态)

只需要加上一个参数--all

其他配置/clients/users/brokers/broker-loggers 的查询

同理 ;只需要将--entity-type 改成对应的类型就行了 (topics/clients/users/brokers/broker-loggers)

查询kafka版本信息

sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version

所有可配置的动态配置 请看最后面的 附件 部分

2.2 增删改 配置 --alter

–alter

删除配置: --delete-config k1=v1,k2=v2 添加/修改配置: --add-config k1,k2 选择类型: --entity-type (topics/clients/users/brokers/broker- loggers) 类型名称: --entity-name

Topic添加/修改动态配置

--add-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999

Topic删除动态配置

--delete-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms

添加/删除配置同时执行

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=788888888 --delete-config log.retention.ms

其他配置同理,只需要类型改下--entity-type

类型有: (topics/clients/users/brokers/broker- loggers)

哪些配置可以修改 请看最后面的附件:ConfigCommand 的一些可选配置

默认配置

配置默认 --entity-default

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9090 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=88888888

动态配置的默认配置是使用了节点 <defalut>;

该图转自https://·/lizherui/p/12271285.html

优先级 指定动态配置>默认动态配置>静态配置

3.副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions

请戳 【kafka运维】副本扩缩容、数据迁移、副本重分配、副本跨路径迁移

kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,干货!!!非常干!!!建议收藏)

4.Topic的发送kafka-console-producer.sh

4.1 生产无key消息

## 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2 生产有key消息 加上属性--property parse.key=true

## 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true

默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)


可选参数

参数值类型说明有效值–bootstrap-serverString要连接的服务器必需(除非指定–broker-list)如:host1:prot1,host2:prot2–topicString(必需)接收消息的主题名称–batch-sizeInteger单个批处理中发送的消息数200(默认值)–compression-codecString压缩编解码器none、gzip(默认值)snappy、lz4、zstd–max-block-msLong在发送请求期间,生产者将阻止的最长时间60000(默认值)–max-memory-bytesLong生产者用来缓冲等待发送到服务器的总内存33554432(默认值)–max-partition-memory-bytesLong为分区分配的缓冲区大小16384–message-send-max-retriesInteger最大的重试发送次数3–metadata-expiry-msLong强制更新元数据的时间阈值(ms)300000–producer-propertyString将自定义属性传递给生成器的机制如:key=value–producer.configString生产者配置属性文件[–producer-property]优先于此配置 配置文件完整路径–propertyString自定义消息读取器parse.key=true/false key.separator=<key.separator>ignore.error=true/false–request-required-acksString生产者请求的确认方式0、1(默认值)、all–request-timeout-msInteger生产者请求的确认超时时间1500(默认值)–retry-backoff-msInteger生产者重试前,刷新元数据的等待时间阈值100(默认值)–socket-buffer-sizeIntegerTCP接收缓冲大小102400(默认值)–timeoutInteger消息排队异步等待处理的时间阈值1000(默认值)–sync同步发送消息–version显示 Kafka 版本不配合其他参数时,显示为本地Kafka版本–help打印帮助信息
5. Topic的消费kafka-console-consumer.sh

1. 新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的) 下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. 正则表达式匹配topic进行消费--whitelist 消费所有的topic

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’

消费所有的topic,并且还从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning

3.显示key进行消费--property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

4. 指定分区消费--partition 指定起始偏移量消费--offset

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. 给客户端命名--group

注意给客户端命名之后,如果之前有过消费,那么--from-beginning就不会再从头消费了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

6. 添加客户端属性--consumer-property

这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group

7. 添加客户端属性--consumer.config

跟--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties


参数描述例子--group指定消费者所属组的ID--topic被消费的topic--partition指定分区 ;除非指定–offset,否则从分区结束(latest)开始消费--partition 0--offset执行消费的起始offset位置 ;默认值: latest; /latest /earliest /偏移量--offset 10--whitelist正则表达式匹配topic;--topic就不用指定了; 匹配到的所有topic都会消费; 当然用了这个参数,--partition --offset等就不能使用了--consumer-property将用户定义的属性以key=value的形式传递给使用者--consumer-propertygroup.id=test-consumer-group--consumer.config消费者配置属性文件请注意,[consumer-property]优先于此配置--consumer.config config/consumer.properties--property初始化消息格式化程序的属性print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer>--from-beginning从存在的最早消息开始,而不是从最新消息开始,注意如果配置了客户端名称并且之前消费过,那就不会从头消费了--max-messages消费的最大数据量,若不指定,则持续消费下去--max-messages 100--skip-message-on-error如果处理消息时出错,请跳过它而不是暂停--isolation-level设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted--formatterkafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter
6.kafka-leader-election Leader重新选举

6.1 指定Topic指定分区用重新PREFERRED:优先副本策略 进行Leader重选举

> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0

6.2 所有Topic所有分区用重新PREFERRED:优先副本策略 进行Leader重选举

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred --all-topic-partitions

6.3 设置配置文件批量指定topic和分区进行Leader重选举

先配置leader-election.json文件

{ "partitions": [ { "topic": "test_create_topic4", "partition": 1 }, { "topic": "test_create_topic4", "partition": 2 } ] } sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json

相关可选参数

参数描述例子--bootstrap-server 指定kafka服务指定连接到的kafka服务–bootstrap-server localhost:9092--topic指定Topic,此参数跟--all-topic-partitions和path-to-json-file 三者互斥--partition指定分区,跟--topic搭配使用--election-type两个选举策略(PREFERRED:优先副本选举,如果第一个副本不在线的话会失败;UNCLEAN: 策略)--all-topic-partitions所有topic所有分区执行Leader重选举; 此参数跟--topic和path-to-json-file 三者互斥--path-to-json-file配置文件批量选举,此参数跟--topic和all-topic-partitions 三者互斥
7. 持续批量推送消息kafka-verifiable-producer.sh

单次发送100条消息--max-messages 100

一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100

每秒发送最大吞吐量不超过消息 --throughput 100

推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100

发送的消息体带前缀--value-prefix

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666

注意--value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号. 例如: 666.

其他参数: --producer.config CONFIG_FILE 指定producer的配置文件 --acks ACKS 每次推送消息的ack值,默认是-1

8. 持续批量拉取消息kafka-verifiable-consumer

持续消费

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

单次最大消费10条消息--max-messages 10

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10


相关可选参数

参数描述例子--bootstrap-server 指定kafka服务指定连接到的kafka服务;–bootstrap-server localhost:9092--topic指定消费的topic--group-id消费者id;不指定的话每次都是新的组idgroup-instance-id消费组实例ID,唯一值--max-messages单次最大消费的消息数量--enable-autocommit是否开启offset自动提交;默认为false--reset-policy当以前没有消费记录时,选择要拉取offset的策略,可以是earliest, latest,none。默认是earliest--assignment-strategyconsumer分配分区策略,默认是org.apache.kafka.clients.consumer.RangeAssignor--consumer.config指定consumer的配置文件
9.生产者压力测试kafka-producer-perf-test.sh

1. 发送1024条消息--num-records 100并且每条消息大小为1KB--record-size 1024 最大吞吐量每秒10000条--throughput 100

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

你可以通过LogIKM查看分区是否增加了对应的数据大小 从LogIKM 可以看到发送了1024条消息; 并且总数据量=1M; 1024条*1024byte = 1M;

2. 用指定消息文件--payload-file发送100条消息最大吞吐量每秒100条--throughput 100

先配置好消息文件batchmessage.txt

然后执行命令 发送的消息会从batchmessage.txt里面随机选择; 注意这里我们没有用参数--payload-delimeter指定分隔符,默认分隔符是\n换行;

bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 1024 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt

验证消息,可以通过 LogIKM 查看发送的消息


相关可选参数

参数描述例子--topic指定消费的topic--num-records发送多少条消息--throughput每秒消息最大吞吐量--producer-props生产者配置, k1=v1,k2=v2--producer-props bootstrap.servers= localhost:9092,client.id=test_client--producer.config生产者配置文件--producer.config config/producer.propeties--print-metrics在test结束的时候打印监控信息,默认false--print-metrics true--transactional-id指定事务 ID,测试并发事务的性能时需要,只有在 --transaction-duration-ms > 0 时生效,默认值为 performance-producer-default-transactional-id--transaction-duration-ms指定事务持续的最长时间,超过这段时间后就会调用 commitTransaction 来提交事务,只有指定了 > 0 的值才会开启事务,默认值为 0--record-size一条消息的大小byte; 和 --payload-file 两个中必须指定一个,但不能同时指定--payload-file指定消息的来源文件,只支持 UTF-8 编码的文本文件,文件的消息分隔符通过 --payload-delimeter指定,默认是用换行\nl来分割的,和 --record-size 两个中必须指定一个,但不能同时指定 ; 如果提供的消息--payload-delimeter如果通过 --payload-file 指定了从文件中获取消息内容,那么这个参数的意义是指定文件的消息分隔符,默认值为 \n,即文件的每一行视为一条消息;如果未指定--payload-file则此参数不生效;发送消息的时候是随机送文件里面选择消息发送的;
10.消费者压力测试kafka-consumer-perf-test.sh

消费100条消息--messages 100

sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100


相关可选参数

参数描述例子--bootstrap-server--consumer.config消费者配置文件--date-format结果打印出来的时间格式化默认:yyyy-MM-dd HH:mm:ss:SSS--fetch-size单次请求获取数据的大小默认1048576--topic指定消费的topic--from-latest--group消费组ID--hide-header如果设置了,则不打印header信息--messages需要消费的数量--num-fetch-threadsfeth 数据的线程数(废弃无效)默认:1--print-metrics结束的时候打印监控数据--show-detailed-stats如果设置,则按照--report_interval配置的方式报告每个报告间隔的统计信息--threads消费线程数;(废弃无效)默认 10--reporting-interval打印进度信息的时间间隔(以毫秒为单位)
11.删除指定分区的消息kafka-delete-records.sh

删除指定topic的某个分区的消息删除至offset为1024

先配置json文件offset-json-file.json

{"partitions": [{"topic": "test1", "partition": 0, "offset": 1024}], "version":1 }

在执行命令

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

验证 通过 LogIKM 查看发送的消息

从这里可以看出来,配置"offset": 1024 的意思是从最开始的地方删除消息到 1024的offset; 是从最前面开始删除的

12. 查看Broker磁盘信息kafka-log-dirs.sh

查询指定topic磁盘信息--topic-list topic1,topic2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2

查询指定Broker磁盘信息--broker-list 0 broker1,broker2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0

例如我一个3分区3副本的Topic的查出来的信息 logDir Broker中配置的log.dir

{ "version": 1, "brokers": [{ "broker": 0, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0", "error": null, "partitions": [{ "partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false }] }] }, { "broker": 1, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1", "error": null, "partitions": [{ "partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false }] }] }, { "broker": 2, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2", "error": null, "partitions": [{ "partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false }] }] }, { "broker": 3, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3", "error": null, "partitions": [] }] }] }

如果你觉得通过命令查询磁盘信息比较麻烦,你也可以通过 LogIKM 查看

12. 消费者组管理 kafka-consumer-groups.sh 1. 查看消费者列表--list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

先调用MetadataRequest拿到所有在线Broker列表 再给每个Broker发送ListGroupsRequest请求获取 消费者组数据

2. 查看消费者组详情--describe

DescribeGroupsRequest

查看消费组详情--group 或 --all-groups

查看指定消费组详情--group sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group


查看所有消费组详情--all-groups sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups 查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息

查询消费者成员信息--members

所有消费组成员信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090 指定消费组成员信息 sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

查询消费者状态信息--state

所有消费组状态信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090 指定消费组状态信息 sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

3. 删除消费者组--delete

DeleteGroupsRequest

删除消费组–delete

删除指定消费组--group sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090 删除所有消费组--all-groups sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常

Error: Deletion of some consumer groups failed: * Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty. 4. 重置消费组的偏移量 --reset-offsets

能够执行成功的一个前提是 消费组这会是不可用状态;

下面的示例使用的参数是: --dry-run ;这个参数表示预执行,会打印出来将要处理的结果; 等你想真正执行的时候请换成参数--execute ;

下面示例 重置模式都是 --to-earliest 重置到最早的;

请根据需要参考下面 相关重置Offset的模式 换成其他模式;

重置指定消费组的偏移量 --group

重置指定消费组的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic 重置指定消费组的指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

重置所有消费组的偏移量 --all-group

重置所有消费组的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic 重置所有消费组中指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets 后面需要接重置的模式

相关重置Offset的模式

参数描述例子--to-earliest :重置offset到最开始的那条offset(找到还未被删除最早的那个offset)--to-current:直接重置offset到当前的offset,也就是LOE--to-latest:重置到最后一个offset--to-datetime:重置到指定时间的offset;格式为:YYYY-MM-DDTHH:mm:SS.sss;--to-datetime "2021-6-26T00:00:00.000"--to-offset重置到指定的offset,但是通常情况下,匹配到多个分区,这里是将匹配到的所有分区都重置到这一个值; 如果 1.目标最大offset<--to-offset, 这个时候重置为目标最大offset;2.目标最小offset>--to-offset ,则重置为最小; 3.否则的话才会重置为--to-offset的目标值; 一般不用这个--to-offset 3465 --shift-by按照偏移量增加或者减少多少个offset;正的为往前增加;负的往后退;当然这里也是匹配所有的;--shift-by 100 、--shift-by -100--from-file根据CVS文档来重置; 这里下面单独讲解

--from-file着重讲解一下

上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的offset;不过**--from-file**可以让我们更灵活一点;

先配置cvs文档 格式为: Topic:分区号: 重置目标偏移量test2,0,100 test2,1,200 test2,2,300 执行命令

sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv

5. 删除偏移量delete-offsets

能够执行成功的一个前提是 消费组这会是不可用状态;

偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费;

sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2


相关可选参数

参数描述例子--bootstrap-server指定连接到的kafka服务;–bootstrap-server localhost:9092--list列出所有消费组名称--list--describe查询消费者描述信息--describe--group指定消费组--all-groups指定所有消费组--members查询消费组的成员信息--state查询消费者的状态信息--offsets在查询消费组描述信息的时候,这个参数会列出消息的偏移量信息; 默认就会有这个参数的;dry-run重置偏移量的时候,使用这个参数可以让你预先看到重置情况,这个时候还没有真正的执行,真正执行换成--excute;默认为dry-run--excute真正的执行重置偏移量的操作;--to-earliest将offset重置到最早to-latest将offset重置到最近
13.查看日志文件 kafka-dump-log.sh 参数描述例子--deep-iteration--files <String: file1, file2, ...>必需; 读取的日志文件–files 0000009000.log--key-decoder-class如果设置,则用于反序列化键。这类应实现kafka.serializer。解码器特性。自定义jar应该是在kafka/libs目录中提供--max-message-size最大的数据量,默认:5242880--offsets-decoderif set, log data will be parsed as offset data from the __consumer_offsets topic.--print-data-log打印内容--transaction-log-decoderif set, log data will be parsed as transaction metadata from the __transaction_state topic--value-decoder-class [String]if set, used to deserialize the messages. This class should implement kafka. serializer.Decoder trait. Custom jar should be available in kafka/libs directory. (default: kafka.serializer. StringDecoder)--verify-index-onlyif set, just verify the index log without printing its content.

查询Log文件

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log

查询Log文件具体信息 --print-data-log

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log --print-data-log

查询index文件具体信息

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.index 配置项为log.index.size.max.bytes; 来控制创建索引的大小;

查询timeindex文件

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.timeindex

附件

ConfigCommand 的一些可选配置


Topic相关可选配置

keyvalue示例cleanup.policy清理策略compression.type压缩类型(通常建议在produce端控制)delete.retention.ms压缩日志的保留时间file.delete.delay.mstopic删除被标记为–delete文件之后延迟多长时间删除正在的Log文件60000flush.messages持久化message限制flush.ms持久化频率follower.replication.throttled.replicasflowwer副本限流 格式:分区号:副本follower号,分区号:副本follower号0:1,1:1index.interval.bytesleader.replication.throttled.replicasleader副本限流 格式:分区号:副本Leader号0:0max.compaction.lag.msmax.message.bytes最大的batch的message大小message.downconversion.enablemessage是否向下兼容message.format.versionmessage格式版本message.timestamp.difference.max.msmessage.timestamp.typemin.cleanable.dirty.ratiomin.compaction.lag.msmin.insync.replicas最小的ISRpreallocateretention.bytes日志保留大小(通常按照时间限制)retention.ms日志保留时间segment.bytessegment的大小限制segment.index.bytessegment.jitter.mssegment.mssegment的切割时间unclean.leader.election.enable是否允许非同步副本选主

Broker相关可选配置

keyvalue示例advertised.listenersbackground.threadscompression.typefollower.replication.throttled.rateleader.replication.throttled.ratelistener.security.protocol.maplistenerslog.cleaner.backoff.mslog.cleaner.dedupe.buffer.sizelog.cleaner.delete.retention.mslog.cleaner.io.buffer.load.factorlog.cleaner.io.buffer.sizelog.cleaner.io.max.bytes.per.secondlog.cleaner.max.compaction.lag.mslog.cleaner.min.cleanable.ratiolog.cleaner.min.compaction.lag.mslog.cleaner.threadslog.cleanup.policylog.flush.interval.messageslog.flush.interval.mslog.index.interval.byteslog.index.size.max.byteslog.message.downconversion.enablelog.message.timestamp.difference.max.mslog.message.timestamp.typelog.preallocatelog.retention.byteslog.retention.mslog.roll.jitter.mslog.roll.mslog.segment.byteslog.segment.delete.delay.msmax.connectionsmax.connections.per.ipmax.connections.per.ip.overridesmessage.max.bytesmetric.reportersmin.insync.replicasnum.io.threadsnum.network.threadsnum.recovery.threads.per.data.dirnum.replica.fetchersprincipal.builder.classreplica.alter.log.dirs.io.max.bytes.per.secondsasl.enabled.mechanismssasl.jaas.configsasl.kerberos.kinit.cmdsasl.kerberos.min.time.before.reloginsasl.kerberos.principal.to.local.rulessasl.kerberos.service.namesasl.kerberos.ticket.renew.jittersasl.kerberos.ticket.renew.window.factorsasl.login.refresh.buffer.secondssasl.login.refresh.min.period.secondssasl.login.refresh.window.factorsasl.login.refresh.window.jittersasl.mechanism.inter.broker.protocolssl.cipher.suitesssl.client.authssl.enabled.protocolsssl.endpoint.identification.algorithmssl.key.passwordssl.keymanager.algorithmssl.keystore.locationssl.keystore.passwordssl.keystore.typessl.protocolssl.providerssl.secure.random.implementationssl.trustmanager.algorithmssl.truststore.locationssl.truststore.passwordssl.truststore.typeunclean.leader.election.enable

Users相关可选配置

keyvalue示例SCRAM-SHA-256SCRAM-SHA-512consumer_byte_rate针对消费者user进行限流producer_byte_rate针对生产者进行限流request_percentage请求百分比

clients相关可选配置

keyvalue示例consumer_byte_rateproducer_byte_raterequest_percentage

以上大部分运维操作,都可以使用 LogI-Kafka-Manager 在平台上可视化操作;


欢迎 Star和 共建由 滴滴开源的kafka的管理平台,非常优秀非常好用的一款kafka管理平台 满足所有开发运维日常需求

滴滴开源Logi-KafkaManager 一站式Kafka监控与管控平台

More

Kafka专栏持续更新中…(源码、原理、实战、运维、视频、面试视频)


【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)_石臻臻的杂货铺-CSDN博客

【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,干货!!!非常干!!!建议收藏)

【kafka异常】kafka 常见异常处理方案(持续更新! 建议收藏)

【kafka运维】分区从分配、数据迁移、副本扩缩容 (附教学视频)

【kafka源码】ReassignPartitionsCommand源码分析(副本扩缩、数据迁移、副本重分配、副本跨路径迁移

【kafka】点击更多…


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #本文所有命令 #博主均全部操作验证过 #保证准确性 #非复制粘贴拼凑文章 #如果想了解更多工具命令 #可在评论区留下评论