irpas技术客

Running Flink Locally in IDEA (Java Version)


1. Background

Flink is currently one of the most popular real-time big-data frameworks. As a first step toward reading its source code and trying it out hands-on, this article shows how to run a Flink program locally in IDEA.

2. Steps

2.1 Environment

JDK 1.8 or later is sufficient, since Flink is mostly written in Java.

2.2 Create an IDEA project

Create it exactly as you would an ordinary Maven Java project; nothing special is required.
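As an alternative to creating the project by hand, Flink also publishes a Maven quickstart archetype that generates a project skeleton with the dependencies below already wired in. A sketch of the invocation, pinned to the article's Flink version (the `groupId`/`artifactId` values here are example placeholders, not from the article):

```shell
# Generate a Flink Java project skeleton from the official quickstart archetype.
# com.example / first-flink are placeholder coordinates; substitute your own.
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.11.1 \
  -DgroupId=com.example \
  -DartifactId=first-flink \
  -Dversion=1.0-SNAPSHOT \
  -DinteractiveMode=false
```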

2.3 pom.xml configuration

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- The project's own groupId/artifactId/version were garbled in this copy
         of the article and are omitted here. -->
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- For the Scala version -->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
```

2.4 Batch example

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @Author CaiWencheng
 * @Date 2022-05-04 22:52
 */
public class WordCountBatch {
    public static void main(String[] args) throws Exception {
        // First path is the input file, second is the output file
        String inPath = "E:\\IdeaProjects\\FirstFlink\\data\\input\\hello.txt";
        String outPath = "E:\\IdeaProjects\\FirstFlink\\data\\output\\output.txt";
        // Get the Flink batch execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // Read the file contents
        DataSet<String> text = executionEnvironment.readTextFile(inPath);
        // Process the data: split into words, group by word, sum the counts
        DataSet<Tuple2<String, Integer>> dataSet = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
        dataSet.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        // Trigger program execution
        executionEnvironment.execute("wordcount batch process");
    }

    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : line.split(" ")) {
                collector.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
```
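Conceptually, `LineSplitter` plus `groupBy(0).sum(1)` computes nothing more than a word-frequency map; Flink just distributes that computation. A stdlib-only sketch of the same logic, with no Flink involved (class and method names are mine, not from the article):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Mirrors LineSplitter + groupBy(0).sum(1): split each line on single
    // spaces, then add a count of 1 for each occurrence of a word.
    static Map<String, Integer> wordCount(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                wordCount(new String[] {"hello flink hello zk hello spark"});
        System.out.println(counts); // {hello=3, flink=1, zk=1, spark=1}
    }
}
```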

hello.txt

hello flink hello zk hello spark

Output (output.txt):

```
zk 1
flink 1
hello 3
spark 1
```

2.5 Streaming example

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author CaiWencheng
 * @Date 2022-05-04 23:11
 */
public class WordCountStream {
    public static void main(String[] args) throws Exception {
        // Get the Flink streaming execution environment
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read input from a socket
        DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream("hadoop2", 7777, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> sum =
                textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
                        String[] splits = s.split("\\s");
                        for (String word : splits) {
                            collector.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(0).sum(1);
        // Print the results
        sum.print();
        // Trigger job execution
        streamExecutionEnvironment.execute("wordcount stream process");
    }
}
```
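Unlike the batch version, `keyBy(0).sum(1)` on a stream emits an updated running count every time a word arrives, which is why the console output below shows `(hello,1)`, `(hello,2)`, and so on. A stdlib-only sketch of that per-element behavior (names are mine, no Flink involved):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningWordCount {
    // Mirrors flatMap + keyBy(0).sum(1): for every incoming word, update its
    // running count in keyed state and emit the new (word, count) pair,
    // just as Flink's print() shows one update per input element.
    static List<String> process(String line) {
        Map<String, Long> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : line.split("\\s")) {
            long updated = state.merge(word, 1L, Long::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(process("hello flink hello"));
        // [(hello,1), (flink,1), (hello,2)]
    }
}
```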

1) On the hadoop2 machine, first run:

```shell
# listen on port 7777
nc -lp 7777
```

2) Start the main method of WordCountStream.

3) In the nc session on hadoop2, type:

hello flink hello flink hello hello hello spark stream flink hello

4) The IDEA console prints:

```
5> (hello,1)
13> (flink,1)
5> (hello,2)
5> (hello,3)
13> (flink,2)
5> (hello,4)
13> (flink,3)
5> (hello,5)
1> (spark,1)
5> (hello,6)
16> (stream,1)
```

The `N>` prefix added by `print()` is the index of the parallel subtask that produced the record, which is why all updates for the same word carry the same prefix.

Note: the first run of the streaming program fails, because flink-streaming-java_2.12 is declared with `<scope>provided</scope>` and is therefore missing from the runtime classpath. Fix it by checking "Include dependencies with 'Provided' scope" in the IDEA run configuration.

3. Summary

To create Flink Java batch/streaming programs in IDEA, you only need to add the corresponding dependencies: flink-java / flink-streaming-java_2.12, plus flink-clients_2.12. Creating the Scala version works much the same as the Java one.


