irpas技术客

Kafka安装与配置_上善若水_厚德载物_kafka安装配置

irpas 6296

1.下载Kafka2.13-3.1.0

最新版为?kafka_2.13-3.1.0.tgz

下载Zookeper

最新版为?zookeeper-3.8.0

2.单机安装zookeper

Kafka依赖于zookeeper,官方承诺将来会移除.

解压文件:

tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt/ mv /opt/apache-zookeeper-3.8.0-bin/ /opt/zookeeper

在/opt/zookeeper/ 目录下创建数据文件目录和日志文件目录

mkdir /opt/zookeeper/zkData mkdir /opt/zookeeper/zkLog

# 复制一份配置文件并修改

cd /opt/zookeeper/conf/ cp zoo_sample.cfg zoo.cfg vi zoo.cfg # 修改如下内容 dataDir=/opt/zookeeper/zkData dataLogDir=/opt/zookeeper/zkLog

启动

cd /opt/zookeeper/bin/ # 启动zookeeper ./zkServer.sh start # 查看进程是否启动 jps # 查看状态 ./zkServer.sh status # 停止zookeeper ./zkServer.sh stop

3.安装Kafka

解压到指定目录

cd /home $ tar -xzf kafka_2.13-3.1.0.tgz $ cd kafka_2.13-3.1.0

修改config目录下vi server.propertie文件

listeners = PLAINTEXT://192.168.2.40:9092 #多个可用逗号分隔 #zookeeper.connect=server1:2181,server2:2181,server3:2181 zookeeper.connect=localhost:2181

启动命令:

bin/kafka-server-start.sh config/server.properties

此方式可以实时查看日志.

后台启动方式:

./kafka-server-start.sh -daemon ../config/server.properties

查询进程和关闭命令

jps ./kafka-server-stop.sh

登录zookeeper客户端,查看/brokers/ids

cd /opt/zookeeper/bin/ zkCli.sh # 查询结果如下: [zk: localhost:2181(CONNECTED) 0] ls /brokers/ids [0] [zk: localhost:2181(CONNECTED) 1] quit

kafka常见命令

#创建主题 主题名是 quickstart-events $ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 #查询主题 $ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 #主题中写入消息 bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second event #主题中读取消息 bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 This is my first event This is my second event

kafka集群

假如现在有两台服务器192.168.2.40,192.168.2.41

kafka的安装与配置如上,两台服务器唯一不同的地方就是配置文件中的broker.id

?修改config目录下vi server.propertie文件

192.168.2.40

broker.id=0

192.168.2.41

broker.id=1

bin目录启动命令都添加

vi kafka-server-start.sh #添加 export JMX_PORT="9999" if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" export JMX_PORT="9999" fi

?登录zookeeper客户端,查看/brokers/ids

4.可视化工具kafka-eagle

下载:kafka-eaglev2.1.0.tar.gz

解压

cd /home tar -zxvf kafka-eagle-web-2.1.0-bin.tar.gz

在/etc/profile文件中添加环境变量KE_HOME?

vi /etc/profile # 在profile文件中添加 export KE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5 export PATH=$PATH:$KE_HOME/bin # 使修改后的profile文件生效 . /etc/profile

安装MySQL并添加数据库ke,kafka-eagle之后会用到它; 修改配置文件$KE_HOME/conf/system-config.properties,主要是修改Zookeeper的配置和数据库配置,注释掉sqlite配置,改为使用MySQL.

###################################### kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=localhost:2181 ###################################### # kafka eagle webui port ###################################### kafka.eagle.webui.port=8048 ###################################### # kafka mysql jdbc driver address ###################################### kafka.eagle.driver=com.mysql.cj.jdbc.Driver kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=123456

?启动后会自动使用上面的数据库连接,创建并初始化数据库.名称ke.

# 停止服务 bin/ke.sh stop # 重启服务 bin/ke.sh restart # 查看服务运行状态 bin/ke.sh status # 查看服务状态 bin/ke.sh stats # 动态查看服务输出日志 tail -f /logs/ke_console.out

?? 启动成功可以直接访问,输入账号密码admin:123456,访问地址:http://localhost:8048/

?

注意观察 brokers,topics的数量。brokers为0的话没有连接成功.

可视化工具自然少不了监控,如果你想开启kafka-eagle对Kafka的监控功能的话,需要修改Kafka的启动脚本,暴露JMX的端口.

vi kafka-server-start.sh #添加 export JMX_PORT="9999" if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" export JMX_PORT="9999" fi

kafka集群图示:

?

5.SpringBoot整合Kafka.

在pom.xml中添加

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

?在application.yml中spring节点下添加

spring kafka: bootstrap-servers: 192.168.2.40:9092 producer: # 生产者配置 retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 #16K buffer-memory: 33554432 #32M acks: 1 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: zhTestGroup # 消费者组 enable-auto-commit: false # 关闭自动提交 auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交 # TIME # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交 # COUNT # TIME | COUNT 有一个条件满足时提交 # COUNT_TIME # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交 # MANUAL # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种 # MANUAL_IMMEDIATE ack-mode: manual_immediate

生产者:

@RestController public class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; // 发送消息 @GetMapping("/kafka/normal/{message}") public void sendMessage1(@PathVariable("message") String normalMessage) { kafkaTemplate.send("quickstart-events", normalMessage); } }

?消费者:

@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"quickstart-events"}) public void onMessage1(ConsumerRecord<?, ?> record){ // 消费的哪个topic、partition的消息,打印出消息内容 System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); } }

?可以用postman进行测试,观察结果.


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #kafka安装配置 #zxvf #C #optmv