irpas技术客

记一次自定义实现Flink-sql-connector-xxx过程_阳信鸭梨_flink-sql-connector

网络投稿 3513

需求:实现FlinkSQL sink到ArangoDB图数据库

分析:自定义Flink Table & SQL connector 支持flink-connector-arangodb,只需要实现sink部分

官网支持user-defined sources&sinks,对Table SQL的source/sink定义提供了解释

Metadata:对表的声明,封装为Catalog,定义外部存储系统的元数据

Planning:Factory实例由Java SPI机制创建,将外部表元数据配置封装为参数化实例(DynamicTableSource/Sink)

Runtime:读取/写入核心逻辑,实现InputFormat/OutputFormat或SourceFunction/SinkFunction接口,构建与外部存储系统的连接和实现读取和写入逻辑。

需要我们扩展的地方:

Dynamic Table Factory

自定义工场类实现org.apache.flink.table.factories.DynamicTableSinkFactory(我这里仅要支持sink,如果要支持source,需要实现org.apache.flink.table.factories.DynamicTableSourceFactory)

DDL语句中的‘connector’配置项作为标识符用来发现对应的工厂类实例

Factory工场类是由Java SPI来实例化的,我们需要在自定义connector模块的resource下添加文件

META-INF/services/org.apache.flink.table.factories.Factory

文件中指定工厂类的全路径

Dynamic Table Sink

?Factory工厂类主要构建DynamicTableSource/DynamicTableSink,这是个参数化实例对象,定义connector配置参数。在DynamicTableSource/Sink中数据的传递要使用Flink内部数据结构org.apache.flink.table.data.RowData,这里获取到数据需要对value做一下转换,value数据提取封装为RowData,RowData接口的实现类也比较多,可根据情况选择合适的实现类。

Sink接口实现:

有三个接口的实现会影响DML语句的执行

接口描述SupportsOverwrite实现此接口可以使用INSERT OVERWRITE语句覆盖现有的表或分区数据SupportsPartitioning允许写入分区数据SupportsWritingMetadata保存持久化DDL中定义的列和类型

Runtime Provider

这里官网并没有对实际读取写入的Runtime实现作详细解释。说一下个人的理解

Dynamic Table Source/Sink 提供了获取RuntimeProvider实例函数getSinkRuntimeProvider(Context context),这个函数需要我们自定义逻辑去声明InputFormat/OutputFormat或者source/sinkFunction实例化对象

两种方式运行Provider

1.OutputFormatProvider.of(InputFormat/OutputFormat) 2.SinkFunctionProvider.of(source/sinkFunction)

使用lambda表达式,执行return () -> xxxFormat/xxxFunction;构建静态provider

关于InputFormat/OutputFormat或者source/sinkFunction

关键方法

1.xxxxFunction<T> // 建立连接 open(); // 执行读取/写入逻辑 invoke(T value,Context context); // 关闭连接 close(); 2.xxxxFormat<In> // 建立连接 open(int taskNumber, int numTasks); // 执行读取/写入逻辑 writeRecord(In record) // 关闭连接 close();

Encoding / Decoding Formats(待完善)

阅读flink-connector模块,对比几个connector的源码,分析后得出简单的connector主体架构:

// 1.工厂类构造source和sink,java SPI创建实例 ArangoDBDynamicTableFactory imp DynamicTableSinkFactory (如果支持source,需imp DynamicTableSourceFactory) ?? ?- createDynamicTableSink // 创建连接器的参数化实例 -> 封装connector的参数 ?? ?- optionalOptions ?? ?- requiredOptions ? ??- factoryIdentifier // 2.arangodbsink引出sinkfunction ArangodbDynamicTableSink // 3.sinkFunction/outputFormat建立连接执行写入 ????????- open() ? // 建立连接 ?? ?????- invoke() // arangodb API ?? ??? ?????????- writeRecord ?? ??? ?????????- ArangoCollection.insertDocuments(values) ?? ??? ?????????- ArangoCollection.updateDocument(key,value) ?? ????- close() ?// 关闭连接 // 4.掺杂着其他的辅助类 ?? ?- convert? ?? ??? ?rowData -> document ?? ??? ?serialize/deserialize


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #需求实现FlinkSQL #TABLE #ampamp #SQL #Connector