
Syncing Sharded MySQL Databases and Tables with Flink CDC (shy_snow)


The mysql-cdc connector supports regular expressions in its database and table names, so a single source table can match multiple databases and multiple tables and capture MySQL data in a sharded (分库分表) setup. Simply use regex patterns in the 'database-name' and 'table-name' options when creating the Flink source table.

MySQL DDL for the shard tables and the target table:

DROP TABLE IF EXISTS `2person`;
CREATE TABLE `2person` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(20) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2003 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of 2person
-- ----------------------------
INSERT INTO `2person` VALUES ('2001', '2001name');
INSERT INTO `2person` VALUES ('2002', '2name');

DROP TABLE IF EXISTS `3person`;
CREATE TABLE `3person` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(20) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3003 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of 3person
-- ----------------------------
INSERT INTO `3person` VALUES ('3001', '3001name');
INSERT INTO `3person` VALUES ('3002', '3name');

-- ----------------------------
-- Target table that will hold the merged rows
-- ----------------------------
CREATE TABLE `person_sum` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(20) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3003 DEFAULT CHARSET=utf8;

Java program that submits the SQL (the statements can also be run directly in the Flink SQL client):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Mysql2MysqlRemote {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: regex in 'database-name' and 'table-name' covers all shards,
        // e.g. '[0-9]?person' matches both 2person and 3person.
        String sourceDDL = "CREATE TABLE mysql_binlog (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '192.168.128.1',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'db_[0-9]?',\n" +
                "  'table-name' = '[0-9]?person'\n" +
                // "  ,'scan.startup.mode' = 'latest-offset'\n" +
                ")";

        // Sink table: an ordinary JDBC table receiving the merged rows
        String sinkDDL = "CREATE TABLE test_cdc (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'url' = 'jdbc:mysql://192.168.128.1:3306/db0?serverTimezone=UTC&useSSL=false',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'table-name' = 'person_sum'\n" +
                ")";

        // Simple pass-through from the merged source to the sink
        String transformDmlSQL = "insert into test_cdc select * from mysql_binlog";

        System.out.println(sourceDDL);
        System.out.println(sinkDDL);
        System.out.println(transformDmlSQL);

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql submits the INSERT job itself; no separate env.execute() is needed
        TableResult result = tableEnv.executeSql(transformDmlSQL);

        // Block while flink-cdc takes the initial snapshot and switches to the binlog
        result.print();
    }
}

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- ... -->

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.3</flink.version>
        <hive.version>1.1.0</hive.version>
        <scala.binary.version>2.12</scala.binary.version>
        <mysql.version>5.1.49</mysql.version>
        <flinkcdc.version>2.0.1</flinkcdc.version>
        <fastjson.version>1.2.75</fastjson.version>
        <slf4j.version>1.7.25</slf4j.version>
        <log4j.version>2.16.0</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java/</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <verbose>true</verbose>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.0</version>
                <configuration>
                    <includes>
                        <include>**/*.java</include>
                    </includes>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Once the job is running, Flink first synchronizes the existing data (the initial snapshot) to the target table, and then keeps applying incremental changes. To verify, modify a row in one of the source tables and check whether the target table reflects the change.
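A quick check might look like the following (a sketch: the shard database name db_1 is an assumption here, chosen to match the db_[0-9]? pattern; the sink table lives in db0 as above):

-- on MySQL: change rows in two of the shard tables
INSERT INTO db_1.`2person` VALUES (2003, 'name2003');
UPDATE db_1.`3person` SET name = 'renamed' WHERE id = 3001;

-- then confirm the target table picked the changes up
SELECT * FROM db0.person_sum ORDER BY id;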

Other issues:

If the same primary key values can occur in different shard tables, build a composite primary key by combining the database name and table name with the original key (full DDL sketch below).

Add the following metadata columns to the source table DDL:

database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,

add the corresponding physical columns to the sink table DDL:

database_name STRING,

table_name STRING,

and declare the composite primary key:

PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
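Put together, a sketch of the adjusted DDL from the earlier example (same host, credentials, and regex patterns as above; note the underlying MySQL person_sum table would also need the two extra VARCHAR columns and the widened key):

-- Source: expose which shard each row came from via metadata columns
CREATE TABLE mysql_binlog (
  database_name STRING METADATA VIRTUAL,
  table_name STRING METADATA VIRTUAL,
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.128.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'db_[0-9]?',
  'table-name' = '[0-9]?person'
);

-- Sink: persist the shard identifiers and use them in the composite key
CREATE TABLE test_cdc (
  database_name STRING,
  table_name STRING,
  id INT,
  name STRING,
  PRIMARY KEY (database_name, table_name, id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://192.168.128.1:3306/db0?serverTimezone=UTC&useSSL=false',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'person_sum'
);

-- SELECT * expands the source's metadata columns in declared order,
-- so they line up with the sink's database_name / table_name columns
INSERT INTO test_cdc SELECT * FROM mysql_binlog;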

If the shards do not live on the same MySQL instance, a single regex source cannot cover them. Instead, declare one mysql-cdc source per instance and merge them with UNION ALL; this is somewhat less efficient. The SQL client session below shows the approach, merging two instances into a Hudi table:

cd $FLINK_HOME
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
bin/sql-client.sh embedded -j ./lib/hudi-flink-bundle_2.12-0.10.0.jar shell

CREATE TABLE mysql_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.128.131',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'database-name' = 'db1',
  'table-name' = 'users'
);

CREATE TABLE mysql_users2 (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.129.102',
  'port' = '3306',
  'username' = 'cdc',
  'password' = 'cdc',
  'server-time-zone' = 'Asia/Shanghai',
  'database-name' = 'cdc',
  'table-name' = 'users2'
);

CREATE TABLE hudi_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs://192.168.129.102:8020/hudi/hudi_users',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '3',
  'is_generic' = 'false'
);

SET execution.checkpointing.interval = 60s;

INSERT INTO hudi_users
SELECT * FROM mysql_users
UNION ALL
SELECT * FROM mysql_users2;
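Since hudi_users is declared with 'read.streaming.enabled' = 'true', the merged result can be inspected from the same SQL client session once the first checkpoint has committed data to Hudi:

SELECT * FROM hudi_users;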

References:

基于 Flink CDC 同步 MySQL 分库分表构建实时数据湖 — Flink CDC documentation

Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖 - 尚码园


