
Flink SQL CDC: Syncing Oracle Data to MySQL (by 雾岛与鲸)


Environment:
1. Flink 1.13.0
2. Oracle 11g
3. flink-connector-oracle-cdc 2.1.0

1. Oracle environment configuration

First, install an Oracle environment; for reference, see https://blog.csdn.net/qq_36039236/article/details/124224500?spm=1001.2014.3001.5502

Enter the container and configure it:

docker exec -it oracle11 bash
# Switch to the oracle user
su - oracle
# Create the directory Oracle will write to; it must exist in advance, otherwise Oracle reports that the directory does not exist
mkdir /home/oracle/oracle-data-test
sqlplus /nolog
# Connect to the database as DBA
SQL> conn /as sysdba

Next, apply the key configuration:

-- Enable log archiving
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
-- Check whether log archiving is enabled
archive log list;
-- Enable supplemental logging for the database so that captured data changes include the state of the rows before the change; the statement below configures this at the database level
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Create a tablespace
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
-- Create the user family bound to the tablespace LOGMINER_TBS
CREATE USER family IDENTIFIED BY zyhcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
-- Grant the family user the dba role
grant connect,resource,dba to family;
-- Grant the remaining privileges
GRANT CREATE SESSION TO family;
GRANT SELECT ON V_$DATABASE to family;
GRANT FLASHBACK ANY TABLE TO family;
GRANT SELECT ANY TABLE TO family;
GRANT SELECT_CATALOG_ROLE TO family;
GRANT EXECUTE_CATALOG_ROLE TO family;
GRANT SELECT ANY TRANSACTION TO family;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO family;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO family;
GRANT CREATE TABLE TO family;
GRANT LOCK ANY TABLE TO family;
GRANT ALTER ANY TABLE TO family;
GRANT CREATE SEQUENCE TO family;
GRANT EXECUTE ON DBMS_LOGMNR TO family;
GRANT EXECUTE ON DBMS_LOGMNR_D TO family;
GRANT SELECT ON V_$LOG TO family;
GRANT SELECT ON V_$LOG_HISTORY TO family;
GRANT SELECT ON V_$LOGMNR_LOGS TO family;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO family;
GRANT SELECT ON V_$LOGFILE TO family;
GRANT SELECT ON V_$ARCHIVED_LOG TO family;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO family;
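
Before moving on, two quick checks help confirm the configuration took effect (an added verification step, not from the original write-up; both statements are standard Oracle):

-- Should report "Database log mode: Archive Mode"
archive log list;
-- Returns YES or IMPLICIT once database-level supplemental logging is enabled
SELECT supplemental_log_data_min FROM v$database;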

Connect to Oracle locally with Navicat:

-- Create the STUDENT_INFO table
create table student_info (
    sno number(10) constraint pk_sno primary key,
    sname varchar2(10),
    sex varchar2(2)
);
-- Enable supplemental logging on STUDENT_INFO so that incremental changes are captured; run this only after the table has been created in Oracle
ALTER TABLE FAMILY.STUDENT_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
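
The update and delete statements used later in the test target rows with sno 25 and 26, so seed a couple of rows first (the names and sexes here are made-up sample values, not from the original article):

insert into student_info (sno, sname, sex) values (25, 'stu25', 'f');
insert into student_info (sno, sname, sex) values (26, 'stu26', 'f');
commit;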

2. Flink CDC program development

Maven dependencies:

<properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <flink.version>1.13.0</flink.version>
    <scala.version>2.12</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-oracle-cdc</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc10</artifactId>
        <version>19.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <!-- Flink logs through slf4j, which is only a logging facade; here log4j is used as the concrete implementation -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
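
Since the slf4j-log4j12 binding is used, a log4j.properties file must sit on the classpath (e.g. src/main/resources), otherwise log4j warns that no appender is configured. A minimal sketch, added here for completeness rather than taken from the original setup:

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n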

Application code:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Test: Flink CDC capturing Oracle data changes in real time
 * @author zyh
 */
public class FlinkCdcOracleExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.disableOperatorChaining();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("CREATE TABLE student_info (\n" +
                "   SNO INT NOT NULL,\n" +          // note: field names must be uppercase
                "   SNAME STRING,\n" +
                "   SEX STRING,\n" +
                "   PRIMARY KEY(SNO) NOT ENFORCED\n" +
                " ) WITH (\n" +
                "   'connector' = 'oracle-cdc',\n" +
                "   'hostname' = 'localhost',\n" +
                "   'port' = '1521',\n" +
                "   'username' = 'family',\n" +
                "   'password' = 'zyhcdc',\n" +
                "   'database-name' = 'helowin',\n" +
                "   'schema-name' = 'FAMILY',\n" +   // note: the schema name must be uppercase
                "   'table-name' = 'STUDENT_INFO',\n" +
                "   'debezium.log.mining.continuous.mine' = 'true',\n" +
                "   'debezium.log.mining.strategy' = 'online_catalog',\n" +
                "   'debezium.database.tablename.case.insensitive' = 'false',\n" +
                "   'scan.startup.mode' = 'initial')");

        TableResult tableResult = tableEnv.executeSql("select * from student_info");
        tableResult.print();
        env.execute();
    }
}
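
To run the job on a cluster rather than from the IDE, build the fat jar produced by the assembly plugin above and submit it with the Flink CLI (a hedged example; the artifact name is a placeholder for whatever your pom produces):

mvn clean package
flink run -c FlinkCdcOracleExample target/<your-artifact>-jar-with-dependencies.jar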

Start and test: because 'scan.startup.mode' = 'initial' is set, the program first takes a snapshot of the rows already in the table. Now run the following statements against the table:

-- Insert a row
insert into student_info (sno, sname, sex) values (28, 'zyh-test', 'm');
-- Update a row
update student_info t set t.sname='zyh666', t.sex='m' where t.sno=26;
-- Delete a row
delete from student_info where sno = 25;
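
For orientation, here is an illustrative sketch of what TableResult.print() renders for these statements (not output captured from a real run; the old SNAME/SEX values depend on the seeded rows). In Flink 1.13 the op column carries the row kind: +I for an insert, -U/+U for the before and after images of an update, and -D for a delete.

| op |   SNO |      SNAME | SEX |
| +I |    28 |   zyh-test |   m |
| -U |    26 | <old name> | <old sex> |
| +U |    26 |     zyh666 |   m |
| -D |    25 | <old name> | <old sex> |

The -U and -D rows carry the previous row images, which is what makes this a retract stream.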

The program output reflects each of these changes, so the oracle-cdc functionality has been verified. To land the results in MySQL, you can use Flink SQL to write into a MySQL sink table; since the query result above is a retract stream, the sink table must declare a primary key in its Flink SQL definition. The complete MySQL sink code is left for readers to implement themselves; a minimal sketch is given below.
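
A minimal sketch of such a sink, assuming the flink-connector-jdbc_${scala.version} artifact (plus the MySQL JDBC driver) has been added to the pom; the URL, credentials, and table name student_info_sink are placeholders of this sketch, not values from the setup above:

// Assumed classpath additions: flink-connector-jdbc_2.12 (same Flink version)
// and mysql-connector-java. Database, table, and credentials are placeholders.
tableEnv.executeSql("CREATE TABLE student_info_sink (\n" +
        "   SNO INT NOT NULL,\n" +
        "   SNAME STRING,\n" +
        "   SEX STRING,\n" +
        // The primary key is required: it switches the JDBC sink to upsert mode,
        // so the retract stream's updates and deletes are applied instead of appended.
        "   PRIMARY KEY(SNO) NOT ENFORCED\n" +
        " ) WITH (\n" +
        "   'connector' = 'jdbc',\n" +
        "   'url' = 'jdbc:mysql://localhost:3306/test',\n" +
        "   'username' = 'root',\n" +
        "   'password' = 'root',\n" +
        "   'table-name' = 'student_info_sink')");
// Continuously pipe the CDC changelog into MySQL.
tableEnv.executeSql("INSERT INTO student_info_sink SELECT * FROM student_info");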

3. Problems encountered along the way, and how they were resolved

1. Reads lagged behind changes in Oracle; configuring the following two options in the CREATE statement resolves it:

'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'

2. Table not found

[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table FAMILY.STUDENT_INFO
Use command: ALTER TABLE LIUYUN.flink ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

See the documentation: https://docs.oracle.com/cd/E11882_01/server.112/e41084/sql_elements008.htm. You can add the following to the CREATE statement:

'debezium.database.tablename.case.insensitive' = 'false'

References

Flink Oracle CDC official documentation

In addition, Oracle data can also be captured with Kafka Connect combined with Debezium. That approach, however, collects the raw Oracle change-log records, so developers must parse the log contents themselves, which raises development difficulty and maintenance cost. The concrete steps for that approach will be given in the next article; if you're interested, consider following along~


