irpas技术客

最新 Flink 1.13 的 Flink SQL 快速入门,详细教程_数据文_flink sql 教程

irpas 5226

Table API 和 SQL

文章目录 Table API 和 SQL一、简单使用二、动态表和持续查询1. 动态表(Dynamic Tables)2. 持续查询(Continuous Query)3. 表的查询分类4. 表转流或写出的编码方式 三、时间属性和窗口1. 事件时间2. 处理时间3. 窗口(Window) 四、聚合(Aggregation)查询1. 分组聚合2. 窗口聚合3. 开窗(Over)聚合4. Top N 五、联结(Join)查询1. 常规联结查询2. 间隔联结查询3. 时间联结 六、函数1. 系统函数2. 自定义函数(UDF) 七、SQL 客户端八、连接到外部系统1. Kafka2. 文件系统3. JDBC4. Elasticsearch5. HBase6. Hive

一、简单使用 // 创建表环境 //表环境(TableEnvironment)主要负责: // 1 注册Catalog 和表; // 2 执行SQL查询; // 3 注册用户自定义函数(UDF); // 4 DataStream 和表之间的转换。 TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance() .inStreamingMode().useBlinkPlanner().build()) StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env) tableEnv.useCatalog("custom_catalog"); tableEnv.useDatabase("custom_database"); 1. 连接器表(Connector Tables) // 创建输入表,连接外部系统读取数据 tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector' = ... )"); // 注册一个表,连接到外部系统,用于输出 tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )"); 2. 虚拟表(Virtual Tables) //注册表,基于表名的sql查询 tableEnv.createTemporaryView("NewTable", newTable); Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... "); //基于Table对象的sql查询 Table table0 = tableEnv.sqlQuery("select url, user from " + eventTable); //使用Table API对表进行查询 Table table2 =tableEnv.from("inputTable").where($("num").isEqual($("1"))) .select(...); // 将得到的结果写入输出表 TableResult tableResult = table1.executeInsert("outputTable"); //流表转化 tableEnv.toAppendStream(table, Row.class); tableEnv.toDataStream(table); tableEnv.toChangelogStream(table); //更新日志流+I-U+U tableEnv.fromDataStream(eventStream, $("timestamp"), $("url")); tableEnv.createTemporaryView("EventTable", eventStream, $("timestamp").as("ts"),$("url")); tableEnv.fromChangelogStream() //需要row类型的流 //支持的数据类型 //(1)原子类型在 Flink 中,基础数据类型(Integer、Double、String) 和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。 // 将数据流转换成动态表,动态表只有一个字段,重命名为myLong Table table = tableEnv.fromDataStream(stream, $("myLong")); //(2)Tuple 类型 // 将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换 Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0")); // 将f1字段命名为myInt,f0命名为myLong Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt")); //(3)POJO 类型 Table table = tableEnv.fromDataStream(stream); Table table = tableEnv.fromDataStream(stream, $("user"),$("url")); Table table = tableEnv.fromDataStream(stream, $("user").as("myUser")); //(4)Row 类型 DataStream<Row> dataStream = env.fromElements( Row.ofKind(RowKind.INSERT, "Alice", 12), Row.ofKind(RowKind.INSERT, "Bob", 5), Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12), Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100)); // 将更新日志流转换为表 Table table = tableEnv.fromChangelogStream(dataStream); 二、动态表和持续查询 1. 动态表(Dynamic Tables)

当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的 SQL 查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”。

2. 持续查询(Continuous Query)

由于数据在不断变化,因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”。具体步骤:

流(stream)被转换为动态表(dynamic table);对动态表进行持续查询(continuous query),生成新的动态表;生成的动态表被转换成流。 3. 表的查询分类 更新(Update)查询:用来定义结果表的更新日志(changelog)流中,包含了 INSERT 和 UPDATE 两种操作。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成 DataStream,必须调用 toChangelogStream()方法。?追加(Append)查询:只有插入(Insert)操作。

查询限制:由于需要维护的状态持续增长,也可能是由于更新数据的计算太复杂,不太适合作为连续查询在流处理中执行。

4. 表转流或写出的编码方式

?动态表也可以通过插入、更新和删除操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。

