irpas技术客

Flink CDC踩坑集合_百里长庭

网络投稿 7292

背景

Flink版本-1.11.0 Flink-CDC版本- 1.1.0

问题集合 1. 使用flink sql 时,需要引入flink-json依赖

异常信息

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath. at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413) at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)

解决方案: pom文件中引入

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.11.0</version> </dependency> 2. Flink 1.11版本后简化了 connector 参数

Flink由于发展太快(2020年已经发布了1.10,1.11,1.12三个大版本),很多2020年初的blog提供教程已经面临失效

以 Kafka 为例,在 1.11 版本之前用户的 DDL 需要声明成如下方式

CREATE TABLE user_behavior ( ... ) WITH ( 'connector.type'='kafka', 'connector.version'='universal', 'connector.topic'='user_behavior', 'connector.startup-mode'='earliest-offset', 'connector.properties.zookeeper.connect'='localhost:2181', 'connector.properties.bootstrap.servers'='localhost:9092', 'format.type'='json' );

而在 Flink SQL 1.11 中则简化为

CREATE TABLE user_behavior ( ... ) WITH ( 'connector'='kafka', 'topic'='user_behavior', 'scan.startup.mode'='earliest-offset', 'properties.zookeeper.connect'='localhost:2181', 'properties.bootstrap.servers'='localhost:9092', 'format'='json' );

详细变更见FLIP-122

3. flink-sql-connector-jdbc声明表时,必须指定主键

否则会报异常:

java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record. at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink.validatePrimaryKey(JdbcDynamicTableSink.java:72) at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink.getChangelogMode(JdbcDynamicTableSink.java:63) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:120) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:50) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:39) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) ··· 4. 使用flink-mysql-cdc时,请注意检查线上数据库binlog-format属性,另外要给用户授权 SET GLOBAL binlog_format = 'ROW'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password'; 5. 接收到MIXED或STATEMENT格式日志退出

虽然你可能将Mysql的binlog日志格式改为row,但是仍然可能存在之前的session或者有用户手动修改并提交mixed或者statement格式的日志,这会导致cdc组件异常并退出。

flink-mysql-cdc并没有直接关于此情况设置,但是其引用的debezium组件,在1.3版本(虽然官方文档在1.2版本也有相关属性,但是看其源码并不支持)开始支持忽略解析错误的语句。 可以通过添加属性配置,来跳过。但是也可能带来丢失数据的风险。

debezium文档

'debezium.event.processing.failure.handling.mode' = 'skip', 'debezium.inconsistent.schema.handling.mode' = 'skip', 'debezium.database.history.skip.unparseable.ddl' = 'true' 6. 目标表要注意清除外键依赖

同步数据时,很多公司都会直接同步原始表的所有字段作为数仓ods层或者dim层,并使用mysql存储,不做任何处理,只有在流表与维表join的时候才会读取.

此时可能从业务数据库导出sql并导入数仓的mysql,外键依赖也可能会导入。

那么需要注意去掉其外键依赖,否则会在同步时发生异常。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #CDC踩坑集合 #110问题集合1 #使用flink #SQL #by