irpas技术客

Day542&543&544&545&Day546.kafka基础_阿昌喜欢吃黄桃

大大的周 515

kafka 一、基础架构


二、Kafka 快速入门 1、集群规划

2、集群部署

下载地址

1 )解压安装包:

tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

2 )修改解压后的文件名称:

mv kafka_2.12-3.0.0/ kafka

3 )进入到/opt/module/kafka 目录,修改配置文件

cd config/ vim server.properties

输入以下内容:


#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=0 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以 配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/opt/module/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理) zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka 3、集群 启停脚本

在/home/atguigu/bin 目录下创建文件 kf.sh 脚本文件

vim kf.sh

脚本如下:

#! /bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 hadoop104 do echo " --------启动 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties" done };; "stop"){ for i in hadoop102 hadoop103 hadoop104 do echo " --------停止 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh " done };; esac

添加执行权限

chmod +x kf.sh

启动集群命令

kf.sh start

停止集群命令

kf.sh stop


3、Kafka 命令行操作

4、生产者命令 行操作


三、Kafka 生产者 1、生产者 消息发送流程 ①发送原理

在消息发送的过程中,涉及到了 两个线程 ——main 线程和Sender 线程。

在 main 线程中创建了 一个 双端列队列 RecordAccumulator。

main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

②生产者重要参数列表


2、异步送 发送 API ①普通异步发送

需求:创建 Kafka生产者,采用异步的方式发送到 Kafka Broker

导入依赖

<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> </dependencies>

编写不带回调函数的 API代码:

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class CustomProducer { public static void main(String[] args) throws InterruptedException { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value 序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i)); } // 5. 关闭资源 kafkaProducer.close(); } } ②带回调函数的 异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元 数据信息(RecordMetadata)和异常信息(Exception)

