irpas技术客

【Flink1.14实战】Docker环境Flink Sql DataGen 快速开始_吕布辕门_datagen flink

网络 1348

DataGen SQL 连接器

DataGen 连接器允许按数据生成规则进行读取。

DataGen 连接器可以使用计算列语法。 这使您可以灵活地生成记录。

DataGen 连接器是内置的。

注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。

创建一个 DataGen 的表

表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。

每个列,都有两种生成数据的方法:

随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、string (类型)可以指定长度。它是无界的生成器。序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。 CREATE TABLE datagen ( f_sequence INT, f_random INT, f_random_str STRING, ts AS localtimestamp, WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'datagen', -- optional options -- 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10' ) 连接器参数 参数是否必选默认值数据类型描述connector必须(none)String指定要使用的连接器,这里是 ‘datagen’。rows-per-second可选10000Long每秒生成的行数,用以控制数据发出速率。fields.#.kind可选randomString指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’。fields.#.min可选(Minimum value of type)(Type of field)随机生成器的最小值,适用于数字类型。fields.#.max可选(Maximum value of type)(Type of field)随机生成器的最大值,适用于数字类型。fields.#.length可选100Integer随机生成器生成字符的长度,适用于 char、varchar、string。fields.#.start可选(none)(Type of field)序列生成器的起始值。fields.#.end可选(none)(Type of field)序列生成器的结束值。
实战

基于docker-compose。

1、编辑 docker-compose.yml

version: "3" services: jobmanager: image: flink:1.14.4-scala_2.11 ports: - "8081:8081" command: jobmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager: image: flink:1.14.4-scala_2.11 depends_on: - jobmanager command: taskmanager scale: 1 environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 4

2、创建执行程序

package quick.table; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.*; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class TableExample { public static void main(String[] args) throws Exception { String sql="CREATE TABLE source_table (\n" + " user_id INT,\n" + " cost DOUBLE,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='5',\n" + "\n" + " 'fields.user_id.kind'='random',\n" + " 'fields.user_id.min'='1',\n" + " 'fields.user_id.max'='10',\n" + "\n" + " 'fields.cost.kind'='random',\n" + " 'fields.cost.min'='1',\n" + " 'fields.cost.max'='100'\n" + ")\n"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(sql); // 执行查询 Table table = tableEnv.sqlQuery("select * from source_table"); DataStream<Row> resultStream = tableEnv.toDataStream(table); // add a printing sink and execute in DataStream API resultStream.print(); env.execute(); } }

3、启动服务

$ docker-compose up -d

4、打印结果

然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。

$ docker-compose logs -f taskmanager taskmanager_1 | +I[4, 60.06509260823151, 2022-04-13T11:01:30.349] taskmanager_1 | +I[9, 22.444427031038334, 2022-04-13T11:01:30.349] ......

源码已提交


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #datagen #Flink #SQL #连接器DataGen #连接器可以使用计算列语法