irpas技术客

大数据Hadoop之——Kafka 图形化工具 EFAK(EFAK环境部署)_大数据老司机_kafka图形化工具

大大的周 5891

文章目录 一、概述二、EFAK架构三、EFAK数据采集原理四、安装Kafka1)Kafka下载2)配置环境变量3)创建logs目录4)修改kafka配置5)修改zookeeper配置6)配置Zookeeper myid7)开启Kafka JMX监控8)将kafka目录推送到其它节点9)启动服务 五、安装EFAK1)下载EFAK2)创建数据库2)设置环境变量3)配置4)调整启动参数6)修改 works配置7)将EFAK推送其它节点8)启动 六、简单使用1)Kafka CLI简单使用2)EFAK常用命令

一、概述

EFAK(Eagle For Apache Kafka,以前称为 Kafka Eagle)是一款由国内公司开源的Kafka集群监控系统,可以用来监视kafka集群的broker状态、Topic信息、IO、内存、consumer线程、偏移量等信息,并进行可视化图表展示。独特的KQL还可以通过SQL在线查询kafka中的数据。

源码: https://github.com/smartloli/kafka-eagle/ 下载: http://download.kafka-eagle.org/ 官方文档:https://·mon.InconsistentClusterIdException: The Cluster ID ELIH-KKbRP-NnPnHt4z-lA doesn't match stored clusterId Some(2HC_x7bTR_u2bCxrqw0Otw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.scala:228) at kafka.Kafka$.main(Kafka.scala:109) at kafka.Kafka.main(Kafka.scala)

【解决】在server.properties找到log.dirs配置的路径。将该路径下的meta.properties文件删除,或者编辑meta.properties文件修改里面的cluster.id即可。

8)将kafka目录推送到其它节点 $ scp -r $KAFKA_HOME hadoop-node2:/opt/bigdata/hadoop/server/ $ scp -r $KAFKA_HOME hadoop-node3:/opt/bigdata/hadoop/server/ # 在hadoop-node2和hadoop-node3节点上设置环境变量 $ vi /etc/profile export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1 export PATH=$PATH:$KAFKA_HOME/bin $ source /etc/profile # 修改advertised.listeners,值改成对应的hostname或者ip

【温馨提示】修改hadoop-node2上server.properties文件的broker.id,设置为1和2,只要不重复就行,advertised.listeners地址改成对应机器IP。

9)启动服务

启动zookeeper集群之后再启动kafka集群

$ cd $KAFKA_HOME # -daemon后台启动 $ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties # 默认端口2181,可以在配置自定义,这里修改为12181端口 $ netstat -tnlp|grep 12181

启动Kafka

$ cd $KAFKA_HOME # 默认端口9092,这里修改成了19092,可以修改listeners和advertised.listeners $ ./bin/kafka-server-start.sh -daemon ./config/server.properties $ netstat -tnlp|grep 9092 $ jps 五、安装EFAK 1)下载EFAK $ cd /opt/bigdata/hadoop/software $ wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.1.0.tar.gz $ tar -xf kafka-eagle-bin-2.1.0.tar.gz -C /opt/bigdata/hadoop/server/ $ cd /opt/bigdata/hadoop/server/ $ tar -xf 2)创建数据库 $ mysql -uroot -p 123456 create database ke; 2)设置环境变量 $ vi /etc/profile export KE_HOME=/opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0/efak-web-2.1.0 export PATH=$PATH:$KE_HOME/bin $ source /etc/profile 3)配置

这里设置hadoop-node1为master节点,其它两个节点为slave节点,修改参数

$ vi $KE_HOME/conf/system-config.properties # Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here efak.zk.cluster.alias=cluster1 cluster1.zk.list=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181 ###################################### # kafka jmx 地址,默认Apache发布的Kafka基本是这个默认值, # 对于一些公有云Kafka厂商,它们会修改这个值, # 比如会将jmxrmi修改为kafka或者是其它的值, # 若是选择的公有云厂商的Kafka,可以根据实际的值来设置该属性 ###################################### cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi # Zkcli limit -- Zookeeper cluster allows the number of clients to connect to # If you enable distributed mode, you can set value to 4 or 8 kafka.zk.limit.size=16 # EFAK webui port -- WebConsole port access address efak.webui.port=8048 ###################################### # EFAK enable distributed,启用分布式部署 ###################################### efak.distributed.enable=true # 设置节点类型slave or master # master worknode set status to master, other node set status to slave efak.cluster.mode.status=master # deploy efak server address efak.worknode.master.host=hadoop-node1 efak.worknode.port=8085 # Kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option cluster1.efak.offset.storage=kafka # Whether the Kafka performance monitoring diagram is enabled efak.metrics.charts=true # EFAK keeps data for 30 days by default efak.metrics.retain=15 # If offset is out of range occurs, enable this property -- Only suitable for kafka sql efak.sql.fix.error=false efak.sql.topic.records.max=5000 # Delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete efak.topic.token=keadmin # 关闭自带的sqlite数据库,使用外部的mysql数据库 # Default use sqlite to store data # efak.driver=org.sqlite.JDBC # It is important to note that the '/hadoop/kafka-eagle/db' path must be exist. # efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db # efak.username=root # efak.password=smartloli # 配置外部数据库 # (Optional) set mysql address efak.driver=com.mysql.jdbc.Driver efak.url=jdbc:mysql://hadoop-node1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=123456 4)调整启动参数

