irpas技术客

flink读取kafka数据_逆风飞翔的小叔_flink读取kafka

大大的周 7014

前言

在实际生产环境中,经常需要使用flink读取外部的数据源作为数据的输入流,其中kafka就是重要的实时数据源,flink可以通过消费kafka指定的topic数据达到实时处理的目的,下面演示如何使用flink读取kafka的数据

环境准备

1、安装并启动zk服务

这个相信基本上都会了,就不再演示了

2、安装并启动kafka

本文为演示方便,直接使用docker快速启动一个kafka的容器,可以执行如下命令

docker run -d --name my_kafka \ -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=ZK公网IP:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ZK公网IP:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

注意执行上面的命令之前,确保zk已经启动

docker命令执行完毕后,检查kafka容器是否创建成功

?3、创建一个topic

进入上面的kafka的docker容器,创建一个topic,以供后面使用,执行下面命令进入容器:

docker exec -it my_kafka /bin/bash

进入bin目录,

cd /opt/kafka_2.13-2.8.1/bin

在该目录下创建一个topic,执行下面的命令进行topic的创建成功后,

./kafka-topics.sh --zookeeper ZK公网IP:2181 \ --create --topic zcy \ --partitions 2 --replication-factor 1

可通过下面命令查看已存在的topic列表

./kafka-topics.sh --zookeeper ZK公网IP:2181 --list

?

以上的准备工作完成后,下面开始编码实现

编码实现

1、导入flink-kafka的依赖

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>1.10.1</version> </dependency>

2、核心代码

import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import java.util.Properties; /** * kafka作为数据源 */ public class SoureTest3 { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //读取kafka的数据 Properties properties = new Properties(); properties.setProperty("bootstrap.servers","kafka公网IP:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); DataStreamSource<String> dataStreamSource = env.addSource( new FlinkKafkaConsumer011<>( "zcy", new SimpleStringSchema(), properties) ); dataStreamSource.print(); env.execute(); } }

然后使用下面的命令再在kafka的终端,开启生产者的shell窗口,

./kafka-console-producer.sh --broker-list 公网IP:9092 --topic zcy

效果如下:

?启动上面的程序,观察控制台,这时等待接收外部topic的数据

?然后从kafka的终端发送一条消息,可以看到,数据就能成功输出到控制台了,几乎是近实时的

?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #flink读取kafka #flink读取kafka数据