?在 Flink 中,Table API 和 SQL 支持三种编码方式:?

仅追加(Append-only)流 :仅通过插入(Insert)撤回(Retract)流:添加(add)消息和撤回(retract)消息。INSERT插入操作编码为add 消息;DELETE 删除操作编码为 retract 消息;而UPDATE 更新操作则编码为被更改行的 retract 消息,和更新后行(新行)的 add 消息。这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流了。更新插入(Upsert)流:更新插入(upsert)消息和删除(delete)消息。动态表中必须有唯一的键(key)。通过这个 key 进行查询,如果存在对应的数据就做更新,如果不存在就直接插入(insert)。这是一个动态表可以转换为更新插入流的必要条件。当然,收到这条流中数据的外部系统,也需要知道这唯一的键(key),这样才能正确地处理消息。 更新(update)操作由于有 key 的存在,只需要用单条消息编码就可以,因此效率更高。

将动态表转换为DataStream 时,只支持仅追加(append-only)和撤回(retract)流,我们调用 toChangelogStream()得到的其实就是撤回流;这也很好理解, DataStream 中并没有 key 的定义,所以只能通过两条消息一减一增来表示更新操作。而连接到外部系统时,则可以支持不同的编码方法,这取决于外部系统本身的特性。

三、时间属性和窗口

按照时间语义的不同,我们可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。

1. 事件时间

事件时间属性可以在创建表 DDL 中定义,也可以在数据流和表的转换中定义。

在创建表的 DDL 中定义 : 在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个字段,通过 WATERMARK 语句来定义事件时间属性。WATERMARK 语句带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的延迟时间。

Flink中支持的事件时间属性数据类型必须为 TIMESTAMP 或者 TIMESTAMP_LTZ [LOCAL TIME ZONE]。

一般情况下如果数据中的时间戳是“年-月-日-时-分-秒”的形式,可以将事件时间属性定义为 TIMESTAMP 类型。 而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件时间属性,类型定义为 TIMESTAMP_LTZ 会更方便。

-- 时间戳是“年-月-日-时-分-秒”的形式 CREATE TABLE EventTable( user STRING, url STRING, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( ... ); --时间戳就是一个长整型的毫秒数 CREATE TABLE events ( user STRING, url STRING, ts BIGINT, ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND ) WITH ( ... );

?在数据流转换为表时定义:我们调用 fromDataStream() 方法创建表时,可以给某个字段加上.rowtime() 后缀,就表示将当前字段指定为事件时间属性。

这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该之前就在 DataStream 上定义好了。

// 方法一: // 需要自定义提取时间戳并生成水位线 DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...); // 声明一个额外的逻辑字段作为事件时间属性 Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime()); // 方法二: // 流中数据类型为三元组Tuple3,最后一个字段就是事件时间戳 DataStream<Tuple3<String, String, Long>> stream = inputStream.assignTimestampsAndWatermarks(...); // 不再声明额外字段,直接用最后一个字段作为事件时间属性 Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").rowtime()); 2. 处理时间

处理时间属性的定义也有两种方式:创建表 DDL 中定义,或者在数据流转换成表时定义。

在创建表的 DDL 中定义: 通过调用系统内置的 PROCTIME()函数来指定当前的处理时间属性,返回的类型是 TIMESTAMP_LTZ。

计算列:Flink SQL 中引入的特殊概念,可以用一个 AS 语句来在表中产生数据中不存在的列,并且可以利用原有的列、各种运算符及内置函数。

CREATE TABLE EventTable( user STRING, url STRING, ts AS PROCTIME() ) WITH ( ... );

在数据流转换为表时定义: 我们调用 fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。

DataStream<Tuple2<String, String>> stream = ...; // 声明一个额外的字段作为处理时间属性字段 Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime()); 3. 窗口(Window)

分组窗口(Group Window,老版本) : 在 Flink 1.12 之前的版本中, Table API 和 SQL 提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在 SQL 中就是调用 **TUMBLE()、HOP()、SESSION(),**传入时间属性字段、窗口大小等参数就可以了。

分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。

Table result = tableEnv.sqlQuery( "SELECT user," + "TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " + "COUNT(url) AS cnt " + "FROM EventTable " + "GROUP BY user, TUMBLE(ts, INTERVAL '1' HOUR)" );

窗口表值函数(Windowing TVFs,新版本) : 从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions)来定义窗口。

