irpas技术客

springboot+debezium捕获数据库变更(mysql、sql-server、mongodb、oracle.........)_保护我方胖虎_debe

大大的周 2383

文章目录 一、什么是DebeZium二、数据库开启数据变更支持(1)MySQL开启binlog日志(2)SQL SERVER开启CDC 三、SpringBoot+debezium(1)依赖(2)配置yml配置代码读取配置 (3)逻辑代码数据处理变更数据装载类 四、测试附上源码:

一、什么是DebeZium

Debezium 是一组分布式服务,用于捕获数据库中的更改,以便您的应用程序可以看到这些更改并对其做出响应。Debezium 将每个数据库表中的所有行级更改记录在一个更改事件流中,应用程序只需读取这些流以查看更改事件发生的顺序。

Debezium 的目标是建立一个连接器库,从各种数据库管理系统捕获更改并生成具有非常相似结构的事件,使您的应用程序更容易使用和响应事件,而不管更改源自何处。

目前有以下连接器:

MongoDBMySQLPostgreSQLSQL ServerOracleDb2CassandraVitess 二、数据库开启数据变更支持 (1)MySQL开启binlog日志

MySQL 有一个二进制日志(binlog),它按照提交到数据库的顺序记录所有操作。这包括对表模式的更改以及对表中数据的更改。MySQL 使用 binlog 进行复制和恢复。

该Debezium的MySQL连接器读取二进制日志,产生变化的事件行级INSERT,UPDATE和DELETE运营,并发出更改事件卡夫卡的话题。客户端应用程序读取这些 Kafka 主题。

由于 MySQL 通常设置为在指定的时间段后清除 binlog,因此 MySQL 连接器会为您的每个数据库执行初始一致的快照。MySQL 连接器从创建快照的点读取二进制日志。

软件安装的mysql

在my.conf文件中的 [mysqld] 下添加以下三行内容

log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 读行 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

docker 安装的Mysql

# 运行mysql容器 docker run --name mysql-service -v /var/lib/mysql/data:/var/lib/mysql -v /var/lib/mysql/conf:/etc/mysql/conf.d -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=leiMysql.. -d mysql:5.7.30 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --default-time_zone="+8:00" # 设置binlog位置 docker exec mysql-service bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf" # 配置 mysql的server-id docker exec mysql-service bash -c "echo 'server-id=123' >> /etc/mysql/mysql.conf.d/mysqld.cnf" (2)SQL SERVER开启CDC

数据库版本要求: Debezium SQL Server 连接器基于SQL Server 2016 Service Pack 1 (SP1) 和更高版本的标准版或企业版中 提供的更改数据捕获功能。SQL Server 捕获进程监视指定的数据库和表,并将更改存储到专门创建的具有存储过程外观的更改表中。

捕获变更要求: 要启用 Debezium SQL Server 连接器来捕获数据库操作的更改事件记录,您必须首先在 SQL Server 数据库上启用更改数据捕获。必须在数据库和要捕获的每个表上启用 CDC。当您设置CDC在源数据库上,该连接器可以捕捉行级INSERT,UPDATE以及DELETE发生在数据库操作

软件安装SQL-SERVER

linux 环境 安装SQL-SERVER2017

Docker 安装SQL SERVER

docker pull microsoft/mssql-server-linux:2017-latest # 安装 # 说明 #-e ACCEPT_EULA = Y 设置ACCEPT_EULA变量为任何值,以确认你接受最终用户许可协议。 SQL Server 映像的必需设置。 #-e MSSQL_SA_PASSWORD =<YourStrong !Passw0rd> 指定你自己的强密码至少 8 个字符并达到SQL Server 密码要求。 SQL Server 映像的必需设置。 docker run -d -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=123456aA!' -p 1433:1433 --name sqlserver2017 -v /var/lib/mssql_data:/opt/mssql_data microsoft/mssql-server-linux:2017-latest # 进入内部 开启代理 docker exec -it sqlserver2017 "bash" # 内部执行以下命令 /opt/mssql/bin/mssql-conf set sqlagent.enabled true # 退出容器 exit # 重启容器 docker restart sqlserver2017 #------以下操作可直接在Navicat中连接到数据库后执行 # 查看指定SQL SERVER数据库是否开启CDC功能 SELECT is_cdc_enabled,CASE WHEN is_cdc_enabled=0 THEN 'CDC功能禁用' ELSE 'CDC功能启用' END 描述 FROM sys.databases WHERE NAME = 'GPS' 开启CDC功能 # 使用指定库 use gps # 执行命令 EXECUTE sys.sp_cdc_enable_db #查看当前已经开启CDC的数据表 SELECT name,is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1; 开启表CDC 示例: 对'dbo.vehicle'表开启变更捕获 EXEC sys.sp_cdc_enable_table @source_schema= 'dbo', --源表架构 @source_name = 'vehicle', --源表 @role_name = 'CDC_Role' --角色(将自动创建) GO --如果不想控制访问角色,则@role_name必须显式设置为null。 三、SpringBoot+debezium (1)依赖

