irpas技术客

基于Flink+Druid的实时数仓开发(一)_weixin_46044822_实时数仓开发

网络投稿 7735

@基于Flink+Druid的实时数仓开发

Canal部署 简介 基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务,基于日志增量订阅和消费的业务包括 数据库镜像数据库实时备份索引构建和实时维护(拆分异构索引、倒排索引等)业务 cache 刷新带业务逻辑的增量数据处理 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.xgithub地址:https://github.com/alibaba/canal 环境部署 MySQL MySQL需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/etc/my.cnf 中配置如下 `[mysqld]` `log-bin=mysql-bin # 开启 binlog` `binlog-format=ROW # 选择 ROW 模式` `server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复` Canal安装

环境要求:

安装好ZooKeeper 解压缩 mkdir /export/servers/canal tar -zxvf canal.deployer-1.1.15.tar.gz -C /usr/local/canal/

解压完成后,进入 /usr/local/canal/ 目录,可以看到如下结构

bin conf lib logs plugin

canal server的conf下有几个配置文件

[root@localhost canal]# tree conf/ conf/ ├── canal.properties ├── example │ └── instance.properties ├── logback.xml └── spring ├── default-instance.xml ├── file-instance.xml ├── group-instance.xml ├── local-instance.xml └── memory-instance.xml

canal.properties的common属性前四个配置项:

canal.id= 1 canal.ip= canal.port= 11111 canal.zkServers=

canal.id是canal的编号,在集群环境下,不同canal的id不同,要和mysql的server_id不同。

ip这里不指定,默认为本机,端口号是11111。zookeeper用于canal cluster。

canal.properties下destinations相关的配置:

################################################# ######### destinations ############# ################################################# canal.destinations = example canal.conf.dir = ../conf canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.spring.xml = classpath:spring/file-instance.xml

这里的canal.destinations = example可以设置多个,比如example1,example2, 则需要创建对应的两个文件夹,并且每个文件夹下都有一个instance.properties文件。

修改instance 配置文件

`vi conf/example/instance.properties` ## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样 canal.instance.mysql.slaveId = 1234 # 修改数据库信息 ################################################# ... canal.instance.master.address=172.16.170.***:3306 # username/password,数据库的用户名和密码 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal #################################################

启动

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log 2022-05-04 16:06:19.006 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful.... 2022-05-04 16:06:19.084 [destination = example , address = /172.16.170.***:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position 2022-05-04 16:06:24.114 [destination = example , address = /172.16.170.***:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position {"identity":{"slaveId":-1,"sourceAddress":{"address":"172.16.170.***","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":12814,"serverId":1,"timestamp":1651581041000}} 2022-05-04 16:06:24.437 [destination = example , address = /172.16.170.***:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=12814,serverId=1,gtid=,timestamp=1651581041000] cost : 5345ms , the next step is binlog dump Canal客户端开发 创建client_demo项目 Maven依赖 <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.24</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency> </dependencies> 开发步骤 创建Connector连接Cannal服务器,并订阅解析Canal消息,并打印

参考代码:

public class CanalClientEntrance { public static void main(String[] args) { // 1. 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "canal", "canal"); // 指定一次性获取数据的条数 int batchSize = 5 * 1024; boolean running = true; try { while(running) { // 2. 建立连接 connector.connect(); // 回滚上次的get请求,重新获取数据 connector.rollback(); // 订阅匹配日志 connector.subscribe("itcast_shop.*"); while(running) { // 批量拉取binlog日志,一次性获取多条数据 Message message = connector.getWithoutAck(batchSize); // 获取batchId long batchId = message.getId(); // 获取binlog数据的条数 int size = message.getEntries().size(); if(batchId == -1 || size == 0) { } else { printSummary(message); } // 确认指定的batchId已经消费成功 connector.ack(batchId); } } } finally { // 断开连接 connector.disconnect(); } } private static void printSummary(Message message) { // 遍历整个batch中的每个binlog实体 for (CanalEntry.Entry entry : message.getEntries()) { // 事务开始 if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } // 获取binlog文件名 String logfileName = entry.getHeader().getLogfileName(); // 获取logfile的偏移量 long logfileOffset = entry.getHeader().getLogfileOffset(); // 获取sql语句执行时间戳 long executeTime = entry.getHeader().getExecuteTime(); // 获取数据库名 String schemaName = entry.getHeader().getSchemaName(); // 获取表名 String tableName = entry.getHeader().getTableName(); // 获取事件类型 insert/update/delete String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase(); System.out.println("logfileName" + ":" + logfileName); System.out.println("logfileOffset" + ":" + logfileOffset); System.out.println("executeTime" + ":" + executeTime); System.out.println("schemaName" + ":" + schemaName); System.out.println("tableName" + ":" + tableName); System.out.println("eventTypeName" + ":" + eventTypeName); CanalEntry.RowChange rowChange = null; try { // 获取存储数据,并将二进制字节数据解析为RowChange实体 rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } // 迭代每一条变更数据 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { // 判断是否为删除事件 if(entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) { System.out.println("---delete---"); printColumnList(rowData.getBeforeColumnsList()); System.out.println("---"); } // 判断是否为更新事件 else if(entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) { System.out.println("---update---"); printColumnList(rowData.getBeforeColumnsList()); System.out.println("---"); printColumnList(rowData.getAfterColumnsList()); } // 判断是否为插入事件 else if(entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) { System.out.println("---insert---"); printColumnList(rowData.getAfterColumnsList()); System.out.println("---"); } } } } // 打印所有列名和列值 private static void printColumnList(List<CanalEntry.Column> columnList) { for (CanalEntry.Column column : columnList) { System.out.println(column.getName() + "\t" + column.getValue()); } } }

在mysql插入数据测试:


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #实时数仓开发 #MySQL #trigger触发器 #获取增量变更从 #2010