irpas技术客

Kafka基础整理_恒哥~Bingo

网络投稿 2605

Kafka基本概念

? Apache Kafka 是一个分布式发布 - 订阅消息系统和一个强大的消息队列,可以处理大量的数据,并使你能够将消息从一个端点传递到另一个端点。 Kafka 适合离线和在线消息消费。 Kafka 消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka 构建在 Zookeeper 同步服务之上。 它与 Apache Storm 和 Spark 非常好地集成,用于实时流式数据分析。

? Kafka 是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。

?

? 相关术语

Broker

Kafka集群包含多个服务器,每个服务器节点就叫Broker

Topic

Kafka消息分为不同的主题,按主题分开投递和处理

Partition

每个Topic将数据分为1个或N个分区来存储

Producer

消息生产者,负责投递消息

Consumer

消息消费者,负责接收消息

Consumer Group

消费者组,每个消费者属于特定的组

Leader

Kafka集群中会选取一个Broker,负责写数据,然后广播同步给其他Broker,挂掉后会重新选举

Follower

跟随Leader,负责读数据,Leader的数据会同步给Follower

Replica

Broker的副本,当前Broker挂掉后可以代替

Kafka安装和配置

1)下载kafka

https://kafka.apache.org/downloads

本例使用的版本是:kafka_2.11-2.4.1.tgz

2) 解压并修改目录

tar -xvf kafka_2.11-2.4.1.tgz mv kafka_2.11-2.4.1 kafka

3) 配置

修改server.properties关键配置、其他的保持默认:

# 监听本机的9092端口 listeners=PLAINTEXT://:9092 # zk的ip和端口 zookeeper.connect=localhost:2181

4) 启动kafka

需要先启动zookeeper,kafka提供了内置的zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

执行bin目录中的脚本

bin/kafka-server-start.sh config/server.properties

如果出现Not enough space是因为堆内存不足,可以修改kafka-server-start.sh脚本,将"-Xmx1G -Xms1G"改为"-Xmx512M -Xms512M"

需要后台执行可以加入-daemon命令

bin/kafka-server-start.sh -daemon config/server.properties

日志在logs/server.log中查看

5) 停止kafka

bin/kafka-server-stop.sh config/server.properties Kafka的基本使用

执行bin目录中的脚本

1) 创建主题

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test –create 创建topic–zookeeper zk的地址–replication-factor 副本数,默认是1–partitions 分区数–topic 主题名称

2) 查看主题

bin/kafka-topics.sh --list --zookeeper localhost:2181

3) 发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4) 消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning SpringBoot整合Kafka

1) 修改kafka配置

# 允许外部IP连接 listeners=PLAINTEXT://0.0.0.0:9092 # 为外部提供服务的代理地址,就是当前服务的IP advertised.listeners=PLAINTEXT://192.168.7.188:9092

2) 重新启动Kafka

3) 创建SpringBoot项目

4) 添加依赖

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

5) 配置文件

# Kafka集群地址 spring.kafka.bootstrap-servers=192.168.7.188:9092 # 生产者配置 # 重试次数 spring.kafka.producer.retries=0 # 批量大小 spring.kafka.producer.batch-size=16384 # 提交延时 spring.kafka.producer.properties.linger.ms=0 # 生产端缓冲区大小 spring.kafka.producer.buffer-memory = 33554432 # Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 消费者配置 # 默认的消费组ID spring.kafka.consumer.group-id=0 # 是否自动提交offset spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.auto-commit-interval=100 # 设置获取一次数据的最大值,默认值为1M spring.kafka.consumer.properties.max.partition.fetch.bytes=2097152 # Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

参考官网的kafka配置:

https://docs.spring.io/spring-boot/docs/2.0.4.RELEASE/reference/htmlsingle/#common-application-properties

6) 配置类

@Configuration public class KafkaConfig { /** * 新建Topic,主题名、分区数,副本数 */ @Bean public NewTopic newTopic(){ return new NewTopic("hello-topic",1, (short) 1); } }

7) 生产者

@RestController public class KafkaProducerController { @Autowired private KafkaTemplate<String,Object> kafkaTemplate; @RequestMapping("/kafka/sendMessage") public String sendMessage(String msg){ kafkaTemplate.send("hello-topic",msg); return "发送成功:" + msg; } }

8) 消费者

@Component public class KafkaMessageConsumer { @KafkaListener(topics = {"hello-topic"}) public void consumeMessage(ConsumerRecord<?,?> record){ System.out.println("Get Message:" + record.value()); } }

9) 浏览器测试


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Kafka基础整理 #Kafka入门