如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*; import java.util.Properties; public class CustomProducerCallback { public static void main(String[] args) throws InterruptedException { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value 序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { // 添加回调 kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i), new Callback() { // 该方法在 Producer 收到 ack 时调用,为异步调用 @Override public void onCompletion(RecordMetadata metadata,Exception exception) { if (exception == null) { // 没有异常,输出信息到控制台 System.out.println(" 主 题 : " + metadata.topic() + "->" + "分区:" + metadata.partition()); } else { // 出现异常打印 exception.printStackTrace(); } } }); // 延迟一会会看到数据发往不同分区 Thread.sleep(2); } // 5. 关闭资源 kafkaProducer.close(); } } ③同步发送 API

只需在异步发送的基础上,再调用一下 get()方法即可。


四、生产者分区 1、分区好处

2、生产者发送消息的分区策略 ①默认的分区器 DefaultPartitioner

②自定义分区器

实现步骤:

(1)定义类实现 Partitioner 接口。(2)重写 partition()方法。 import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * 1. 实现接口 Partitioner * 2. 实现 3 个方法:partition,close,configure * 3. 编写 partition 方法,返回分区号 */ public class MyPartitioner implements Partitioner { /** * 返回信息对应的分区 * @param topic 主题 * @param key 消息的 key * @param keyBytes 消息的 key 序列化后的字节数组 * @param value 消息的 value * @param valueBytes 消息的 value 序列化后的字节数组 * @param cluster 集群元数据可以查看分区信息 * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 获取消息 String msgValue = value.toString(); // 创建 partition int partition; // 判断消息是否包含 atguigu if (msgValue.contains("atguigu")){ partition = 0; }else { partition = 1; } // 返回分区号 return partition; } // 关闭资源 @Override public void close() { } // 配置方法 @Override public void configure(Map<String, ?> configs) { } }

使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*; import java.util.Properties; public class CustomProducerCallbackPartitions { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102 :9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 添加自定义分区器 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui gu.kafka.producer.MyPartitioner"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e == null){ System.out.println(" 主 题 : " + metadata.topic() + "->" + "分区:" + metadata.partition() ); }else { e.printStackTrace(); } } }); } kafkaProducer.close(); } }
五、生产者 如何提高吞吐量

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class CustomProducerParameters { public static void main(String[] args) throwsInterruptedException { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value 序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // batch.size:批次大小,默认 16K properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // linger.ms:等待时间,默认 0 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i)); } // 5. 关闭资源 kafkaProducer.close(); } }
六、数据可靠性

回顾发送流程:

ack 应答原理: ACK应答级别:

在配置properties中指定使用对应的ack级别


七、数据去重 1、数据传递语义

2、幂等性 ①幂等性原理

②如何使用幂等性

开启参数 enable.idempotence 默认为 true,false关闭。

3、生产者事务 ①Kafka事务原理

②Kafka的事务一共有如下 5个 API // 1 初始化事务 void initTransactions(); // 2 开启事务 void beginTransaction() throws ProducerFencedException; // 3 在事务内提交已经消费的偏移量(主要用于消费者) void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; // 4 提交事务 void commitTransaction() throws ProducerFencedException; // 5 放弃事务(类似于回滚事务的操作) void abortTransaction() throws ProducerFencedException; ③单个 Producer,使用事务保证消息的仅一次发送 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class CustomProducerTransactions { public static void main(String[] args) throws InterruptedException { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value 序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置事务 id(必须),事务 id 任意起名 properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 初始化事务 kafkaProducer.initTransactions(); // 开启事务 kafkaProducer.beginTransaction(); try { // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { // 发送消息 kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i)); } // int i = 1 / 0; // 提交事务 kafkaProducer.commitTransaction(); } catch (Exception e) { // 终止事务 kafkaProducer.abortTransaction(); } finally { // 5. 关闭资源 kafkaProducer.close(); } } }
八、数据有序

九、数据乱序


十、Broker 工作流程 1、Zookeeper 存储的 Kafka 信息

启动 Zookeeper 客户端:

bin/zkCli.sh

通过 ls命令可以查看 kafka 相关信息:

[zk: localhost:2181(CONNECTED) 2] ls /kafka

2、Kafka Broker总体工作流程

3、Broker 重要参数


4、生产经验 —— 节点服役和退役 服役新节点

修改 haodoop105中 kafka的 broker.id为 3。保证唯一即可

执行 负载均衡 操作

创建一个要均衡的主题:

vim topics-to-move.json { "topics": [ {"topic": "first"} ], "version": 1 }

生成一个负载均衡的计划:

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate Current partition replica assignment {"version":1,"partitions":[{"topic":"first","partition":0,"replic as":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","par tition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"to pic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any"," any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"first","partition":0,"replic as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","par tition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any"," any","any"]}]}

创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3中):

{ "version":1, "partitions":[ {"topic":"first", "partition":0, "replicas":[2,3,0], "log_dirs":["any","any","any"] }, {"topic":"first", "partition":1, "replicas":[3,0,1], "log_dirs":["any","any","any"] }, {"topic":"first", "partition":2, "replicas":[0,1,2], "log_dirs":["any","any","any"] }] }

执行副本存储计划:

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

验证副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify Status of partition reassignment: Reassignment of partition first-0 is complete. Reassignment of partition first-1 is complete. Reassignment of partition first-2 is complete. Clearing broker-level throttles on brokers 0,1,2,3 Clearing topic-level throttles on topic first
退役旧节点

执行负载均衡操作:

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。

创建一个要均衡的主题:

vim topics-to-move.json { "topics": [ {"topic": "first"} ], "version": 1 }

创建执行计划:

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate Current partition replica assignment {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}

创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)。

vim increase-replication-factor.json {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}

执行副本存储计划:

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

验证副本存储计划:

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify Status of partition reassignment: Reassignment of partition first-0 is complete. Reassignment of partition first-1 is complete. Reassignment of partition first-2 is complete. Clearing broker-level throttles on brokers 0,1,2,3 Clearing topic-level throttles on topic first 执行停止命令 bin/kafka-server-stop.sh
5、Kafka副本 ①副本基本信息

ISR,和Leader通讯正常的Follower集合 OSR,和Leader通讯不正常的Follower集合


②Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。

Controller 的信息同步工作是依赖于 Zookeeper的。

③Leader 和 Follower 故障处理

④分区副本分配

如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka 底层如何分配存储副本呢?

⑤生产经验—— 手动 调整 分区 副本 存储

查看分区副本存储情况。

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three ⑥生产经验 ——Leader Partition 负载 平衡

推荐关闭,或设置percentage>20%

⑦生产经验 —— 增加副本因子

{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]} [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute 6、文件存储 ①文件存储机制

②思考:Topic 数据到底存储 在什么位置?

③index 文件和 log 文件详解

7、文件清理策略

8、高效读写数据


十一、Kafka 消费者 1、Kafka 消费方式

2、消费者工作流程 ①消费者总体工作流程

②消费者组原理 消费者组 消费者组初始化流程 消费者组详细消费流程 ③消费者重要参数

3、消费者API ①独立 消费者 案例 (订阅主题) 需求 创建一个独立消费者,消费 first主题中数据。 注意: 在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumer { public static void main(String[] args) { // 1.创建消费者的配置对象 Properties properties = new Properties(); // 2.给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消费者组(组名任意起名) 必须 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // 创建消费者对象 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); // 注册要消费的主题(可以消费多个主题) ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); // 拉取数据打印 while (true) { // 设置 1s 中消费一批数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); // 打印消费到的数据 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } } ②独立消费者 案例 (订阅分区)

需求:创建一个独立消费者,消费 first主题 0 号分区的数据。 实现步骤:

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Properties; public class CustomConsumerPartition { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消费者组(必须),名字可以任意起 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 消费某个主题的某个分区数据 ArrayList<TopicPartition> topicPartitions = new ArrayList<>(); topicPartitions.add(new TopicPartition("first", 0)); kafkaConsumer.assign(topicPartitions); while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }

测试:

③消费者组案例

需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。 案例实操:

复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者。

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumer1 { public static void main(String[] args) { // 1.创建消费者的配置对象 Properties properties = new Properties(); // 2.给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消费者组 必须 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // 创建消费者对象 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); // 注册主题 ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); // 拉取数据打印 while (true) { // 设置 1s 中消费一批数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); // 打印消费到的数据 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }

启动代码中的生产者发送消息,在 IDEA 控制台即可看到两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码 Thread.sleep(2);)。 重新发送到一个全新的主题中,由于默认创建的主题分区数为 1,可以看到只能有一个消费者消费到数据。

4、生产经验 —— 分区的分配

①Range以及再平衡

Range分区策略原理

Range 分区分配策略案例

修改主题 first为 7个分区。bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7 注意:分区数可以增加,但是不能减少。复制 CustomConsumer 类,创建 CustomConsumer2。这样可以由三个消费者 CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”,同时启动 3 个消费者。 启动 CustomProducer生产者,发送 500条消息,随机发送到不同的分区。 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class CustomProducer { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); for (int i = 0; i < 7; i++) { kafkaProducer.send(new ProducerRecord<>("first", i,"test", "atguigu")); } kafkaProducer.close(); } }

说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

观看 3个消费者分别消费哪些分区的数据。

Range 分区分配再平衡案例

5、RoundRobin 以及再平衡 ①RoundRobin 分区策略原理

②RoundRobin分区分配策略案例

(1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin。

// 修改分区分配策略 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

(2)重启 3个消费者,重复发送消息的步骤,观看分区结果。 3 )RoundRobin 分区分配再平衡案例

6、Sticky 以及再平衡


十二、offset 位移 1、offset 的默认维护

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。

key 是group.id+topic+分区号,value 就是当前 offset 的值。

每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

①消费 offset 案例

②自动提交 offset

消费者交自动提交 offset import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class CustomConsumerAutoOffset { public static void main(String[] args) { // 1. 创建 kafka 消费者配置类 Properties properties = new Properties(); // 2. 添加配置参数 // 添加连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); // 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 提交 offset 的时间周期 1000ms,默认 5s properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); //3. 创建 kafka 消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //4. 设置消费主题 形参是列表 consumer.subscribe(Arrays.asList("first")); //5. 消费数据 while (true){ // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.value()); } } } } ③手动交 提交 offset

交同步提交 offset:

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交 offset的示例。

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class CustomConsumerByHandSync { public static void main(String[] args) { // 1. 创建 kafka 消费者配置类 Properties properties = new Properties(); // 2. 添加配置参数 // 添加连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); // 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //3. 创建 kafka 消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //4. 设置消费主题 形参是列表 consumer.subscribe(Arrays.asList("first")); //5. 消费数据 while (true){ // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.value()); } // 同步提交 offset consumer.commitSync(); } } }

异步提交 offset: 虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。

因此吞吐量会受到很大的影响。

因此更多的情况下,会选用异步提交 offset的方式。

以下为异步提交 offset 的示例:

import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Map; import java.util.Properties; public class CustomConsumerByHandAsync { public static void main(String[] args) { // 1. 创建 kafka 消费者配置类 Properties properties = new Properties(); // 2. 添加配置参数 // 添加连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //3. 创建 Kafka 消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //4. 设置消费主题 形参是列表 consumer.subscribe(Arrays.asList("first")); //5. 消费数据 while (true){ // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.value()); } // 异步提交 offset consumer.commitAsync(); } } } ④指定 Offset 消费

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.Properties; import java.util.Set; public class CustomConsumerSeek { public static void main(String[] args) { // 0 配置信息 Properties properties = new Properties(); // 连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key value 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2"); // 1 创建一个消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 订阅一个主题 ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); Set<TopicPartition> assignment= new HashSet<>(); while (assignment.size() == 0) { kafkaConsumer.poll(Duration.ofSeconds(1)); // 获取消费者分区分配信息(有了分区分配信息才能开始消费) assignment = kafkaConsumer.assignment(); } // 遍历所有分区,并指定 offset 从 1700 的位置开始消费 for (TopicPartition tp: assignment) { kafkaConsumer.seek(tp, 1700); } // 3 消费该主题数据 while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord :consumerRecords) { System.out.println(consumerRecord); } } } }

注意:每次执行完,需要修改消费者组名;

⑤指定时间消费

需求:

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。

例如要求按照时间消费前一天的数据,怎么处理?

import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.*; public class CustomConsumerForTime { public static void main(String[] args) { // 0 配置信息 Properties properties = new Properties(); // 连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key value 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2"); // 1 创建一个消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 订阅一个主题 ArrayList<String> topics = new ArrayList<>(); topics.add("first"); kafkaConsumer.subscribe(topics); Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { kafkaConsumer.poll(Duration.ofSeconds(1)); // 获取消费者分区分配信息(有了分区分配信息才能开始消费) assignment = kafkaConsumer.assignment(); } HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>(); // 封装集合存储,每个分区对应一天前的数据 for (TopicPartition topicPartition : assignment) { timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } // 获取从 1 天前开始消费的每个分区的 offset Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch); // 遍历每个分区,对每个分区设置消费时间。 for (TopicPartition topicPartition : assignment) { OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition); // 根据时间指定开始消费的位置 if (offsetAndTimestamp != null){ kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset()); } } // 3 消费该主题数据 while (true) { ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } } ⑥漏消费和重复消费

重复消费:已经消费了数据,但是 offset没提交。

漏消费:先提交 offset后消费,有可能会造成数据的漏消费。

2、生产经验 —— 消费者事务

3、生产经验 —— 数据积压 ( 消费者 如何提高吞吐量)

增加消费者的数量增加消费者一次拉取的消息条数增加broker累计消息大小大小


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #解压安装包tar #zxvf #kafka_212300tgz #C