DataGen SQL 连接器
DataGen 连接器允许按数据生成规则进行读取。
DataGen 连接器可以使用计算列语法。 这使您可以灵活地生成记录。
DataGen 连接器是内置的。
注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。
创建一个 DataGen 的表表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。
每个列,都有两种生成数据的方法:
随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、string (类型)可以指定长度。它是无界的生成器。序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。 CREATE TABLE datagen ( f_sequence INT, f_random INT, f_random_str STRING, ts AS localtimestamp, WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'datagen', -- optional options -- 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10' ) 连接器参数基于docker-compose。
1、编辑 docker-compose.yml
version: "3" services: jobmanager: image: flink:1.14.4-scala_2.11 ports: - "8081:8081" command: jobmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager: image: flink:1.14.4-scala_2.11 depends_on: - jobmanager command: taskmanager scale: 1 environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 42、创建执行程序
package quick.table; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.*; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class TableExample { public static void main(String[] args) throws Exception { String sql="CREATE TABLE source_table (\n" + " user_id INT,\n" + " cost DOUBLE,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='5',\n" + "\n" + " 'fields.user_id.kind'='random',\n" + " 'fields.user_id.min'='1',\n" + " 'fields.user_id.max'='10',\n" + "\n" + " 'fields.cost.kind'='random',\n" + " 'fields.cost.min'='1',\n" + " 'fields.cost.max'='100'\n" + ")\n"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(sql); // 执行查询 Table table = tableEnv.sqlQuery("select * from source_table"); DataStream<Row> resultStream = tableEnv.toDataStream(table); // add a printing sink and execute in DataStream API resultStream.print(); env.execute(); } }3、启动服务
$ docker-compose up -d4、打印结果
然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。
$ docker-compose logs -f taskmanager taskmanager_1 | +I[4, 60.06509260823151, 2022-04-13T11:01:30.349] taskmanager_1 | +I[9, 22.444427031038334, 2022-04-13T11:01:30.349] ......源码已提交
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。 |
标签: #datagen #Flink #SQL #连接器DataGen #连接器可以使用计算列语法