在窗口 TVF 的返回值中,增加了额外 3 个列:“窗口起始点”(window_start)、“窗口结束点” ( window_end )、“窗口时间”(window_time:window_end - 1ms)。

窗口 TVF 更符合 SQL 标准,性能得到了优化,拥有更强大的功能;可以支持窗口Top-N、窗口联结(window join)等等。

# 1 滚动窗口(TUMBLE) TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR) # 2 滑动窗口(HOP)步长slide在前,窗口大小size在后。 HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS)); # 3 累积窗口(CUMULATE)累积步长 step在前,最大窗口长度在后。 # 我们的统计周期可能较长,中间每隔一段时间就输出一次当前的统计值;在一个统计周期内,我会多次输出统计值,它们应该是不断叠加累积的。 CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS)) # 累积步长 step在前,最大窗口长度在后。 # 4 会话窗口(Session Windows,目前尚未完全支持)。 四、聚合(Aggregation)查询 1. 分组聚合

?在流处理中,分组聚合同样是一个持续查询,而且是一个更新查询,得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成 DataStream 打印输出,需要调用 toChangelogStream()。

在持续查询的过程中,由于用于分组的 key 可能会不断增加,因此计算结果所需要维护的状态也会持续增长。为了防止状态无限增长耗尽资源,Flink Table API 和 SQL 可以在表环境中配置状态的生存时间TTL,配置 TTL 有可能会导致统计结果不准确。

分组聚合中支持DISTINCT 进行去重,跟标准 SQL 中的用法一致。

// 获取表环境的配置 TableConfig tableConfig = tableEnv.getConfig(); Configuration configuration = tableConfig.getConfiguration(); // 配置状态保持时间 或者 直接设置配置项 tableConfig.setIdleStateRetention(Duration.ofMinutes(60)); configuration.setString("table.exec.state.ttl", "60 min"); 2. 窗口聚合

在流处理中,往往需要将无限数据流划分成有界数据集,这就是所谓的“窗口”。

