
Flink Table and SQL: Tables and Views, Connectors, and the timestamp Data Types (by Bulut0907)


Contents
1. Tables and Views
2. Table API Connectors
2.1 filesystem, print, blackhole
3. timestamp and timestamp_ltz

1. Tables and Views

Tables are divided into temporary tables and permanent tables. When a temporary table and a permanent table share the same name, the temporary table takes precedence. Permanent tables require a catalog to persist their metadata, for example a Hive catalog.
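As a minimal sketch of this name resolution (the table name orders and the datagen connector are illustrative assumptions, reusing the tEnv created in the program further below):

// A temporary view shadows a permanent table of the same name
tEnv.executeSql("create table orders (id bigint) with ('connector' = 'datagen')")
tEnv.executeSql("create temporary view orders as select * from (values (cast(1 as bigint))) as t(id)")
// Queries against orders now resolve to the temporary view
tEnv.executeSql("drop temporary view orders")
// The permanent table orders is visible again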

Connecting to an external data system is usually done with createTemporaryTable, while intermediate result tables are usually registered with createTemporaryView, as shown below:

tEnv.createTemporaryTable("table_name", tableDescriptor)
tEnv.createTemporaryView("table_name", table)

2. Table API Connectors

2.1 filesystem, print, blackhole

Add the dependencies to pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.1</version>
    <scope>provided</scope>
</dependency>

The program is as follows:

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, FormatDescriptor, Schema, TableDescriptor}
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.ipc.StandbyException

import scala.util.control.Breaks.{break, breakable}

object flink_test {

  // Probe the candidate NameNode URIs and return the active one
  def getActiveHdfsUri() = {
    val hadoopConf = new Configuration()
    val hdfsUris = Array(
      "hdfs://192.168.23.101:8020",
      "hdfs://192.168.23.102:8020",
      "hdfs://192.168.23.103:8020"
    )

    var hdfsCli: FileSystem = null
    var hdfsCapacity: Long = -1L
    var activeHdfsUri: String = null
    breakable {
      for (hdfsUri <- hdfsUris) {
        hadoopConf.set("fs.defaultFS", hdfsUri)
        hdfsCli = FileSystem.get(hadoopConf)
        try {
          // The RPC succeeds only against the active NameNode
          hdfsCapacity = hdfsCli.getStatus.getCapacity
          activeHdfsUri = hdfsUri
          break
        } catch {
          // A standby NameNode rejects the call; try the next URI
          case hdfsException: StandbyException => {}
        }
      }
    }

    activeHdfsUri
  }

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val hdfsFilePath = s"${getActiveHdfsUri()}/test/test.txt"

    // HDFS table
    val fileSystemTable = tEnv.from(
      TableDescriptor.forConnector("filesystem")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("amount", DataTypes.BIGINT())
          .build()
        )
        .option("path", hdfsFilePath)
        .format(FormatDescriptor
          .forFormat("csv")
          .option("field-delimiter", ",")
          .build()
        ).build()
    )
    tEnv.createTemporaryView("fileSystemTable", fileSystemTable)

    // print table
    tEnv.createTemporaryTable("printSink",
      TableDescriptor.forConnector("print")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("amount", DataTypes.BIGINT())
          .build()
        ).build()
    )

    // Read the HDFS table and write it to the print sink; the output is the
    // same as converting to a DataStream and calling print
    fileSystemTable.executeInsert("printSink")

    // blackhole table
    tEnv.executeSql("create temporary table blackholeSink with ('connector' = 'blackhole') like printSink")
    // Read the HDFS table into the blackhole
    tEnv.executeSql("insert into blackholeSink select * from fileSystemTable")

    // Convert to a DataStream and send it to a discarding sink
    val fileSystemDatastream = tEnv.toDataStream(fileSystemTable)
    fileSystemDatastream.addSink(new DiscardingSink[Row]())

    senv.execute()
  }
}
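For comparison, the same filesystem source could also be declared with SQL DDL instead of a TableDescriptor. This is a sketch under the same schema; the path hardcodes the first NameNode URI for brevity:

// Sketch: SQL DDL equivalent of the TableDescriptor-based filesystem source
tEnv.executeSql(
  """create temporary table fileSystemTableDdl (
    |  name string,
    |  amount bigint
    |) with (
    |  'connector' = 'filesystem',
    |  'path' = 'hdfs://192.168.23.101:8020/test/test.txt',
    |  'format' = 'csv',
    |  'csv.field-delimiter' = ','
    |)""".stripMargin)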

The execution result is as follows:

6> +I[zhang_san, 30]
4> +I[li_si, 40]

3. timestamp and timestamp_ltz

timestamp(p): p specifies the precision of the fractional seconds, in the range 0 to 9; the default is 6.

val table = tEnv.sqlQuery("select timestamp '1970-01-01 00:00:04.001'")
table.execute().print()

The output is as follows:

+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 00:00:04.001 |
+----+-------------------------+

timestamp_ltz(p) describes an absolute point on the timeline. It is stored as a long holding the milliseconds since the epoch plus an int holding the nanoseconds within the millisecond. It cannot be specified with a string literal, but it can be converted from a long epoch value. At the same instant, System.currentTimeMillis() returns the same value on every machine in the world.

tEnv.executeSql("create view t1 as select to_timestamp_ltz(4001, 3)")
val table = tEnv.sqlQuery("select * from t1")
table.execute().print()

The output is as follows:

+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 08:00:04.001 |
+----+-------------------------+

Note the 08:00 offset: to_timestamp_ltz(4001, 3) is 1970-01-01 00:00:04.001 UTC, rendered here in the session time zone (UTC+8 in this session).

The various current-time functions:

tEnv.executeSql("create view myView1 as select localtime, localtimestamp, current_date, current_time, current_timestamp, current_row_timestamp(), now(), proctime()")
val table = tEnv.sqlQuery("select * from myView1")
table.printSchema()
table.execute().print()

The output is as follows:

(
  `localtime` TIME(0) NOT NULL,
  `localtimestamp` TIMESTAMP(3) NOT NULL,
  `current_date` DATE NOT NULL,
  `current_time` TIME(0) NOT NULL,
  `current_timestamp` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$5` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$6` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$7` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*
)

+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | localtime |          localtimestamp | current_date | current_time |       current_timestamp |                  EXPR$5 |                  EXPR$6 |                  EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |  12:59:06 | 2022-02-07 12:59:06.859 |   2022-02-07 |     12:59:06 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.862 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
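Because TIMESTAMP_LTZ rendering depends on the session time zone, the offset seen above can be controlled explicitly. A minimal sketch, reusing the same tEnv and choosing UTC arbitrarily for illustration:

import java.time.ZoneId

// The session time zone controls how TIMESTAMP_LTZ values are rendered
tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC"))
tEnv.sqlQuery("select to_timestamp_ltz(4001, 3)").execute().print()
// Under UTC this prints 1970-01-01 00:00:04.001 instead of 08:00:04.001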