这里依赖 Log日志存在冲突 但不影响启动,请自行排除

<properties> <java.version>1.8</java.version> <debezium.version>1.5.2.Final</debezium.version> </properties> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-api</artifactId> <version>${debezium.version}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-embedded</artifactId> <version>${debezium.version}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-sqlserver</artifactId> <version>${debezium.version}</version> </dependency> <!-- <dependency>--> <!-- <groupId>io.debezium</groupId>--> <!-- <artifactId>debezium-connector-mysql</artifactId>--> <!-- <version>${debezium.version}</version>--> <!-- </dependency>--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-core</artifactId> <version>5.4.1</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency> (2)配置 yml配置 timely: # 是否开启 switch: true # 偏移量文件 offset-file-name: E://lei/offsets.dat # 是否启东时清除偏移量文件 offset-file-clean: true # 偏移量提交时间 单位ms offset-time: 1 # 读取历史记录文件 history-file-name: E://lei/dbhistory.dat # 读取的数据库信息 offline: ip: 10.50.100.68 port: 1433 username: sa password: 8AL16x76NP3R # 保证每个数据库读取的 instance-name logic-name 不能相同 # 实例名 instance-name: sc-mssql-connector # 逻辑名 logic-name: sc-mssql-customer # 读取的表 include-table: dbo.PfSObjDeviceBind # 读取的库 include-db: GPS 代码读取配置 package com.leilei; import cn.hutool.core.io.FileUtil; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.debezium.connector.sqlserver.SqlServerConnector; import io.debezium.embedded.Connect; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.RecordChangeEvent; import io.debezium.engine.format.ChangeEventFormat; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.kafka.connect.source.SourceRecord; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.SmartLifecycle; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.Assert; import java.util.concurrent.*; /** * @author lei * @create 2021-06-22 15:36 * @desc sql server 实时同步 **/ @Configuration @Log4j2 public class ChangeEventConfig { private final ChangeEventHandler changeEventHandler; @Value("${timely.offset-file-name}") private String offsetFileName; @Value("${timely.offset-file-clean:true}") private Boolean offsetFileDelete; @Value("${timely.offset-time}") private String offsetTime; @Value("${timely.history-file-name}") private String historyFileName; @Value("${timely.offline.instance-name}") private String instanceName; @Value("${timely.offline.logic-name}") private String logicName; @Value("${timely.offline.ip}") private String ip; @Value("${timely.offline.port}") private String port; @Value("${timely.offline.username}") private String username; @Value("${timely.offline.password}") private String password; @Value("${timely.offline.include-table}") private String includeTable; @Value("${timely.offline.include-db}") private String includeDb; @Autowired public ChangeEventConfig(ChangeEventHandler changeEventHandler) { this.changeEventHandler = changeEventHandler; if (offsetFileDelete && FileUtil.exist(offsetFileName)) { FileUtil.del(offsetFileName); } } /** * Debezium 配置. * * @return configuration */ // @Bean // io.debezium.config.Configuration debeziumConfig() { // return io.debezium.config.Configuration.create() 连接器的Java类名称 // .with("connector.class", MySqlConnector.class.getName()) 偏移量持久化,用来容错 默认值 // .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") 偏移量持久化文件路径 默认/tmp/offsets.dat 如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更 如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。 // .with("offset.storage.file.filename", "E://spring-boot-debezium/tmp/offsets.dat") 捕获偏移量的周期 // .with("offset.flush.interval.ms", "6000") 连接器的唯一名称 // .with("name", "mysql-connector") 数据库的hostname // .with("database.hostname", "xx") 端口 // .with("database.port", "3306") 用户名 // .with("database.user", "root") 密码 // .with("database.password", "xx..") 包含的数据库列表 // .with("database.include.list", "etl") 是否包含数据库表结构层面的变更,建议使用默认值true // .with("include.schema.changes", "false") mysql.cnf 配置的 server-id // .with("database.server.id", "123") MySQL 服务器或集群的逻辑名称 // .with("database.server.name", "customer-mysql-db-server") 历史变更记录 // .with("database.history", "io.debezium.relational.history.FileDatabaseHistory") 历史变更记录存储位置,存储DDL // .with("database.history.file.filename", "E://spring-boot-debezium/tmp/dbhistory.dat") // .build(); // } @Bean io.debezium.config.Configuration debeziumConfig() { return io.debezium.config.Configuration.create() // 连接器的Java类名称 .with("connector.class", SqlServerConnector.class.getName()) // 偏移量持久化,用来容错 默认值 .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") // 要存储偏移量的文件路径 默认/tmp/offsets.dat 如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更 // 如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。 .with("offset.storage.file.filename", offsetFileName) // 尝试提交偏移量的时间间隔。默认值为 1分钟 .with("offset.flush.interval.ms", offsetTime) // 监听连接器实例的 唯一名称 .with("name", instanceName) // SQL Server 实例的地址 .with("database.hostname", ip) // SQL Server 实例的端口号 .with("database.port", port) // SQL Server 用户的名称 .with("database.user", username) // SQL Server 用户的密码 .with("database.password", password) // 要从中捕获更改的数据库的名称 .with("database.dbname", includeDb) // 是否包含数据库表结构层面的变更 默认值true .with("include.schema.changes", "false") // Debezium 应捕获其更改的所有表的列表 .with("table.include.list", includeTable) // SQL Server 实例/集群的逻辑名称,形成命名空间,用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 架构名称以及 Avro 转换器时对应的 Avro 架构的命名空间用来 .with("database.server.name", logicName) // 负责数据库历史变更记录持久化Java 类的名称 .with("database.history", "io.debezium.relational.history.FileDatabaseHistory") // 历史变更记录存储位置,存储DDL .with("database.history.file.filename", historyFileName) .build(); } /** * 实例化sql server 实时同步服务类,执行任务 * * @param configuration * @return */ @Bean SqlServerTimelyExecutor sqlServerTimelyExecutor(io.debezium.config.Configuration configuration) { SqlServerTimelyExecutor sqlServerTimelyExecutor = new SqlServerTimelyExecutor(); DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine .create(ChangeEventFormat.of(Connect.class)) .using(configuration.asProperties()) .notifying(changeEventHandler::handlePayload) .build(); sqlServerTimelyExecutor.setDebeziumEngine(debeziumEngine); return sqlServerTimelyExecutor; } /** * @author lei * @version 1.0 * @date 2021-06-22 15:39 * @desc 同步执行服务类 */ @Data @Log4j2 public static class SqlServerTimelyExecutor implements InitializingBean, SmartLifecycle { private final ExecutorService executor = ThreadPoolEnum.INSTANCE.getInstance(); private DebeziumEngine<?> debeziumEngine; @Override public void start() { log.warn(ThreadPoolEnum.SQL_SERVER_LISTENER_POOL + "线程池开始执行 debeziumEngine 实时监听任务!"); executor.execute(debeziumEngine); } @SneakyThrows @Override public void stop() { log.warn("debeziumEngine 监听实例关闭!"); debeziumEngine.close(); Thread.sleep(2000); log.warn(ThreadPoolEnum.SQL_SERVER_LISTENER_POOL + "线程池关闭!"); executor.shutdown(); } @Override public boolean isRunning() { return false; } @Override public void afterPropertiesSet() { Assert.notNull(debeziumEngine, "DebeZiumEngine 不能为空!"); } public enum ThreadPoolEnum { /** * 实例 */ INSTANCE; public static final String SQL_SERVER_LISTENER_POOL = "sql-server-listener-pool"; /** * 线程池单例 */ private final ExecutorService es; /** * 枚举 (构造器默认为私有) */ ThreadPoolEnum() { final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(SQL_SERVER_LISTENER_POOL + "-%d").build(); es = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(256), threadFactory, new ThreadPoolExecutor.DiscardPolicy()); } /** * 公有方法 * * @return ExecutorService */ public ExecutorService getInstance() { return es; } } } } (3)逻辑代码 数据处理 package cn.felord.debezium.config; import cn.hutool.core.bean.BeanUtil; import com.alibaba.fastjson.JSON; import io.debezium.data.Envelope; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.RecordChangeEvent; import lombok.Getter; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; import static io.debezium.data.Envelope.FieldName.*; import static java.util.stream.Collectors.toMap; /** * @author lei * @create 2021-06-22 16:11 * @desc 变更数据处理 **/ @Service @Log4j2 @ConditionalOnProperty(name = "timely.switch", havingValue = "true") public class ChangeEventHandler { public static final String DATA = "data"; public static final String BEFORE_DATA = "beforeData"; public static final String EVENT_TYPE = "eventType"; public static final String SOURCE = "source"; public static final String TABLE = "table"; private enum FilterJsonFieldEnum { /** * 表 */ table, /** * 库 */ db, /** * 操作时间 */ ts_ms, ; public static Boolean filterJsonField(String fieldName) { return Stream.of(values()).map(Enum::name).collect(Collectors.toSet()).contains(fieldName); } } /** * @author lei * @create 2021-06-24 16:04 * @desc 变更类型枚举 **/ public enum EventTypeEnum { /** * 增 */ CREATE(1), /** * 删 */ UPDATE(2), /** * 改 */ DELETE(3), ; @Getter private final int type; EventTypeEnum(int type) { this.type = type; } } public void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) { for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) { SourceRecord sourceRecord = r.record(); Struct sourceRecordChangeValue = (Struct) sourceRecord.value(); if (sourceRecordChangeValue == null) { continue; } // 获取变更表数据 Map<String, Object> changeMap = getChangeTableInfo(sourceRecordChangeValue); if (CollectionUtils.isEmpty(changeMap)) { continue; } ChangeListenerModel changeListenerModel = getChangeDataInfo(sourceRecordChangeValue, changeMap); if (changeListenerModel == null) { continue; } String jsonString = JSON.toJSONString(changeListenerModel); log.info("发送变更数据:{}", jsonString); } try { recordCommitter.markBatchFinished(); } catch (InterruptedException e) { e.printStackTrace(); } } private ChangeListenerModel getChangeDataInfo(Struct sourceRecordChangeValue, Map<String, Object> changeMap) { // 操作类型过滤,只处理增删改 Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION)); if (operation != Envelope.Operation.READ) { Integer eventType = null; Map<String, Object> result = new HashMap<>(4); if (operation == Envelope.Operation.CREATE) { eventType = EventTypeEnum.CREATE.getType(); result.put(DATA, getChangeData(sourceRecordChangeValue, AFTER)); result.put(BEFORE_DATA, null); } // 修改需要特殊处理,拿到前后的数据 if (operation == Envelope.Operation.UPDATE) { if (!changeMap.containsKey(TABLE)) { return null; } eventType = EventTypeEnum.UPDATE.getType(); String currentTableName = String.valueOf(changeMap.get(TABLE).toString()); // 忽略非重要属性变更 Map<String, String> resultMap = filterChangeData(sourceRecordChangeValue, currentTableName); if (CollectionUtils.isEmpty(resultMap)) { return null; } result.put(DATA, resultMap.get(AFTER)); result.put(BEFORE_DATA, resultMap.get(BEFORE)); } if (operation == Envelope.Operation.DELETE) { eventType = EventTypeEnum.DELETE.getType(); result.put(DATA, getChangeData(sourceRecordChangeValue, BEFORE)); result.put(BEFORE_DATA, getChangeData(sourceRecordChangeValue, BEFORE)); } result.put(EVENT_TYPE, eventType); result.putAll(changeMap); return BeanUtil.copyProperties(result, ChangeListenerModel.class); } return null; } /** * 过滤非重要变更数据 * * @param sourceRecordChangeValue * @param currentTableName * @return */ private Map<String, String> filterChangeData(Struct sourceRecordChangeValue, String currentTableName) { Map<String, String> resultMap = new HashMap<>(4); Map<String, Object> afterMap = getChangeDataMap(sourceRecordChangeValue, AFTER); Map<String, Object> beforeMap = getChangeDataMap(sourceRecordChangeValue, BEFORE); //todo 根据表过滤字段 resultMap.put(AFTER, JSON.toJSONString(afterMap)); resultMap.put(BEFORE, JSON.toJSONString(beforeMap)); return resultMap; } /** * 校验是否仅仅是非重要字段属性变更 * @param currentTableName * @param afterMap * @param beforeMap * @param filterColumnList * @return */ private boolean checkNonEssentialData(String currentTableName, Map<String, Object> afterMap, Map<String, Object> beforeMap, List<String> filterColumnList) { Map<String, Boolean> filterMap = new HashMap<>(16); for (String key : afterMap.keySet()) { Object afterValue = afterMap.get(key); Object beforeValue = beforeMap.get(key); filterMap.put(key, !Objects.equals(beforeValue, afterValue)); } filterColumnList.parallelStream().forEach(filterMap::remove); if (filterMap.values().stream().noneMatch(x -> x)) { log.info("表:{}无核心资料变更,忽略此次操作!", currentTableName); return true; } return false; } public String getChangeData(Struct sourceRecordChangeValue, String record) { Map<String, Object> changeDataMap = getChangeDataMap(sourceRecordChangeValue, record); if (CollectionUtils.isEmpty(changeDataMap)) { return null; } return JSON.toJSONString(changeDataMap); } public Map<String, Object> getChangeDataMap(Struct sourceRecordChangeValue, String record) { Struct struct = (Struct) sourceRecordChangeValue.get(record); // 将变更的行封装为Map Map<String, Object> changeData = struct.schema().fields().stream() .map(Field::name) .filter(fieldName -> struct.get(fieldName) != null) .map(fieldName -> Pair.of(fieldName, struct.get(fieldName))) .collect(toMap(Pair::getKey, Pair::getValue)); if (CollectionUtils.isEmpty(changeData)) { return null; } return changeData; } private Map<String, Object> getChangeTableInfo(Struct sourceRecordChangeValue) { Struct struct = (Struct) sourceRecordChangeValue.get(SOURCE); Map<String, Object> map = struct.schema().fields().stream() .map(Field::name) .filter(fieldName -> struct.get(fieldName) != null && FilterJsonFieldEnum.filterJsonField(fieldName)) .map(fieldName -> Pair.of(fieldName, struct.get(fieldName))) .collect(toMap(Pair::getKey, Pair::getValue)); if (map.containsKey(FilterJsonFieldEnum.ts_ms.name())) { map.put("changeTime", map.get(FilterJsonFieldEnum.ts_ms.name())); map.remove(FilterJsonFieldEnum.ts_ms.name()); } return map; } } 变更数据装载类 package com.leilei; import lombok.Data; /** * @author lei * @create 2021-06-23 09:58 * @desc sqlServer数据变更model **/ @Data public class ChangeListenerModel { /** * 当前DB */ private String db; /** * 当前表 */ private String table; /** * 操作类型 1add 2update 3 delete */ private Integer eventType; /** * 操作时间 */ private Long changeTime; /** * 现数据 */ private String data; /** * 之前数据 */ private String beforeData; } 四、测试

由于Mysql binlog机制的存在,监听特别简单,我这里就用SQL-SERVER 来模拟,因为 SQL-SERVER 开启CDC 较为复杂一点

查看当前已经开启CDC的数据表

SELECT name,is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1;

为dbo.vehicle表填充数据

CDC操作属性介绍:

变化数据经过我逻辑代码处理,已经成功的变化为了 变更对象装载类JSON字符串了,我们根据里边内容可进行我们自己的业务逻辑代码编写

测试完毕,各位老爷根据自己逻辑修改修改配置连接,即可快速放心食用。

附上源码:

springboot-debezium


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #debezium #springboot