tableEnv.sqlQuery( "SELECT " + "user, " + "window_end AS endT, " + "COUNT(url) AS cnt " + "FROM TABLE( " + "CUMULATE( TABLE EventTable, " + // 定义累积窗口 "DESCRIPTOR(ts), " + "INTERVAL '30' MINUTE, " + "INTERVAL '1' HOUR" + ")) " + "GROUP BY user, window_start, window_end" ); 3. 开窗(Over)聚合 SELECT <聚合函数> OVER ( [PARTITION BY <字段1>[, <字段2>, ...]] # 可选字段 ORDER BY <时间属性字段> # 必须字段,目前只支持按照时间属性的升序排列 <开窗范围>), ... FROM ... # 目前支持的上界只能是 CURRENT ROW,例如: RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW ROWS BETWEEN 5 PRECEDING AND CURRENT ROW 4. Top N

目前 Table API 中并不支持 ROW_NUMBER()函数,所以也只有 SQL 中这一种通用的 Top N 实现方式。

普通Top N: 针对此格式Flink排序字段优化,正常支持。

SELECT ... FROM ( SELECT ..., ROW_NUMBER() OVER ( [PARTITION BY <字段1>[, <字段1>...]] ORDER BY <排序字段1> [asc|desc][, <排序字段2> [asc|desc]...] ) AS row_num FROM ...) WHERE row_num <= N [AND <其它条件>]

窗口Top N:只有插入(INSERT)操作。

// 定义子查询,进行窗口聚合,得到包含窗口信息、用户以及访问次数的结果表 String subQuery = "SELECT window_start, window_end, user, COUNT(url) as cnt " + "FROM TABLE ( " + "TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )) " + "GROUP BY window_start, window_end, user "; // 定义Top N的外层查询 String topNQuery = "SELECT * FROM (" + "SELECT *, " + "ROW_NUMBER() OVER ( " + "PARTITION BY window_start, window_end " + "ORDER BY cnt desc " + ") AS row_num " + "FROM (" + subQuery + ")) " + "WHERE row_num <= 2"; 五、联结(Join)查询 1. 常规联结查询

与标准 SQL 一致,Flink SQL 的常规联结也可以分为内联结(INNER JOIN)和外联结(OUTER JOIN)。目前仅支持“等值条件” 作为联结条件,也就是关键字 ON 后面必须是判断两表中字段相等的逻辑表达式。常规联结查询一般是更新(Update)查询。

等值内联结(INNER Equi-JOIN)

SELECT * FROM Order INNER JOIN Product ON Order.product_id = Product.id

等值外联结(OUTER Equi-JOIN)

SELECT * FROM Order LEFT JOIN Product ON Order.product_id = Product.id SELECT * FROM Order RIGHT JOIN Product ON Order.product_id = Product.id SELECT * FROM Order FULL OUTER JOIN Product ON Order.product_id = Product.id 2. 间隔联结查询

目前 Flink SQL 还不支持窗口联结,而间隔联结则已经实现。语法如下:

两表的联结间隔联结不需要用 JOIN 关键字,直接在 FROM 后将要联结的两表列出来就可以,用逗号分隔。这与标准 SQL 中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。联结条件联结条件用 WHERE 子句来定义,用一个等值表达式描述。交叉联结之后再用 WHERE 进行条件筛选,效果跟内联结 INNER JOIN … ON …非常类似。时间间隔限制我们可以在 WHERE 子句中,联结条件后用 AND 追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用 ltime 和 rtime 表示左右表中的时间字段:(1) ltime = rtime (2) ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE (3) ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

间隔联结查询只支持具有时间属性的“仅追加”(Append-only)表。其SQL形如:

SELECT * FROM Order o, Shipment s WHERE o.id = s.order_id AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time 3. 时间联结

除了间隔联结之外,Flink SQL 还支持时间联结(Temporal Join),这主要是针对“版本表”(versioned table)而言的。所谓版本表,就是记录了数据随着时间推移版本变化的表,可以理解成一个“更新日志”(change log),它就是具有时间属性、还会进行更新操作的表。当我们联结某个版本表时,并不是把当前的数据连接合并起来就行了,而是希望能够根据数据发生的时间,找到当时的“版本”;这种根据更新时间提取当时的值进行联结的操作,就叫作“时间联结”(Temporal Join)。

六、函数 1. 系统函数

系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。主要可以分为两大类:标量函数和聚合函数。

(1)标量函数(Scalar Functions) :标量指只有数值大小、没有方向的量。

比较函数(Comparison Functions):IS NOT NULL、<>、=;

逻辑函数(Logical Functions):boolean1 OR boolean2、boolean IS FALSE、NOT boolean;

算术函数(Arithmetic Functions):POWER(numeric1, numeric2) 、+、-;

字符串函数(String Functions):string1 || string2 连接字符串、UPPER(string)、CHAR_LENGTH(string);

时间函数(Temporal Functions):DATE string、TIMESTAMP string、CURRENT_TIME、INTERVAL string range 返回一个时间间隔[如 INTERVAL ‘2-10’ YEAR TO MONTH];

(2)聚合函数(Aggregate Functions):COUNT、SUM、RANK、ROW_NUMBER等

2. 自定义函数(UDF) 自定义函数的调用方法 // 注册临时系统函数 tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class); // 注册函数 (依赖于当前的数据库(database)和目录(catalog)) tableEnv.createTemporaryFunction ("MyFunction", MyFunction.class); //使用 Table API 调用函数 tableEnv.from("MyTable").select(call("MyFunction", $("myField"))); //Table API 中也可以未注册函数 tableEnv.from("MyTable").select(call(SubstringFunction.class, $("myField"))); //在SQL中调用函数 tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable"); 当前 UDF 主要有以下几类:

标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;

import org.apache.flink.table.functions.*; public static class HashFunction extends ScalarFunction { //抽象类中并没有定义 eval(),底层又要求eval public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) { return o.hashCode(); } } // 注册函数 tableEnv.createTemporarySystemFunction("HashFunction", HashFunction.class); // 在 SQL 里调用注册好的函数 tableEnv.sqlQuery("SELECT HashFunction(myField) FROM MyTable");

表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;

// 注意这里的类型标注,输出是Row类型,Row中包含两个字段:word和length。 @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) public static class SplitFunction extends TableFunction<Row> { public void eval(String str) { for (String s : str.split(" ")) { // 使用collect()方法发送一行数据 collect(Row.of(s, s.length())); } } } // 注册函数 tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class); // 在 SQL 里调用注册好的函数 // 1. 交叉联结 tableEnv.sqlQuery( "SELECT myField, word, length " + "FROM MyTable, LATERAL TABLE(SplitFunction(myField))"); // 2. 带ON TRUE条件的左联结 tableEnv.sqlQuery( "SELECT myField, word, length " + "FROM MyTable " + "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE"); // 重命名侧向表中的字段 tableEnv.sqlQuery( "SELECT myField, newWord, newLength " + "FROM MyTable " + "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE"); // 类似于Hive的侧向视图 // FROM A lateral view explode(split(bak,",")) sss as rule_code

聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;

// 累加器类型定义 public static class WeightedAvgAccumulator { public long sum = 0; // 加权和 public int count = 0; // 数据个数 } // 自定义聚合函数,输出为长整型的平均值,累加器类型为 WeightedAvgAccumulator public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> { @Override public WeightedAvgAccumulator createAccumulator() { return new WeightedAvgAccumulator(); // 创建累加器 } @Override public Long getValue(WeightedAvgAccumulator acc) { if (acc.count == 0) { return null; // 防止除数为0 } else { return acc.sum / acc.count; // 计算平均值并返回 } } // 累加计算方法,每来一行数据都会调用 public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) { acc.sum += iValue * iWeight; acc.count += iWeight; } } // 注册自定义聚合函数 tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class); // 调用函数计算加权平均值 Table result = tableEnv.sqlQuery( "SELECT student, WeightedAvg(score, weight) FROM ScoreTable GROUP BY student" );

表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。目前 SQL 中没有直接使用表聚合函数的方式,所以需要使用 Table API 的方式来调用:

// 聚合累加器的类型定义,包含最大的第一和第二两个数据 public static class Top2Accumulator { public Integer first; public Integer second; } // 自定义表聚合函数,查询一组数中最大的两个,返回值为(数值,排名)的二元组 public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> { @Override public Top2Accumulator createAccumulator() { Top2Accumulator acc = new Top2Accumulator(); acc.first = Integer.MIN_VALUE; // 为方便比较,初始值给最小值 acc.second = Integer.MIN_VALUE; return acc; } // 每来一个数据调用一次,判断是否更新累加器 public void accumulate(Top2Accumulator acc, Integer value) { if (value > acc.first) { acc.second = acc.first; acc.first = value; } else if (value > acc.second) { acc.second = value; } } // 输出(数值,排名)的二元组,输出两行数据 public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) { if (acc.first != Integer.MIN_VALUE) { out.collect(Tuple2.of(acc.first, 1)); } if (acc.second != Integer.MIN_VALUE) { out.collect(Tuple2.of(acc.second, 2)); } } } // 注册表聚合函数函数 tableEnv.createTemporarySystemFunction("Top2", Top2.class); // 在Table API中调用函数 tableEnv.from("MyTable") .groupBy($("myField")) .flatAggregate(call("Top2", $("value")).as("value", "rank")) .select($("myField"), $("value"), $("rank")); 七、SQL 客户端 # sql-cli-defaults.yaml进行各种配置 ./bin/start-cluster.sh # 默认的启动模式是embedded,客户端是一个嵌入在本地的进程,这是目前唯一支持的模式,用于快速测试。 ./bin/sql-client.sh # 表环境的运行时模式 Flink SQL> SET 'execution.runtime-mode' = 'streaming'; # SQL 客户端的“执行结果模式” [table、changelog、tableau(虚线框表格)] Flink SQL> SET 'sql-client.execution.result-mode' = 'table'; # 执行 SQL 查询 Flink SQL> CREATE TABLE EventTable( > user STRING, > url STRING, > `timestamp` BIGINT > ) WITH ( > 'connector' = 'filesystem', > 'path' = 'events.csv', > 'format' = 'csv' > ); Flink SQL> CREATE TABLE ResultTable ( > user STRING, > cnt BIGINT > ) WITH ( > 'connector' = 'print' > ); Flink SQL> INSERT INTO ResultTable SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user; 八、连接到外部系统

在 Flink 1.13 的 API 调用中,已经不去区分 TableSource 和 TableSink。

-- 在控制台上打印 CREATE TABLE ResultTable ( user STRING, cnt BIGINT) WITH ('connector' = 'print'); 1. Kafka CREATE TABLE KafkaTable ( `user` STRING, `url` STRING, `ts` TIMESTAMP(3) METADATA FROM 'timestamp' #元数据列 Kafka中数据自带的时间戳 ) WITH ( 'connector' = 'kafka', 'topic' = 'events', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ) CREATE TABLE pageviews_per_region (user_region STRING, pv BIGINT,uv BIGINT, PRIMARY KEY (user_region) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'pageviews_per_region', 'properties.bootstrap.servers' = '...', 'key.format' = 'avro', 'value.format' = 'avro' ); 2. 文件系统 CREATE TABLE MyTable ( column_name1 INT,column_name2 STRING,... part_name1 INT, part_name2 STRING ) PARTITIONED BY (part_name1, part_name2) WITH ( 'connector' = 'filesystem', -- 连接器类型 'path' = '...', -- 文件路径 'format' = '...' -- 文件格式 ) 3. JDBC -- 创建一张连接到MySQL的表 CREATE TABLE MyTable ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'users' ); -- 将另一张表 T的数据写入到 MyTable 表中 INSERT INTO MyTable SELECT id, name, age, status FROM T; 4. Elasticsearch -- 创建一张连接到 Elasticsearch的表 定义了主键,会以更新插入(Upsert)模式向 es 写入数据 CREATE TABLE MyTable ( user_id STRING, user_name STRING uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'users' ); 5. HBase -- 创建一张连接到 HBase的表,只支持1.4.x 和 2.2.x 版本 CREATE TABLE MyTable (rowkey INT, family1 ROW<q1 INT>, family2 ROW<q2 STRING, q3 BIGINT>, family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-1.4', 'table-name' = 'mytable', 'zookeeper.quorum' = 'localhost:2181' ); -- 假设表T的字段结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6] INSERT INTO MyTable SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T; 6. Hive

Flink 与 Hive 的集成比较特别。Flink 提供了“Hive 目录”(HiveCatalog)功能,允许使用 Hive 的“元存储”(Metastore)来管理 Flink 的元数据。

?Metastore 可以作为一个持久化的目录,因此使用 HiveCatalog 可以跨会话存储 Flink 特定的元数据。在 HiveCatalog 中创建 Kafka 表或者 ElasticSearch 表,把它们的元数据持久化存储在 Hive 的 Metastore 中。Flink就可以跨作业重用定义的表了。

?使用 HiveCatalog,Flink 可以作为读写 Hive 表的替代分析引擎。

连接到 Hive

// 1 在API中配置连接到Hive EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); String name = "myhive"; String defaultDatabase = "mydatabase"; String hiveConfDir = "/opt/hive-conf"; // 创建一个HiveCatalog,并在表环境中注册 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); tableEnv.registerCatalog("myhive", hive); // 使用HiveCatalog作为当前会话的catalog tableEnv.useCatalog("myhive"); # 2 在SQL Cli连接到Hive Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf'); [INFO] Execute statement succeed. Flink SQL> use catalog myhive; [INFO] Execute statement succeed.

设置 SQL 方言

// sql中 set table.sql-dialect=hive; // sql-cli-defaults.yaml中 configuration: table.sql-dialect: hive // Table API配置hive方言 默认SqlDialect.DEFAULT,Flink方言 tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

读写 Hive 表

-- 设置SQL方言为hive,创建Hive表 SET table.sql-dialect=hive; CREATE TABLE hive_table (user_id STRING, order_amount DOUBLE ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file' ); -- 设置SQL方言为default,创建Kafka表 SET table.sql-dialect=default; CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND – 定义水位线 ) WITH (...); -- 将Kafka中读取的数据经转换后写入Hive INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #SQL #教程 #TABLE #API # #udf