EFAK默认启动内存大小为2G,考虑到服务器情况可以将其调小

## 在 efak 安装目录执行 $ vi $KE_HOME/bin/ke.sh ## 将 KE_JAVA_OPTS 最大最小容量调小,例如: export KE_JAVA_OPTS="-server -Xmx512m -Xms512m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80" 6)修改 works配置

默认是localhost

$ cat >$KE_HOME/conf/works<<EOF hadoop-node2 hadoop-node3 EOF 7)将EFAK推送其它节点 # hadoop-node1推送 $ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node2:/opt/bigdata/hadoop/server/ $ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node3:/opt/bigdata/hadoop/server/ # 在hadoop-node2和hadoop-node3配置环境变量修改节点类型为slave 8)启动

上面配置文件配置的是分布式部署,当然也可以单机跑,但是不建议单机跑

# 在master节点上执行 $ cd $KE_HOME/bin $ chmod +x ke.sh # 单机版启动 $ ke.sh start # 集群方式启动 $ ke.sh cluster start $ ke.sh cluster restart

web UI:http://192.168.0.113:8048/ 账号密码:admin/123456

六、简单使用 1)Kafka CLI简单使用

【增】添加topic

# 创建topic,1副本,1分区,设置数据过期时间72小时(-1表示不过期),单位ms,72*3600*1000=259200000 $ kafka-topics.sh --create --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000

【查】

# 查看topic列表 $ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --list # 查看topic列表详情 $ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe # 指定topic $ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002 # 查看消费者组 $ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --list $ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002

【改】这里主要是修改最常用的三个参数:分区、副本,过期时间

# 修改分区,扩分区,不能减少分区 $ kafka-topics.sh --alter --bootstrap-server hadoop-node1:9092 --topic test002 --partitions 2 # 修改过期时间,下面两行都可以 $ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --topic test002 --add-config retention.ms=86400000 $ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000 # 修改副本数,将副本数修改成3 $ cat >1.json<<EOF {"version":1, "partitions":[ {"topic":"test002","partition":0,"replicas":[0,1,2]}, {"topic":"test002","partition":1,"replicas":[1,2,0]}, {"topic":"test002","partition":2,"replicas":[2,0,1]} ]} EOF $ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002

【删】

$ kafka-topics.sh --delete --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092

【生成者】

$ kafka-console-producer.sh --broker-list hadoop-node1:9092 --topic test002 {"id":"1","name":"n1","age":"20"} {"id":"2","name":"n2","age":"21"} {"id":"3","name":"n3","age":"22"}

【消费者】

# 从头开始消费 $ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --from-beginning # 指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区 $ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --partition 0 --offset 100

【消费组】

$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --group test002

【查看数据积压】

$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002

2)EFAK常用命令

$KE_HOME/bin/ke.sh启动脚本中包含以下命令:

命令描述ke.sh start启动 EFAK 服务器。ke.sh status查看 EFAK 运行状态。ke.sh stop停止 EFAK 服务器。ke.sh restart重新启动 EFAK 服务器。ke.sh stats查看 linux 操作系统中的 EFAK 句柄数。ke.sh find [ClassName]在 jar 中找到类名的位置。ke.sh gc查看 EFAK 进程 gc。ke.sh version查看 EFAK 版本。ke.sh jdk查看 EFAK 安装的 jdk 详细信息。ke.sh sdate查看 EFAK 启动日期。ke.sh cluster start查看 EFAK 集群分布式启动。ke.sh cluster status查看 EFAK 集群分布式状态。ke.sh cluster stop查看 EFAK 集群分布式停止。ke.sh cluster restart查看 EFAK 集群分布式重启。

EFAK环境部署,和kafka的一些简单操作就到这里了,后续会分享KSQL和其它更详细的页面化操作,请小伙伴耐心等待~


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #kafka图形化工具 #for #apache #Kafka以前称为 #Kafka