
Integrating Flink into a Spring Boot Application and Running the Job on a Flink Cluster


Background: a recent project required Flink. After quite a bit of trial and error I got the job running both standalone and on a Flink cluster, so this post records the pitfalls. Development environment: IDEA + Spring Boot (2.3.5.RELEASE) + Kafka (2.8.1) + MySQL (8.0.26). Straight to the runnable code.

The code below implements the following logic: if a device uploads no data within a given time interval, it is considered offline.

1. Creating the application class

/**
 * Flink job submission application
 *
 * @author wangfenglei
 */
@SpringBootApplication(scanBasePackages = {"com.wfl.firefighting.flink", "com.wfl.firefighting.util"})
public class DataAnalysisFlinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(DataAnalysisFlinkApplication.class, args);
    }
}

2. The device-status job: it consumes data from Kafka, runs the offline check in a KeyedProcessFunction, and writes offline devices to a MySQL sink that updates the device status.

/**
 * Reads data from Kafka, determines which devices are offline and writes the result to MySQL.
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceDataKafkaSource {
    private static final Logger log = LoggerFactory.getLogger(DeviceDataKafkaSource.class);

    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String kafkaServer;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String loginConfig;
    @Value("${customer.flink.cal-device-status-topic}")
    private String topic;
    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Builds and submits the Flink job.
     *
     * @throws Exception on failure
     */
    @PostConstruct
    public void execute() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(1);

        Properties properties = new Properties();
        // Kafka broker addresses (IP or hostname), comma separated
        properties.setProperty("bootstrap.servers", kafkaServer);
        // Kafka consumer group.id
        properties.setProperty("group.id", "data-analysis-flink-devicestatus");
        // SASL mechanism: PLAIN
        properties.setProperty("sasl.mechanism", "PLAIN");
        // security protocol: SASL_PLAINTEXT
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        // Kafka login user name and password (JAAS config)
        properties.setProperty("sasl.jaas.config", loginConfig);

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
        stream.print().setParallelism(1);

        DataStream<String> deviceStatus = stream
                // extract only the device serial number from the JSON payload
                .map(data -> CommonConstant.GSON.fromJson(data, MsgData.class).getDevSn())
                // key by device serial number
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                // if no new data arrives within the configured interval, emit the device as offline
                .process((CalDeviceOfflineFunction) applicationContext.getBean("calDeviceOfflineFunction"));

        // write offline devices to the database
        deviceStatus.addSink((SinkFunction) applicationContext.getBean("deviceStatusSink"));

        // submit the job from a separate thread so application startup is not blocked
        new Thread(() -> {
            try {
                env.execute("deviceStatusFlinkJob");
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }).start();
    }
}
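
The map step above deserializes the Kafka message via CommonConstant.GSON and a MsgData class, neither of which is shown in the article. A minimal sketch of what they might look like, assuming the message is a JSON object carrying the serial number in a devSn field (class and field names are inferred from the getDevSn() call):

import com.google.gson.Gson;

/**
 * Assumed shape of the uploaded message (sketch only): this job uses nothing but the serial number.
 */
public class MsgData {
    /** device serial number, field name inferred from getDevSn() */
    private String devSn;
    // ...other payload fields omitted

    public String getDevSn() {
        return devSn;
    }

    public void setDevSn(String devSn) {
        this.devSn = devSn;
    }
}

/**
 * Sketch of CommonConstant: a shared, reusable Gson instance.
 */
class CommonConstant {
    public static final Gson GSON = new Gson();
}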

Notes:

1. @ConditionalOnProperty acts as a switch that controls whether this job runs. The module can later host several Flink jobs, each submitted or skipped based on its own switch property.

2. Spring Boot's @PostConstruct annotation makes the job submit itself automatically when the application starts.

3. The job is submitted from a separate Thread; otherwise env.execute() blocks and application startup never completes.
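
As an aside, the Flink version used here (1.13.x) also offers a non-blocking submit, which would remove the need for the explicit Thread; a minimal sketch of that alternative inside execute():

// Sketch: executeAsync() submits the job and returns immediately with a JobClient,
// so application startup is not blocked and the Thread above is unnecessary.
// Requires: import org.apache.flink.core.execution.JobClient;
JobClient jobClient = env.executeAsync("deviceStatusFlinkJob");
log.info("submitted flink job, jobId={}", jobClient.getJobID());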

4. For logging, use slf4j so that it matches the logging facade bundled with Flink's own jars; this way logs are also written when the job runs on the Flink cluster:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);

5. The Kafka connection has login authentication enabled; the client-side settings are in application.yml below. The broker-side configuration follows the official Kafka documentation and may be covered in a later post.

3. The offline-detection function

/**
 * KeyedProcessFunction that keeps one piece of state per device serial number and emits every
 * serial number that receives no new data within the configured interval:
 * for every record, CalDeviceOfflineFunction updates the stored last-data timestamp and
 * (re)registers a processing-time timer for lastDataTime + interval.
 * When a timer fires, the stored last-data timestamp plus the interval is compared with the
 * timer's timestamp; if they match, no data arrived in the meantime and the device serial
 * number is emitted as offline.
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class CalDeviceOfflineFunction extends KeyedProcessFunction<String, String, String> {
    private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);

    /**
     * Per-device state maintained by this process function
     */
    private ValueState<DeviceLastDataTimestamp> deviceState;
    /**
     * Fire time of the currently registered timer
     */
    private ValueState<Long> timerState;

    @Autowired
    private DeviceService deviceService;

    @Override
    public void open(Configuration parameters) throws Exception {
        deviceState = getRuntimeContext().getState(new ValueStateDescriptor<>("deviceState", DeviceLastDataTimestamp.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timerState", Long.class));
    }

    /**
     * Invoked for every incoming record.
     *
     * @param value incoming device serial number
     * @param ctx   context
     * @param out   collector for offline devices
     * @throws Exception on failure
     */
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        log.info("++++++++++++++flink received deviceSn={}", value);

        // look up the current state for this device
        DeviceLastDataTimestamp current = deviceState.value();
        if (current == null) {
            current = new DeviceLastDataTimestamp();
            current.key = value;
            current.lastDataTime = ctx.timestamp();
        }

        Long currentTimerState = timerState.value();
        if (null == currentTimerState) {
            // initialize with -1 (no timer registered yet)
            timerState.update(-1L);
        }
        if (-1 != timerState.value()) {
            // delete the previously registered timer before registering a new one
            ctx.timerService().deleteProcessingTimeTimer(timerState.value());
        }

        long interval = deviceService.getDeviceOfflineInterval(value);
        // remember the timestamp of this record
        current.lastDataTime = ctx.timestamp();
        // remember the offline threshold for this device
        current.interval = interval;
        // write the state back
        deviceState.update(current);
        // remember when the new timer will fire
        timerState.update(current.lastDataTime + interval);
        // register the new timer
        ctx.timerService().registerProcessingTimeTimer(current.lastDataTime + interval);
    }

    /**
     * Invoked when a registered timer fires.
     *
     * @param timestamp fire time of the timer
     * @param ctx       timer context
     * @param out       collector for offline devices
     * @throws Exception on failure
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // fetch the state for this device
        DeviceLastDataTimestamp result = deviceState.value();
        // if the fire time equals last-data time + interval, no record arrived for this device during the interval
        if (timestamp == result.lastDataTime + result.interval) {
            // emit the device serial number as offline
            out.collect(result.key);
            // log for verification
            log.info("==================" + result.key + " is offline");
        }
    }

    /**
     * Holds the timestamp of a device's last uploaded record and its offline threshold.
     */
    class DeviceLastDataTimestamp {
        public String key;
        public long lastDataTime;
        public long interval;
    }
}
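
The function relies on deviceService.getDeviceOfflineInterval(devSn), which lives in the project's service module and is not shown in the article. A minimal sketch of the assumed contract (the 5-minute default is purely illustrative):

import org.springframework.stereotype.Service;

/**
 * Assumed contract of DeviceService as used by CalDeviceOfflineFunction (sketch only):
 * returns the offline threshold for a device in milliseconds.
 */
@Service
public class DeviceService {

    public long getDeviceOfflineInterval(String devSn) {
        // the real implementation presumably reads this from MySQL or a cache;
        // the 5-minute value here is a hypothetical placeholder
        return 5 * 60 * 1000L;
    }
}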

4. Updating the device's offline status

/**
 * Writes offline devices to MySQL.
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceStatusSink extends RichSinkFunction<String> {
    private static final Logger log = LoggerFactory.getLogger(DeviceStatusSink.class);

    @Value("${spring.datasource.dynamic.datasource.master.url}")
    private String datasoureUrl;
    @Value("${spring.datasource.dynamic.datasource.master.username}")
    private String userName;
    @Value("${spring.datasource.dynamic.datasource.master.password}")
    private String password;
    @Value("${spring.datasource.dynamic.datasource.master.driver-class-name}")
    private String driverClass;

    private Connection conn = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // load the driver and open the connection
        try {
            Class.forName(driverClass);
            conn = DriverManager.getConnection(datasoureUrl, userName, password);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }

    @Override
    public void invoke(String deviceSn, Context context) {
        try {
            String sql = "update biz_device t set t.status=2 where t.dev_sn=?";
            ps = conn.prepareStatement(sql);
            ps.setString(1, deviceSn);
            ps.executeUpdate();
            log.info("update biz_device t set t.status=2 where t.dev_sn={}", deviceSn);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }

    /**
     * Close the statement and connection when the task shuts down.
     *
     * @throws Exception on failure
     */
    @Override
    public void close() throws Exception {
        if (ps != null) {
            ps.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
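
As an alternative to this hand-written sink, Flink ships a generic JDBC sink. The following is only a sketch and assumes the legacy flink-jdbc dependency in the pom below is replaced by flink-connector-jdbc_2.11 (1.13.1); the SQL and connection values mirror the class above:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;

/**
 * Sketch only: requires flink-connector-jdbc_2.11 (1.13.x) on the classpath.
 */
public class DeviceStatusJdbcSinkFactory {

    public static void addSink(DataStream<String> deviceStatus,
                               String url, String driver, String user, String password) {
        deviceStatus.addSink(JdbcSink.sink(
                "update biz_device t set t.status=2 where t.dev_sn=?",
                (ps, devSn) -> ps.setString(1, devSn),                 // bind the offline device SN
                JdbcExecutionOptions.builder().withBatchSize(1).build(), // write each update immediately
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(driver)
                        .withUsername(user)
                        .withPassword(password)
                        .build()));
    }
}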

5. application.yml

server:
  port: 8099
spring:
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    druid:
      stat-view-servlet:
        enabled: true
        loginUsername: admin
        loginPassword: 123456
        allow:
      web-stat-filter:
        enabled: true
    dynamic:
      druid:
        # global druid connection-pool settings; most values keep the defaults
        # initial, minimum and maximum pool size
        initial-size: 5
        min-idle: 5
        maxActive: 20
        # maximum wait time when acquiring a connection (ms)
        maxWait: 60000
        # how often idle connections are checked for eviction (ms)
        timeBetweenEvictionRunsMillis: 60000
        # minimum time a connection stays in the pool (ms)
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1 FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # enable PSCache and set its size per connection
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        # monitoring filters; removing them disables SQL statistics, 'wall' is the SQL firewall
        connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
        # enable mergeSql and slow-SQL logging via connectProperties
        filters: stat,wall,slf4j
      datasource:
        master:
          url: jdbc:mysql://127.0.0.1:3306/fire?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver
  kafka:
    # Kafka broker addresses, comma separated
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # resend records whose delivery failed
      retries: 1
      # number of records sent per batch
      batch-size: 16384
      buffer-memory: 33554432
      # serializers for message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # raise the maximum message size pushed to Kafka
        max.request.size: 52428800
    consumer:
      group-id: data-analysis-flink
      # commit offsets manually so records are guaranteed to be consumed
      enable-auto-commit: false
      # start from the latest offset
      auto-offset-reset: latest
      #group-id: dev
    properties:
      # generous session timeout so the session does not expire while debugging
      session:
        timeout:
          ms: 60000
      heartbeat:
        interval:
          ms: 30000
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="root" password="root";'
# custom settings
customer:
  # flink-related settings
  flink:
    # enable the device-status job
    cal-device-status: true
    cal-device-status-topic: device-upload-data

6. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.wfl.firefighting</groupId>
        <artifactId>data-analysis</artifactId>
        <version>1.0.0</version>
    </parent>
    <groupId>com.wfl.firefighting</groupId>
    <artifactId>data-analysis-flink</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.wfl.firefighting</groupId>
            <artifactId>data-analysis-service</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.wfl.firefighting</groupId>
            <artifactId>data-analysis-model</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.github.openfeign</groupId>
            <artifactId>feign-httpclient</artifactId>
            <version>10.10.1</version>
        </dependency>
        <!-- druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.22</version>
        </dependency>
        <!-- dynamic datasource -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>2.5.4</version>
        </dependency>
        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.20</version>
        </dependency>
        <!-- flink dependencies: start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <!-- flink kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <!-- flink JSON support -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.13.1</version>
        </dependency>
        <!-- flink jdbc (mysql) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <!-- flink dependencies: end -->
    </dependencies>

    <build>
        <finalName>data-analysis-flink</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>module-info.class</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Notes:

1. If the job only runs locally and does not need to be submitted to a Flink cluster, spring-boot-maven-plugin is sufficient and the jar can be started with java -jar:

<build>
    <finalName>data-analysis-flink</finalName>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <!-- main class used as the entry point -->
            <configuration>
                <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <!-- repackage bundles all dependencies into the generated jar -->
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

A jar built with spring-boot-maven-plugin fails with "class not found" errors when submitted to the Flink cluster, because Spring Boot packages application classes under BOOT-INF/, where Flink's server-side classloader does not look. A jar built with maven-shade-plugin works both ways: it can be run with java -jar and also be submitted to the Flink cluster.

2. If the shaded jar is submitted to the cluster, Spring Boot's default Logback dependency must be excluded, otherwise the job fails on the server with: Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Exclude it as follows:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>

When running the jar locally with java -jar instead, the following excludes must be commented out in the shade configuration, otherwise startup fails with java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory:

<!--<exclude>org.slf4j:*</exclude>-->
<!--<exclude>log4j:*</exclude>-->

3. When packaging with maven-shade-plugin, the transformers below must be added, otherwise the build fails with: Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ManifestResourceTransformer

<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
        <resource>META-INF/spring.handlers</resource>
        <resource>reference.conf</resource>
    </transformer>
    <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
        <resource>META-INF/spring.factories</resource>
    </transformer>
    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
        <resource>META-INF/spring.schemas</resource>
    </transformer>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
        <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
    </transformer>
</transformers>

7. Results

1. Running locally

2. Submitting to the Flink cluster

8. Other pitfalls

1. Error: The RemoteEnvironment cannot be instantiated when running in a pre-defined context

Fix: obtain the environment with StreamExecutionEnvironment.getExecutionEnvironment(), which returns the environment of the current execution context (local or cluster) instead of explicitly creating a remote environment:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2. Error: Insufficient number of network buffers: required 65, but only 38 available. The total number of network buffers is currently set to 2048 of 32768 bytes each.

Fix: lower the job parallelism; fewer parallel channels need fewer network buffers. (Alternatively, the TaskManager's network memory can be increased in flink-conf.yaml.)

env.setParallelism(1);

3. Error: Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'

Fix: set classloader.check-leaked-classloader: false in the Flink configuration file flink-conf.yaml.


