Flink CDC series:
"Flink CDC Series (1): What Is Flink CDC"
"Flink CDC Series (2): Building Flink CDC from Source"
"Flink CDC Series (3): A Demo of the Flink CDC MySQL Connector with Flink SQL"
"Flink CDC Series (4): Common Parameters of the Flink CDC MySQL Connector"
"Flink CDC Series (5): Startup Modes of the Flink CDC MySQL Connector"
"Flink CDC Series (6): Incremental Snapshot Reading in the Flink CDC MySQL Connector"
"Flink CDC Series (7): From MySQL to Elasticsearch"
Introduction
This article walks through an example of synchronizing MySQL data to Elasticsearch with Flink CDC and Flink SQL. The example covers INSERT, UPDATE, and DELETE operations.
System Environment and Software Versions
Ubuntu 20.04
JDK 1.8
Maven 3.6.3
Flink 1.13.6
Elasticsearch 7.16.2
Preparing MySQL Test Data
mysql> CREATE DATABASE mydb;
mysql> USE mydb;
mysql> CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
mysql> INSERT INTO products VALUES (default,"scooter1","Small 1-wheel scooter");
Query OK, 1 row affected (0.01 sec)

Installing Elasticsearch
1. Choosing and downloading the package
Official download page: https://
...work.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
8875 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b
9403 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
9727 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
ubuntu@ubuntu:/opt/flink-1.13.6$

Running the Demo
1. Start the Flink SQL Client
cd /opt/flink-1.13.6
bin/sql-client.sh

2. Run the DDL and queries in the Flink SQL Client
-- Create the mysql-cdc source table
Flink SQL> CREATE TABLE products (
  id INT,
  name STRING,
  description STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.64.6',
  'port' = '3306',
  'username' = 'test',
  'password' = 'test',
  'database-name' = 'mydb',
  'table-name' = 'products'
);
[INFO] Execute statement succeed.

Flink SQL> select * from products;
id  name      description
1   scooter1  Small 1-wheel scooter

-- Create the Elasticsearch sink table (the primary key makes it an upsert sink)
Flink SQL> CREATE TABLE products_es_sink (
  id INT,
  name STRING,
  description STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'products'
);
[INFO] Execute statement succeed.

-- Continuously copy the changes from the source table into the sink
Flink SQL> insert into products_es_sink select * from products;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b962baa7f6a8890cc45e43a7c95765d2

3. Check the data in the Elasticsearch index
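Before querying the index, it helps to know how documents are named. Because products_es_sink declares PRIMARY KEY (id), the Flink Elasticsearch connector runs in upsert mode. It builds each document _id from the primary-key columns in DDL order, joined by the 'document-id.key-delimiter' option ('_' by default). Without a primary key it runs in append mode and Elasticsearch generates random ids. This is why the _id values in the responses below match the MySQL id. A minimal Python sketch of that rule follows. document_id is a hypothetical helper, and rendering each value with str() is an assumption that may not match the connector for every column type.

```python
from typing import Any, Mapping, Sequence


def document_id(row: Mapping[str, Any], key_fields: Sequence[str], delimiter: str = "_") -> str:
    """Build an upsert-mode document _id from primary-key values in DDL order.

    Hypothetical helper: each value is rendered with str(), an assumption that
    may differ from the connector for types such as timestamps or booleans.
    """
    if not key_fields:
        # Append mode: no primary key, so Elasticsearch assigns random ids.
        raise ValueError("no primary key: the sink runs in append mode")
    return delimiter.join(str(row[field]) for field in key_fields)


# products_es_sink declares PRIMARY KEY (id), so the _id is just the id value.
row = {"id": 1, "name": "scooter1", "description": "Small 1-wheel scooter"}
print(document_id(row, ["id"]))  # 1

# A hypothetical composite key with 'document-id.key-delimiter' = '$'.
print(document_id({"shop": "a", "sku": 42}, ["shop", "sku"], delimiter="$"))  # a$42
```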
curl http://localhost:9200/products/_search?pretty
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "name" : "scooter1",
          "description" : "Small 1-wheel scooter"
        }
      }
    ]
  }
}

4. Insert a new row in the MySQL client
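The row inserted in this step travels through Flink as an INSERT (+I) change. As far as we can tell from the connector source, the upsert-mode Elasticsearch 7 sink handles the change kinds as follows:
- INSERT and UPDATE_AFTER rows become update requests that carry the full row both as the partial document and as the upsert document, so the document is created or overwritten.
- DELETE rows become delete requests.
- UPDATE_BEFORE rows are never requested from the planner.

The sketch below renders that behaviour as roughly equivalent _bulk NDJSON. It is an approximation that assumes a single-column key named id, not the connector's actual serialization code, and bulk_lines is a hypothetical name.

```python
import json
from typing import Any, Dict, List

Row = Dict[str, Any]


def bulk_lines(kind: str, row: Row, index: str = "products") -> List[str]:
    """Return _bulk NDJSON lines for one changelog row (approximation).

    Assumes a single-column primary key "id"; UPDATE_BEFORE ("-U") yields
    no request because an upsert sink does not consume it.
    """
    meta = {"_index": index, "_id": str(row["id"])}
    if kind in ("+I", "+U"):
        # Partial update with the same row as upsert: create or overwrite fields.
        return [json.dumps({"update": meta}), json.dumps({"doc": row, "upsert": row})]
    if kind == "-D":
        return [json.dumps({"delete": meta})]  # delete actions have no source line
    return []


# The INSERT in this step reaches Flink as a +I row.
row = {"id": 2, "name": "scooter2", "description": "Small 2-wheel scooter"}
body = "\n".join(bulk_lines("+I", row)) + "\n"  # a _bulk body must end with a newline
print(body, end="")
```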
mysql> INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");

5. Check the data in the Elasticsearch index
Run on the command line:
curl http://localhost:9200/products/_search?pretty
{
  "took" : 433,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "name" : "scooter1",
          "description" : "Small 1-wheel scooter"
        }
      },
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "scooter2",
          "description" : "Small 2-wheel scooter"
        }
      }
    ]
  }
}
-- The new row has been written to Elasticsearch

6. Update a row in the MySQL client
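An UPDATE reaches Flink differently from an INSERT. The mysql-cdc source emits it as two changelog rows:
- an UPDATE_BEFORE (-U) row carrying the old values;
- an UPDATE_AFTER (+U) row carrying the new values.

Because the sink is keyed by id, the +U row alone is enough to overwrite the document. The small Python model below builds that pair for the statement in this step. update_changes and upsert_view are hypothetical helpers, and the model assumes the UPDATE touches a single row.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]


def update_changes(before: Row, assignments: Row) -> List[Change]:
    """Model a single-row UPDATE as the -U/+U pair a CDC source emits."""
    after = {**before, **assignments}  # apply the SET col = value assignments
    return [("-U", before), ("+U", after)]


def upsert_view(changes: List[Change]) -> List[Change]:
    """Keep only what an upsert sink consumes: UPDATE_BEFORE rows are dropped."""
    return [(kind, row) for kind, row in changes if kind != "-U"]


before = {"id": 1, "name": "scooter1", "description": "Small 1-wheel scooter"}
# update products set name = 'scooter----1' where id = 1;
changes = update_changes(before, {"name": "scooter----1"})
for kind, row in upsert_view(changes):
    print(kind, row)
```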
mysql> update products set name = 'scooter----1' where id = 1;

7. Check the data in the Elasticsearch index
Run on the command line:
curl http://localhost:9200/products/_search?pretty
{
  "took" : 154,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "scooter2",
          "description" : "Small 2-wheel scooter"
        }
      },
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "name" : "scooter----1",
          "description" : "Small 1-wheel scooter"
        }
      }
    ]
  }
}
-- The row with id=1 has been updated in Elasticsearch

8. Delete a row in the MySQL client
mysql> delete from products where id = 1;

9. Check the data in the Elasticsearch index
Run on the command line:
curl http://localhost:9200/products/_search?pretty
{
  "took" : 347,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "products",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "scooter2",
          "description" : "Small 2-wheel scooter"
        }
      }
    ]
  }
}
-- The row with id=1 has been deleted from Elasticsearch

Summary
Flink CDC captures INSERT, UPDATE, and DELETE changes from the MySQL binlog, and a Flink SQL job applies them to the Elasticsearch index as inserts, updates, and deletes.
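The summary can be checked end to end. Replaying the demo's changes against a dictionary keyed by _id should reproduce the final index contents. A minimal Python sketch follows, with these assumptions:
- The change list is reconstructed by hand from the SQL statements in this demo, not captured from a running Flink job.
- FINAL_RESPONSE is a trimmed copy of the last curl output.
- replay and search_hits are hypothetical helpers.

```python
import json
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]

# The demo's changes in order, as the mysql-cdc source is assumed to emit them.
R1 = {"id": 1, "name": "scooter1", "description": "Small 1-wheel scooter"}
R1_NEW = {**R1, "name": "scooter----1"}
R2 = {"id": 2, "name": "scooter2", "description": "Small 2-wheel scooter"}
CHANGELOG: List[Tuple[str, Row]] = [
    ("+I", R1),      # initial snapshot read of the existing row
    ("+I", R2),      # INSERT of id=2
    ("-U", R1),      # UPDATE of id=1: before image
    ("+U", R1_NEW),  # UPDATE of id=1: after image
    ("-D", R1_NEW),  # DELETE of id=1
]

# Trimmed copy of the final _search response (only the hits are kept).
FINAL_RESPONSE = """
{"hits": {"hits": [
  {"_index": "products", "_type": "_doc", "_id": "2", "_score": 1.0,
   "_source": {"id": 2, "name": "scooter2", "description": "Small 2-wheel scooter"}}
]}}
"""


def replay(changelog: List[Tuple[str, Row]]) -> Dict[str, Row]:
    """Apply changes to a dict that stands in for the index, keyed by _id."""
    index: Dict[str, Row] = {}
    for kind, row in changelog:
        doc_id = str(row["id"])
        if kind in ("+I", "+U"):
            index[doc_id] = dict(row)  # upsert: create or overwrite
        elif kind == "-D":
            index.pop(doc_id, None)    # delete if present
        # "-U" is ignored, since an upsert sink does not consume it
    return index


def search_hits(body: str) -> Dict[str, Row]:
    """Map each hit's _id to its _source (a default _search returns at most 10 hits)."""
    return {hit["_id"]: hit["_source"] for hit in json.loads(body)["hits"]["hits"]}


expected = replay(CHANGELOG)
actual = search_hits(FINAL_RESPONSE)
print(expected == actual)  # True
```

For a larger table, the same comparison would need a bigger search size or a scroll, because of the 10-hit default.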