irpas技术客

FlinkSQL源码解析(一)转换流程_Yuan_CSDF_flink sql 源码解析

大大的周 2864

1、前言 1.1、JavaCC

????????JavaCC(Java Compiler Compiler)是一个开源的语法分析器生成器和词法分析器生成器。JavaCC通过词法和语法描述文件来生成分析器。

????????flink通过java CC生成分析器用于sql解析和校验。

如下图:在flink-table下的flink-sql-parser项目中,org.apache.flink.sql.parser.impl下的类,就是使用javacc生成的。

1.2、Calcite

????????Apache Calcite是一个动态数据管理框架 ,它具备很多典型数据库管理系统的功能,如SQL解析、SQL校验、SQL查询优化等,又省略了一些功能,如不存储相关数据,也不完全包含相关处理数据等。

????????flink中的sql解析、sql校验和sql优化便是依赖calcite来完成的。

????????梳理一下Calcite SQL执行的几个阶段:

通过Parser解析器将传入的sql解析成一颗词法树,SqlNode作为树的节点做词法的校验Validate,类型校验,元数据校验等等将校验好的SqlNode树转换成对应的关系代数表达式,也是一颗树,RelNode作为节点将RelNode关系代数表达式树,通过内置的两种优化器Volcano , Hep 优化关系代数表达式得到最优逻辑代数的一颗树,也是RelNode最优的逻辑代数表达式(RelNode),会被转换成对应的可执行的物理执行计划(转换逻辑根据框架有所不同),像Flink就转成他的Operator去运行 2、Flink SQL转换流程

????????SQL语句经过Calcite解析生成抽象语法树SQLNode,基于生成的SQLNode并结合flink Catalog完成校验生成一颗Operation树,接下来blink planner将Operation树,接下来blink planner将Opearation树转为RelNode然后进行优化,最后生成Transformation变成流计算任务。

2.1、Sql语句解析成语法树阶段(SQL - > SqlNode)

????????TableEnvironmentImpl是sql执行的入口类,TableEnvironmentImpl中提供了executeSql,sqlQuery等方法用来执行DDL和DML等sql,sql执行时会对sql进行解析,ParserImpl是flink调用sql解析的实现类,ParserImpl#parse()方法中通过调用包装器对象CalciteParser#parse()方法并创建并调用使用javacc生成的sql解析器(FlinkSqlParserImpl)中的parseSqlStmtEof方法完成sql解析,并返回SqlNode对象

? ? ? ? 核心代码如下:

public List<Operation> parse(String statement) { CalciteParser parser = calciteParserSupplier.get(); FlinkPlannerImpl planner = validatorSupplier.get(); //TODO 在这里调用使用javacc生成的分析器,将sql语句解析成sqlNode SqlNode parsed = parser.parse(statement); //TODO 将sqlNode转换为Operation Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed) .orElseThrow(() -> new TableException("Unsupported query: " + statement)); return Collections.singletonList(operation); }

? ? ? ? 其中parser.parse(...)方法,将sql语句解析成sqlNode。对应的表名、列名、with属性参数、主键、唯一键、分区键、水印、表注释、表操作(create table、alter table、drop table。。。)都放到SqlNode对象的对应属性中,SqlNode是一个树形结构也就是AST。如下:

2.2、Sql校验(SqlNode - > Operation)

????????sql解析完成后执行sql校验,flink sql中增加了SqlNode转换为Operation的过程,sql校验是在这个过程中完成。在SqlToOperationConverter#convert()方法中完成这个过程的转换,之间会通过FlinkPlannerInpm#validate()方法对表、函数、字段等完成校验并基于生成的validated SqlNode生成对应的Opeation。

?????????不同的sql经过convert处理后返回不同的Operation,最后会根据不同的Operation有不同的处理行为。

? ? ? ? ?其中

2.3、Flink SQL优化(Operation - > RelNode->Transformation?)

????????Blink中并没有直接使用Calcite的优化器,而是通过规则组合和Calcite优化组合分别为batch和stream实现了自定义的优化器。 ????????优化执行前会先将SqlNode转为RelNode,基于RelNode调用PlannerBase#optimize()并执行StreamCommonSubGraphBasedOptimizer#doOptimize()方法完成优化

? ? ? ? 在完成Sql到RelNode的转换后,会执行executeOperation(...)操作,在这里先将sqlNode转换成RelNode。然后进行优化操作。传入的参数为:

? ? ? ? 然后根据传入的sql语句类型,选择不同的操作。包含有Modify、CreateTable、DropTable等。如下:

? ? ? ? 其实都是调用的TableEnvironmentImpl.executeInternal(...)。

? ? ? ? ?在这里,有进行转换和优化操作,重点是在translate方法中,最终调用的是PlannerBase里的translate(...)方法

override def translate( modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = { if (modifyOperations.isEmpty) { return List.empty[Transformation[_]] } // prepare the execEnv before translating getExecEnv.configure( getTableConfig.getConfiguration, Thread.currentThread().getContextClassLoader) overrideEnvParallelism() // TODO 在这里完成转换 SqlNode转换为RelNode val relNodes = modifyOperations.map(translateToRel) // TODO 在这里完成优化 val optimizedRelNodes = optimize(relNodes) val execNodes = translateToExecNodePlan(optimizedRelNodes) translateToPlan(execNodes) }

? ? ? ? 在上述的优化代码行,根据是流处理或者批处理老选择不同的类中的方法进行优化。

????????最终由translateToPlan方法将ExecNode转换成Transfomation列表

? ? ? ? 整体流程大致为:sqlNode --> Operation --> RelNode --> 优化 --> execNode --> Transformation

????????基于生成的Transformation对象调用StreamExecutor#createPipeline()方法生成StreamGraph便可以执行任务了。

????????至此flink sql转换流程便结束了。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #SQL #源码解析 #Compiler #flink通过java