irpas技术客

StreamX: Flink 开发脚手架, 流批一体大数据平台_明月清风,良宵美酒_flink脚手架

大大的周 966

StreamX: Flink 开发脚手架, 流批一体大数据平台 一、🚀 什么是 StreamX二、🎉 Features三、组成部分3.1 streamx-core3.2 streamx-pump3.3 streamx-console 四、如何安装4.1 环境4.2 安装4.2.1 初始化工程 SQL4.2.2 修改相关的数据库信息4.2.3 启动 streamx-console4.2.4 系统配置 五、如何使用5.1 部署 DataStream 任务5.2 部署 FlinkSql 任务 六、任务启动流程


一、🚀 什么是 StreamX

大数据技术如今发展的如火如荼,已经呈现百花齐放欣欣向荣的景象,实时处理流域 Apache Spark 和 Apache Flink 更是一个伟大的进步,尤其是Apache Flink被普遍认为是下一代大数据流计算引擎,我们在使用 Flink 时发现从编程模型, 启动配置到运维管理都有很多可以抽象共用的地方, 我们将一些好的经验固化下来并结合业内的最佳实践, 通过不断努力终于诞生了今天的框架 —— StreamX, 采用 java/scala 开发, 项目的初衷是 —— 让 Flink 开发更简单,使用StreamX开发,可以极大降低学习成本和开发门槛, 让开发者只用关心最核心的业务,StreamX 规范了项目的配置,鼓励函数式编程,提供了一系列开箱即用的Connectors,支持scala和java两套 api, 并且提供一个数据平台,基于Apache Flink 封装的一个可视化的,轻量级的 Flink Submit 系统,旨在简化 Flink 任务提交和管理运维,标准化了配置、开发、测试、部署、监控、运维的整个过程,其最终目的是打造一个一站式大数据平台,流批一体,湖仓一体的解决方案。

官网地址 http://·github 地址 https://github.com/streamxhub/streamx

二、🎉 Features 开发脚手架一系列开箱即用的 connectors项目编译功能(maven 编译)支持Applicaion 模式, Yarn-Per-Job模式启动快捷的日常操作(任务启动、停止、savepoint,从savepoint恢复)支持火焰图支持notebook(在线任务开发)项目配置和依赖版本化管理支持任务备份、回滚(配置回滚)在线管理依赖(maven pom)和自定义 jar自定义 udf、连接器等支持Flink sql SubmitFlink Sql WebIDE支持 catalog、hive从任务开发阶段到部署管理全链路支持… 三、组成部分

Streamx有三部分组成,分别是streamx-core,streamx-pump 和 streamx-console

3.1 streamx-core

streamx-core 定位是一个开发时框架,关注编码开发,规范了配置文件,按照约定优于配置的方式进行开发,提供了一个开发时 RunTime Content和一系列开箱即用的Connector,扩展了DataStream相关的方法,融合了DataStream和Flink sql api,简化繁琐的操作,聚焦业务本身,提高开发效率和开发体验

3.2 streamx-pump

pump 是抽水机,水泵的意思,streamx-pump的定位是一个数据抽取的组件,类似于flinkx,基于streamx-core中提供的各种connector开发,目的是打造一个方便快捷,开箱即用的大数据实时数据抽取和迁移组件,并且集成到streamx-console中,解决实时数据源获取问题,目前在规划中

3.3 streamx-console

streamx-console 是一个综合实时数据平台,低代码(Low Code)平台,可以较好的管理Flink任务,集成了项目编译、发布、参数配置、启动、savepoint,火焰图(flame graph),Flink SQL,监控等诸多功能于一体,大大简化了Flink任务的日常操作和维护,融合了诸多最佳实践。旧时王谢堂前燕,飞入寻常百姓家,让大公司有能力研发使用的项目,现在人人可以使用,其最终目标是打造成一个实时数仓,流批一体的一站式大数据解决方案

四、如何安装

streamx-console 提供了开箱即用的安装包,安装之前对环境有些要求,具体要求如下

4.1 环境 组件版本是否必须安装说明操作系统Linux是JAVA1.8+是Maven3+是部署机器必须安装 Maven,且配置好环境变量,项目编译会用到Hadoop2+是HDFS,YARN 等必须安装,并且配置好相关环境变量Flink1.12.0+是Flink 版本必须是 1.12.1 或以上版本,并且配置好 Flink 相关环境变量MySQL5.6+是部署机器或其他机器得安装 MySQL,系统会用到 MySQLPython2+否非必须,火焰图功能会用到 PythonPerl5.16.3+否非必须,火焰图功能会用到 Python
4.2 安装

在安装前一定要确保当前部署的机器满足上面环境相关的要求,当前安装的机器必须要有 Hadoop 环境,安装并配置好了Flink 1.12.0+,如果准备工作都已就绪,就可以安装了

git clone https://github.com/streamxhub/streamx.git cd Streamx mvn clean install -DskipTests -Denv=dev

编译成功后,在 streamx-console-service 模块下找到 streamx-console-service-1.0.0-bin-tar.gz, 解包后目录如下

