
Flink CDC integrated with the Hudi data lake and synced to Hive


1. Versions

Component versions used here: Hudi 0.10.0, Flink 1.13.5, Hive 3.1.0.
2. Goal: integrate Hudi into Hive through Flink CDC


3. Required Flink jars

The required package is flink-connector-mysql-cdc-2.0.2.jar (see the error note at the end of this post).

-rw-r--r-- 1 root root 7802399 2月 16 00:36 doris-flink-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 249571 2月 16 00:36 flink-connector-jdbc_2.12-1.13.5.jar
-rw-r--r-- 1 root root 359138 2月 16 00:36 flink-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root 30087268 2月 17 22:12 flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 root root 92315 2月 16 00:36 flink-csv-1.13.5.jar
-rw-r--r-- 1 root root 106535830 2月 16 00:36 flink-dist_2.12-1.13.5.jar
-rw-r--r-- 1 root root 148127 2月 16 00:36 flink-json-1.13.5.jar
-rw-r--r-- 1 root root 43317025 2月 16 00:36 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 root root 7709740 2月 16 00:36 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root 3674116 2月 16 00:36 flink-sql-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root 35051557 2月 16 00:35 flink-table_2.12-1.13.5.jar
-rw-r--r-- 1 root root 38613344 2月 16 00:36 flink-table-blink_2.12-1.13.5.jar
-rw-r--r-- 1 root root 62447468 2月 16 00:36 hudi-flink-bundle_2.12-0.10.0.jar
-rw-r--r-- 1 root root 17276348 2月 16 00:36 hudi-hadoop-mr-bundle-0.10.0.jar
-rw-r--r-- 1 root root 207909 2月 16 00:36 log4j-1.2-api-2.16.0.jar
-rw-r--r-- 1 root root 301892 2月 16 00:36 log4j-api-2.16.0.jar
-rw-r--r-- 1 root root 1789565 2月 16 00:36 log4j-core-2.16.0.jar
-rw-r--r-- 1 root root 24258 2月 16 00:36 log4j-slf4j-impl-2.16.0.jar
-rw-r--r-- 1 root root 724213 2月 16 00:36 mysql-connector-java-5.1.9.jar
[root@node01 lib]# pwd
/opt/module/flink/flink-1.13.5/lib
[root@node01 lib]#

4. Scenario

5. Implementation steps
1. Create the database table and enable the binlog
2. Create the Flink CDC source table in Flink SQL
3. Create a view
4. Create the sink table, bound to the Hudi table and automatically synced to a Hive table
5. Query the view and insert the result into the sink table -- this runs continuously in the background as a Flink job

5.1 Enable the MySQL binlog (my.cnf)

server-id=162
log-bin=mysql-bin
#sync-binlog=1
# databases excluded from the binlog
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
binlog-ignore-db=mysql
binlog_format=ROW
expire_logs_days=30
binlog_row_image=full
# databases included in the binlog
#binlog-do-db=test

Restart MySQL:
service mysqld restart
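After the restart, an optional sanity check with standard MySQL commands confirms the binlog settings are in effect:

SHOW VARIABLES LIKE 'log_bin';          -- expect ON
SHOW VARIABLES LIKE 'binlog_format';    -- expect ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- expect FULL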

5.2 Create the MySQL table

CREATE TABLE `Flink_cdc` (
  `id` BIGINT(64) AUTO_INCREMENT PRIMARY KEY,
  `name` VARCHAR(64) NULL,
  `age` INT(20) NULL,
  birthday TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
  ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
INSERT INTO `wudldb`.`Flink_cdc`(NAME,age) VALUES("flink",18);

5.3 Create the Flink CDC source table in Flink SQL

Flink SQL> CREATE TABLE source_mysql (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age INT,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.1.162',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'wudldb',
  'table-name' = 'Flink_cdc'
);
[INFO] Execute statement succeed.

5.4 Create the Flink CDC view in Flink SQL

Flink SQL> create view view_source_flinkcdc_mysql
> AS
> SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM source_mysql;
[INFO] Execute statement succeed.

5.5 Create the sink table, bound to the Hudi table and automatically synced to Hive

Flink SQL> CREATE TABLE flink_cdc_sink_hudi_hive(
> id bigint ,
> name string,
> age int,
> birthday TIMESTAMP(3),
> ts TIMESTAMP(3),
> part VARCHAR(20),
> primary key(id) not enforced
> )
> PARTITIONED BY (part)
> with(
> 'connector'='hudi',
> 'path'= 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive',
> 'table.type'= 'MERGE_ON_READ',
> 'hoodie.datasource.write.recordkey.field'= 'id',
> 'write.precombine.field'= 'ts',
> 'write.tasks'= '1',
> 'write.rate.limit'= '2000',
> 'compaction.tasks'= '1',
> 'compaction.async.enabled'= 'true',
> 'compaction.trigger.strategy'= 'num_commits',
> 'compaction.delta_commits'= '1',
> 'changelog.enabled'= 'true',
> 'read.streaming.enabled'= 'true',
> 'read.streaming.check-interval'= '3',
> 'hive_sync.enable'= 'true',
> 'hive_sync.mode'= 'hms',
> 'hive_sync.metastore.uris'= 'thrift://node02.com:9083',
> 'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',
> 'hive_sync.table'= 'flink_cdc_sink_hudi_hive',
> 'hive_sync.db'= 'db_hive',
> 'hive_sync.username'= 'root',
> 'hive_sync.password'= '123456',
> 'hive_sync.support_timestamp'= 'true'
> );
[INFO] Execute statement succeed.

5.6 Query the view and insert into the sink table

Flink SQL> INSERT INTO flink_cdc_sink_hudi_hive SELECT id, name, age, birthday, ts, part FROM view_source_flinkcdc_mysql;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c618c9f528b9793adf4418640bb2a0fc

5.7 Check the running Flink job
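With the insert job from 5.6 running, a quick end-to-end check (a sketch; the row values are just an example, and it relies on 'read.streaming.enabled' = 'true' from 5.5) is to add another row on the MySQL side and watch it arrive through a streaming query in the Flink SQL client:

-- in MySQL
INSERT INTO `wudldb`.`Flink_cdc`(NAME, age) VALUES ("hudi", 20);

-- in the Flink SQL client: continuous read of the Hudi table
Flink SQL> select * from flink_cdc_sink_hudi_hive;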

6. Hudi and Hive integration

Copy hudi-hadoop-mr-bundle-0.10.0.jar into Hive's lib directory and restart the Hive service.
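Depending on the Hive setup, querying the _rt snapshot table of a MERGE_ON_READ Hudi table may additionally require switching Hive's input format for the session. A minimal sketch, based on the setting documented for Hudi's Hive integration (verify the class name against your hudi-hadoop-mr-bundle version):

-- in beeline, before querying the _rt table
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;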

6.1 Connect to Hive and check the tables synced from Hudi

0: jdbc:hive2://node01.com:2181,node02.com:21> show tables;
INFO : Compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null)
INFO : Completed compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.016 seconds
INFO : Executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Completed executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.012 seconds
INFO : OK
+------------------------------+
|           tab_name           |
+------------------------------+
| flink_cdc_sink_hudi_hive_ro  |
| flink_cdc_sink_hudi_hive_rt  |
+------------------------------+

Hive ends up with two tables: the _ro table serves read-optimized queries, the _rt table serves snapshot queries.

6.2 Query

0: jdbc:hive2://node01.com:2181,node02.com:21> select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro;
INFO : Compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:id, type:bigint, comment:null), FieldSchema(name:name, type:string, comment:null), FieldSchema(name:age, type:int, comment:null), FieldSchema(name:birthday, type:bigint, comment:null)], properties:null)
INFO : Completed compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.124 seconds
INFO : Executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro
INFO : Completed executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.029 seconds
INFO : OK
+-----+--------+------+----------------+
| id  |  name  | age  |    birthday    |
+-----+--------+------+----------------+
| 1   | flink  | 18   | 1645142397000  |
+-----+--------+------+----------------+
1 row selected (0.278 seconds)
0: jdbc:hive2://node01.com:2181,node02.com:21>
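In the output above, birthday comes back as epoch milliseconds (1645142397000). If a readable timestamp is preferred on the Hive side, one option is to convert it in the query; a small sketch using standard Hive built-ins:

-- convert the epoch-millisecond birthday to a readable timestamp
select id, name, age,
       from_unixtime(cast(birthday / 1000 as bigint), 'yyyy-MM-dd HH:mm:ss') as birthday_ts
from flink_cdc_sink_hudi_hive_ro;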

Overall result

Error encountered along the way

Flink CDC here needs the flink-connector-mysql-cdc-2.0.2.jar package, not flink-sql-connector-mysql-cdc-2.0.2.jar; otherwise you will hit the following error:

Flink SQL> select * from users_source_mysql;
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/connect/data/Schema
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.getDeclaredMethod(Class.java:2128)
    at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
    at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:597)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:457)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:378)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:179)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:117)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:934)
    at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:67)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:795)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
    at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
    at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
    at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
    at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
    at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.data.Schema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 69 more
Shutting down the session... done.
[root@node01 bin]#


