
Flink Getting Started Series: Integrating with the Spring Boot Framework (Version 1.14.3)

Author: 打酱油的葫芦娃

大大的周 8238

This article uses Flink 1.14.3.

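This article explains how to combine the Flink and Spring Boot frameworks. With this integration, a Flink application can plug into the Spring Cloud ecosystem (a distributed configuration center, service registration and discovery, load balancing, and so on), and Flink jobs can be submitted through RESTful interfaces.

Building the Local Project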

Software environment:

- Flink 1.14.3
- Spring Boot 2.0.3.RELEASE
- JDK 11

The pom.xml:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <!-- ... -->
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spring.boot.version>2.0.3.RELEASE</spring.boot.version>
        <flink.version>1.14.3</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${spring.boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${spring.boot.version}</version>
        </dependency>
    </dependencies>
</project>
```

Take the SocketWindowWordCount program as an example:

```java
// SocketWindowWordCount.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.springframework.stereotype.Component;

// Add the @Component annotation so that Spring manages this class as a bean
@Component
public class SocketWindowWordCount {

    public void wordCount(String hostname, int port, long windowSize) throws Exception {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts =
                text.flatMap(
                                (FlatMapFunction<String, WordWithCount>)
                                        (value, out) -> {
                                            for (String word : value.split("\\s")) {
                                                out.collect(new WordWithCount(word, 1L));
                                            }
                                        },
                                Types.POJO(WordWithCount.class))
                        .keyBy(value -> value.word)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(windowSize)))
                        .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
                        .returns(WordWithCount.class);

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}

// WordWithCount.java
/** Result POJO: data type for words with count. */
public class WordWithCount {

    public String word;
    public long count;

    @SuppressWarnings("unused")
    public WordWithCount() {}

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }

    public void setWord(String word) { this.word = word; }

    public long getCount() { return count; }

    public void setCount(long count) { this.count = count; }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
```
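To make the windowing behavior concrete without starting a cluster, the plain-Java sketch below (no Flink dependency) reproduces what the job computes: each line is split on whitespace, assigned to a tumbling window, and the counts are summed per word and window. The class name WindowWordCountSketch and the explicit per-line timestamps are assumptions for illustration; the real job uses the machine clock (processing time) rather than timestamps carried in the data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the windowed word count above (no Flink involved).
final class WindowWordCountSketch {

    /** One input line plus an assumed arrival time in milliseconds. */
    static final class Line {
        final long timestampMs;
        final String text;

        Line(long timestampMs, String text) {
            this.timestampMs = timestampMs;
            this.text = text;
        }
    }

    /** Start of the tumbling window containing ts (zero offset, ts >= 0). */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** Returns windowStart -> (word -> count); TreeMaps keep the output ordered. */
    static Map<Long, Map<String, Long>> count(List<Line> lines, long sizeMs) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Line line : lines) {
            long start = windowStart(line.timestampMs, sizeMs);
            Map<String, Long> counts = result.computeIfAbsent(start, k -> new TreeMap<>());
            for (String word : line.text.split("\\s")) {
                if (word.isEmpty()) {
                    continue; // unlike the Flink job, skip empty tokens
                }
                counts.merge(word, 1L, Long::sum); // the "reduce" step: add the counts
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Line> input = new ArrayList<>();
        input.add(new Line(1_000, "Hello Hello World"));
        input.add(new Line(12_000, "Hello"));
        // 10-second windows, matching flink.window.size: 10 in application.yml
        System.out.println(count(input, 10_000));
        // prints {0={Hello=2, World=1}, 10000={Hello=1}}
    }
}
```

The window start `ts - ts % size` should give the same boundaries as Flink's tumbling windows for non-negative timestamps and a zero window offset. The empty-token check matters when words are separated by more than one space, because `split("\\s")` then yields empty strings.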

Because SocketWindowWordCount is a long-running streaming job, we can implement Spring Boot's ApplicationRunner interface so that the job is built and submitted to the Flink cluster when the Spring Boot application starts.

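```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class Execution implements ApplicationRunner {

    @Value("${source.socket.host}")
    private String host;

    @Value("${source.socket.port}")
    private int port;

    @Value("${flink.window.size}")
    private long windowSize;

    @Autowired
    private SocketWindowWordCount wordCount;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // start the job; env.execute() inside blocks while the streaming job runs
        wordCount.wordCount(host, port, windowSize);
    }
}
```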

Next, write the application's startup class:

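```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(FlinkApplication.class, args);
    }
}
```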

The corresponding configuration file, application.yml:

```yaml
server:
  port: 7605
  connection-timeout: 300s
  servlet:
    context-path: /awesome/flink/springboot
spring:
  application:
    name: flink-springboot
source:
  socket:
    host: localhost
    port: 9000
flink:
  window:
    size: 10
```
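Each `@Value` key in the Execution class is the nested YAML path joined with dots: `source`, then `socket`, then `port` becomes `source.socket.port`. As a rough illustration only (this is not how Spring Boot's YAML loader is implemented, and YamlKeyFlattener is a hypothetical name), the flattening can be mimicked on nested maps in plain Java:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: turn nested maps into dotted keys, the naming scheme
// that @Value("${source.socket.port}") relies on. Not Spring's actual code.
final class YamlKeyFlattener {

    static Map<String, Object> flatten(Map<String, Object> tree) {
        Map<String, Object> out = new TreeMap<>(); // sorted for readable output
        walk("", tree, out);
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void walk(String prefix, Map<String, Object> node, Map<String, Object> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                walk(key, (Map<String, Object>) e.getValue(), out); // descend into a section
            } else {
                out.put(key, e.getValue()); // leaf value: record under its dotted path
            }
        }
    }

    public static void main(String[] args) {
        // The "source" and "flink" sections of application.yml as nested maps
        Map<String, Object> socket = new LinkedHashMap<>();
        socket.put("host", "localhost");
        socket.put("port", 9000);
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("socket", socket);
        Map<String, Object> window = new LinkedHashMap<>();
        window.put("size", 10);
        Map<String, Object> flink = new LinkedHashMap<>();
        flink.put("window", window);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("source", source);
        root.put("flink", flink);
        System.out.println(flatten(root));
        // prints {flink.window.size=10, source.socket.host=localhost, source.socket.port=9000}
    }
}
```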

Next, install netcat on Windows:

Download netcat (download link), unzip it, and copy nc.exe into the C:\Windows\System32 directory.

Open cmd and run the following command, which listens on port 9000 and sends whatever you type to the connected client:

```shell
nc -l -p 9000
```
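If installing netcat on Windows is inconvenient, a small Java program can play the same role. The sketch below is a hypothetical stand-in (LineServer is not part of any library): it listens on a port, waits for one client (here, the Flink socket source), and forwards every line typed on standard input, terminated with `\n` to match the delimiter passed to `socketTextStream`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical replacement for "nc -l -p 9000": serve one client, send typed lines.
final class LineServer {

    /** Accepts one client and forwards each line read from in until in is exhausted. */
    static void serve(ServerSocket server, BufferedReader in) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = in.readLine()) != null) {
                out.write(line);
                out.write('\n'); // always "\n", the delimiter used by socketTextStream
                out.flush();     // send each line immediately, like typing into nc
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            serve(server, stdin);
        }
    }
}
```

With JDK 11 it can be started directly from source with `java LineServer.java 9000`. Like the nc command above, it serves a single connection and ends when that client disconnects or standard input is closed.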

Then run the main method of FlinkApplication to start the Spring Boot project.

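Make sure to run `nc -l -p 9000` before starting the Spring Boot project: as soon as the project starts, the job's socket source connects to port 9000, and that connection fails if nothing is listening yet.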

The startup log shows that Flink's embedded MiniCluster starts right after Tomcat. The MiniCluster also serves its own web UI, whose URL is printed in the startup log.

Open that page in a browser.

The SocketWindowWordCount job has been successfully submitted to the Flink cluster.

Next, type the following into the nc window opened above:

Hello Hello World

The console prints:

```
Hello : 2
World : 1
```

Submitting to a Remote Cluster

Next, let's try submitting the application to a remote Flink cluster.

If you are not familiar with setting up a Flink cluster, see my earlier article:

Flink Getting Started Series: Installation, Deployment, and Job Submission (Version 1.14.3)

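Here we set up a single-node Flink session cluster.

To submit the local project to the remote cluster, the Spring Boot dependencies must either be placed on the Flink cluster's classpath, or the project code and all its dependencies must be packaged into a single fat jar with the Maven Shade plugin. This article takes the second approach and adds the following build configuration to pom.xml: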

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>2.0.3.RELEASE</version>
                </dependency>
            </dependencies>
            <configuration>
                <finalName>socket-word-count-jar-with-dependencies</finalName>
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <artifactSet>
                    <excludes>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                    </excludes>
                </artifactSet>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.tooling</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.test.awesome.flink.springboot.FlinkApplication</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
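Several jars that end up in the fat jar (Spring Boot's own jars among them) ship their own `META-INF/spring.factories`. Without a transformer, the shade plugin keeps only one copy of a duplicated resource, so Spring would miss entries from the other jars. The sketch below imitates what merging means for such files: values of a key that appears in several files are joined with commas. It is a plain-Java illustration under that assumption, not the source of `PropertiesMergingResourceTransformer`, and the class name and file contents are made up.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration of merging several spring.factories files into one.
final class SpringFactoriesMerge {

    static Properties merge(List<String> files) throws IOException {
        Properties merged = new Properties();
        for (String content : files) {
            Properties p = new Properties();
            p.load(new StringReader(content)); // parse one jar's spring.factories
            for (String key : p.stringPropertyNames()) {
                String value = p.getProperty(key).trim();
                String existing = merged.getProperty(key);
                // same key in two jars: keep both lists, comma-separated
                merged.setProperty(key, existing == null ? value : existing + "," + value);
            }
        }
        return merged;
    }

    public static void main(String[] args) throws IOException {
        String key = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";
        String jarA = key + "=com.example.AConfig\n"; // made-up contents of jar A
        String jarB = key + "=com.example.BConfig\n"; // made-up contents of jar B
        System.out.println(merge(List.of(jarA, jarB)).getProperty(key));
        // prints com.example.AConfig,com.example.BConfig
    }
}
```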

Run `mvn install`, then upload the fat jar to the server that hosts the Flink cluster.

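Before submitting, start nc on the server. The Linux nc syntax differs slightly from the Windows version; `-k` keeps the listener open for new connections after a client disconnects: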

```shell
nc -lk 9000
```
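Because the socket source cannot connect if nothing is listening, it can help to check the port before submitting. The helper below is a hypothetical sketch (PortWaiter is not a Flink or Spring Boot API) that polls until a TCP listener accepts a connection or a timeout expires. Each check opens and closes one connection, which is harmless with `nc -lk` but would end a one-shot listener such as the Windows `nc -l -p 9000`.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: wait until host:port accepts TCP connections.
final class PortWaiter {

    /** Returns true once a connection succeeds, false if timeoutMs elapses first. */
    static boolean waitForPort(String host, int port, long timeoutMs, int retryMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try (Socket probe = new Socket()) {
                probe.connect(new InetSocketAddress(host, port), retryMs);
                return true; // a listener accepted; the probe is closed right away
            } catch (IOException notListeningYet) {
                if (System.currentTimeMillis() >= deadline) {
                    return false;
                }
                Thread.sleep(retryMs); // back off before the next attempt
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Same host and port as source.socket in application.yml
        boolean up = waitForPort("localhost", 9000, 30_000, 500);
        System.out.println(up ? "port 9000 is listening" : "nothing listening on port 9000");
    }
}
```

On this single-node setup the client running `flink run` and the socket source are on the same server, so the check is meaningful. On a multi-node cluster the source connects from whichever TaskManager runs it, so a successful check from the client proves less.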

Then submit the job with the following command:

```shell
# /usr/lib/test/flink-1.14.3/ is the Flink installation directory
# /usr/lib/test/apps/socket-word-count-jar-with-dependencies.jar is the uploaded fat jar
/usr/lib/test/flink-1.14.3/bin/flink run /usr/lib/test/apps/socket-word-count-jar-with-dependencies.jar
```

The submission fails with the following error:

```
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/usr/lib/ruanshubin/flink-1.14.3/lib/log4j-slf4j-impl-2.17.1.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.apache.logging.slf4j.Log4jLoggerFactory
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/usr/lib/ruanshubin/flink-1.14.3/lib/log4j-slf4j-impl-2.17.1.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.apache.logging.slf4j.Log4jLoggerFactory
    at org.springframework.util.Assert.instanceCheckFailed(Assert.java:637)
    at org.springframework.util.Assert.isInstanceOf(Assert.java:537)
    at org.springframework.boot.logging.logback.LogbackLoggingSystem.getLoggerContext(LogbackLoggingSystem.java:274)
    at org.springframework.boot.logging.logback.LogbackLoggingSystem.beforeInitialize(LogbackLoggingSystem.java:99)
    at org.springframework.boot.context.logging.LoggingApplicationListener.onApplicationStartingEvent(LoggingApplicationListener.java:191)
    at org.springframework.boot.context.logging.LoggingApplicationListener.onApplicationEvent(LoggingApplicationListener.java:170)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:127)
    at org.springframework.boot.context.event.EventPublishingRunListener.starting(EventPublishingRunListener.java:68)
    at org.springframework.boot.SpringApplicationRunListeners.starting(SpringApplicationRunListeners.java:48)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:313)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1255)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1243)
    at com.ruanshubin.awesome.flink.springboot.FlinkApplication.main(FlinkApplication.java:14)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more
```

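Clearly, the Logback bundled with Spring Boot conflicts with Flink's default logging implementation (the log4j-slf4j-impl jar in Flink's lib directory, as the error shows). Resolve the dependency conflict in pom.xml by excluding spring-boot-starter-logging, then rebuild and re-upload the jar: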

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>${spring.boot.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
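When a logging conflict like this is suspected, one quick check is to probe which backend classes the job can actually load. The following is a hypothetical diagnostic sketch (ClasspathProbe is not part of Flink or Spring Boot): it reports which of the given class names are visible to a class loader, without running their static initializers. The two candidate names correspond to the two implementations named in the error: Logback's `LoggerContext` and the `Log4jLoggerFactory` from Flink's lib directory.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic: which of these classes can the given loader see?
final class ClasspathProbe {

    static List<String> present(List<String> classNames, ClassLoader loader) {
        List<String> found = new ArrayList<>();
        for (String name : classNames) {
            try {
                Class.forName(name, false, loader); // load only, do not initialize
                found.add(name);
            } catch (ClassNotFoundException | LinkageError e) {
                // not visible from this loader (or not loadable); skip it
            }
        }
        return found;
    }

    public static void main(String[] args) {
        List<String> candidates = List.of(
                "ch.qos.logback.classic.LoggerContext",          // Logback, via spring-boot-starter-logging
                "org.apache.logging.slf4j.Log4jLoggerFactory");  // log4j-slf4j-impl in Flink's lib/
        System.out.println(present(candidates, ClasspathProbe.class.getClassLoader()));
    }
}
```

Run from the top of `FlinkApplication.main` under `flink run`, something like this should list both classes before the exclusion and only the Log4j one afterwards.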

This time the job is submitted successfully.

Type the same words into nc:

Hello Hello World

The Flink job on the remote cluster receives and processes the three words as well.

That's all for this article. Thanks for reading!