. streamx-console-service-1.0.0 ├── bin │ ├── flame-graph │ ├── └── *.py //火焰图相关功能脚本(内部使用,用户无需关注) │ ├── startup.sh //启动脚本 │ ├── setclasspath.sh //java环境变量相关的脚本(内部使用,用户无需关注) │ ├── shutdown.sh //停止脚本 │ ├── yaml.sh //内部使用解析yaml参数的脚本(内部使用,用户无需关注) ├── conf │ ├── application.yaml //项目的配置文件(注意不要改动名称) │ ├── application-prod.yml //项目的配置文件(开发者部署需要改动的文件,注意不要改动名称) │ ├── flink-application.template //flink配置模板(内部使用,用户无需关注) │ ├── logback-spring.xml //logback │ └── streamx.sql //工程初始化脚本 ├── lib │ └── *.jar //项目的jar包 ├── plugins │ ├── streamx-jvm-profiler-1.0.0.jar //jvm-profiler,火焰图相关功能(内部使用,用户无需关注) │ └── streamx-flink-sqlcli-1.0.0.jar //Flink SQl提交相关功能(内部使用,用户无需关注) ├── logs //程序log目录 └── temp //内部使用到的零时路径,不要删除 4.2.1 初始化工程 SQL

streamx-console 要求的数据库是 MySQL,版本 5.6+以上,如准备就绪则进行下面的操作:

创建数据库:streamx执行初始化 sql (解包后的 conf/streamx.sql) 4.2.2 修改相关的数据库信息

工程 SQL 初始化完毕,则修改conf/application-prod.yml,找到 datasource 这一项,找到 mysql 的配置,修改成对应的信息即可,如下

datasource: dynamic: # 是否开启 SQL日志输出,生产环境建议关闭,有性能损耗 p6spy: true hikari: connection-timeout: 30000 max-lifetime: 1800000 max-pool-size: 15 min-idle: 5 connection-test-query: select 1 pool-name: HikariCP-DS-POOL # 配置默认数据源 primary: primary datasource: # 数据源-1,名称为 primary primary: username: $user password: $password driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://$host:$port/streamx?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8 4.2.3 启动 streamx-console

进入到bin下直接执行 start.sh 即可启动项目,默认端口是 10000,如果没啥意外则会启动成功

cd streamx-console-service-1.0.0/bin bash start.sh

相关的日志会输出到 streamx-console-service-1.0.0/logs/streamx.out 里 打开浏览器 输入 http://$deploy_host:10000/index.html 即可登录,登录界面如下 默认密码: admin / streamx

4.2.4 系统配置

进入系统之后,第一件要做的事情就是修改系统配置,在菜单/StreamX/Setting 下,修改StreamX Webapp address 和 StreamX Console Workspace 如下:

StreamX Webapp address 配置StreamX Console后台服务的访问地址StreamX Console Workspace 配置系统的工作空间,用于存放项目源码,编译后的项目等 五、如何使用

streamx-console 定位是流批一体的大数据平台,一站式解决方案,使用起来非常简单,没有复杂的概念和繁琐的操作,标准的 Flink 程序(安装 Flink 官方要去的结构和规范)和用streamx开发的项目都做了很好的支持,下面我们使用streamx-quickstart来快速开启 streamx-console 之旅 streamx-quickstart是 StreamX 开发 Flink 的上手示例程序,具体请查阅

Github: https://github.com/streamxhub/streamx-quickstart.gitGitee: https://gitee.com/benjobs/streamx-quickstart.git 5.1 部署 DataStream 任务

下面的示例演示了如何部署一个 DataStream 应用 http://assets.streamxhub.com/20210408008.mp4

5.2 部署 FlinkSql 任务

下面的示例演示了如何部署一个 FlinkSql 应用 http://assets.streamxhub.com/flinksql.mp4

项目演示使用到的 flink sql 如下 CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3) ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.properties.bootstrap.servers'='kafka-1:9092,kafka-2:9092,kafka-3:9092', 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'update-mode' = 'append', 'format.type' = 'json', -- 数据源格式为 json 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则 ); CREATE TABLE pvuv_sink ( dt VARCHAR, pv BIGINT, uv BIGINT ) WITH ( 'connector.type' = 'jdbc', -- 使用 jdbc connector 'connector.url' = 'jdbc:mysql://test-mysql:3306/test', -- jdbc url 'connector.table' = 'pvuv_sink', -- 表名 'connector.username' = 'root', -- 用户名 'connector.password' = '123456', -- 密码 'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条 ); INSERT INTO pvuv_sink SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM user_log GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'); 使用到 maven 依赖如下 <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka_2.12</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.11</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.12.0</version> </dependency> Kafka 模拟发送的数据如下 {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts":"2021-02-01T01:00:00Z"} {"user_id": "662867", "item_id":"2244074","category_id":"1575622","behavior": "pv", "ts":"2021-02-01T01:00:00Z"} {"user_id": "662867", "item_id":"2244074","category_id":"1575622","behavior": "pv", "ts":"2021-02-01T01:00:00Z"} {"user_id": "662867", "item_id":"2244074","category_id":"1575622","behavior": "learning flink", "ts":"2021-02-01T01:00:00Z"} 六、任务启动流程

关于项目的概念,Development Mode,savepoint,NoteBook,自定义 jar 管理,任务发布,任务恢复,参数配置,参数对比,多版本管理等等更多使用教程和文档请移步官网http://·

目前项目已正式开源,无数个日夜里,作者在源码中苦苦寻找答案,并以此为乐,历经无数汗水,现终于得见天日. 现在她如初生婴儿一般满怀无限憧憬的出现在世人面前,请多多关照,如果眼下还是一团零星之火,运筹帷幄之后,迎面东风,就是一场烈焰燎原吧


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #flink脚手架 #StreamX #Flink #开发脚手架 #流批一体大数据平台