irpas技术客

Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo_白月蓝山

irpas 1031

Flink CDC 系列文章: 《Flink CDC 系列(1)—— 什么是 Flink CDC》 《Flink CDC 系列(2)—— Flink CDC 源码编译》 《Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo》 《Flink CDC 系列(4)—— Flink CDC MySQL Connector 常用参数表》 《Flink CDC 系列(5)—— Flink CDC MySQL Connector 启动模式》 《Flink CDC 系列(6)—— Flink CDC MySQL Connector 工作机制之 Incremental Snapshot Reading》 《Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch》

文章目录 系统环境MySQL 测试数据准备Flink CDC 源码编译Flink 集群准备演示开始1. 启动 Flink SQL Client2. 在 Flink SQL Client 中执行 DDL 和 查询3. 在 MySQL 客户端继续插入数据4. Flink SQL Client 观察数据变化5. 在 MySQL 客户端更新数据6. Flink SQL Client 查看数据是否有数据更新6. 在 MySQL 客户端删除数据7. 在 Flink SQL Client 客户端查看数据是否被删除

系统环境

Ubuntu 20.04 JDK 1.8 Maven 3.6.3

MySQL 测试数据准备 mysql> CREATE DATABASE mydb; mysql> USE mydb; mysql> CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512) ); mysql> ALTER TABLE products AUTO_INCREMENT = 101; mysql> INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), (default,"car battery","12V car battery"), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), (default,"hammer","12oz carpenter's hammer"), (default,"hammer","14oz carpenter's hammer"), (default,"hammer","16oz carpenter's hammer"), (default,"rocks","box of assorted rocks"), (default,"jacket","water resistent black wind breaker"), (default,"spare tire","24 inch spare tire"); Flink CDC 源码编译

参考文章《Flink CDC 系列(2)—— Flink CDC 源码编译》 编译产生的 Jar 文件在后面的 Flink 集群准备 需要用到。

Flink 集群准备 ## 下载 flink 1.13.6 的二进制安装包 axel -n 20 https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz ## 解压 tar xvf flink-1.13.6-bin-scala_2.11.tgz ## 将 flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar 拷贝到 flink lib 目录下,该文件由 Flink CDC 源码编译得到 cp /opt/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /opt/flink-1.13.6/lib ## 启动单机集群 cd flink-1.13.6 bin/start-cluster.sh ## 查看 jobmanager 和 taskmanager 的进程是否存活 jps -m ## 正常情况会出现两个进程,如下: $ jps -m 67440 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b 68054 Jps -m 67705 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=134217730b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=134217730b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=536870920b -D taskmanager.memory.task.heap.size=402653174b -D taskmanager.numberOfTaskSlots=1 -D taskmanager.memory.jvm-overhead.max=201326592b 演示开始

建议启动两个命令行窗口,一个运行 Flink SQL Client , 另一个运行 MySQL Client。

1. 启动 Flink SQL Client cd /opt/flink-1.13.6 bin/sql-client.sh 2. 在 Flink SQL Client 中执行 DDL 和 查询 Flink SQL> SET execution.checkpointing.interval = 3s Flink SQL> CREATE TABLE products ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.64.6', 'port' = '3306', 'username' = 'test', 'password' = 'test', 'database-name' = 'mydb', 'table-name' = 'products' ); Flink SQL> select count(1) from products; -- 结果为9 -- 不退出,继续下一步 3. 在 MySQL 客户端继续插入数据 mysql> INSERT INTO products VALUES (default,"scooter1","Small 2-wheel scooter"); INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter"); INSERT INTO products VALUES (default,"scooter3","Small 2-wheel scooter"); INSERT INTO products VALUES (default,"scooter4","Small 2-wheel scooter"); 4. Flink SQL Client 观察数据变化

观察 Flink SQL Client 窗口的数值变化,此时数值应为 13。

5. 在 MySQL 客户端更新数据 mysql> update products set name = 'scooter0001' where id = 101; 6. Flink SQL Client 查看数据是否有数据更新 Flink SQL> select * from products;

可以看到 id=101 的数据已经更新了。

6. 在 MySQL 客户端删除数据 mysql> delete from products where id = 101; 7. 在 Flink SQL Client 客户端查看数据是否被删除 Flink SQL> select * from products;

可以看到 id=101 的数据已经被删除了。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #CDC #系列3 #MySQL #Connector #