
Starting from the Flink SQL error "doesn't support consuming update and delete changes"



When first learning Flink SQL, most of us have probably run into an error message like this:

org.apache.flink.table.api.TableException: X[operator name] doesn't support consuming update and delete changes which is produced by node Y[operator name]



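I previously wrote a brief tutorial on custom Flink SQL connectors. It mentioned that when defining a DynamicTableSink (or a ScanTableSource), you must override the getChangelogMode() method to tell the planner which kinds of data changes the connector can accept or produce. A change is marked by one of four RowKinds: INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), and DELETE (-D):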

/** Insertion operation. */
INSERT("+I", (byte) 0),

/**
 * Update operation with the previous content of the updated row.
 *
 * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that
 * needs to retract the previous row first. It is useful in cases of a non-idempotent update,
 * i.e., an update of a row that is not uniquely identifiable by a key.
 */
UPDATE_BEFORE("-U", (byte) 1),

/**
 * Update operation with new content of the updated row.
 *
 * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that
 * needs to retract the previous row first. OR it describes an idempotent update, i.e., an
 * update of a row that is uniquely identifiable by a key.
 */
UPDATE_AFTER("+U", (byte) 2),

/** Deletion operation. */
DELETE("-D", (byte) 3);
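To make the four markers concrete, here is a minimal sketch of how a retract-style changelog could be replayed into a table. It is not Flink code: Kind, Change, and ChangelogReplay are hypothetical stand-ins defined only for this illustration, and rows are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RowKind; not the Flink class.
enum Kind {
    INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

    final String shortString;

    Kind(String shortString) {
        this.shortString = shortString;
    }
}

// Hypothetical changelog entry: a change marker plus the row content.
final class Change {
    final Kind kind;
    final String row;

    Change(Kind kind, String row) {
        this.kind = kind;
        this.row = row;
    }
}

final class ChangelogReplay {
    // Replays a changelog into a bag of rows:
    // +I and +U add the row, -U and -D remove one occurrence of it.
    static List<String> replay(List<Change> changelog) {
        List<String> table = new ArrayList<>();
        for (Change c : changelog) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.add(c.row);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    table.remove(c.row); // removes the first equal element, if any
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> log = Arrays.asList(
                new Change(Kind.INSERT, "(u1, 1)"),
                new Change(Kind.UPDATE_BEFORE, "(u1, 1)"),
                new Change(Kind.UPDATE_AFTER, "(u1, 2)"),
                new Change(Kind.INSERT, "(u2, 1)"),
                new Change(Kind.DELETE, "(u2, 1)"));
        System.out.println(replay(log)); // prints [(u1, 2)]
    }
}
```

The -U/+U pair is what lets a consumer without a key retract the old row before adding the new one; a keyed consumer could get by with +U alone.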


ModifyKindSet / UpdateKind Trait


A set of physical properties & their definitions carried by a relational expression.

From the perspective of a RelNode, ChangelogMode can indeed be treated as a physical property attached to it. The physical plan layer of the Blink Planner uses two RelTraits to carry the semantics of data changes. The first is ModifyKindSetTrait, which represents a set made up of INSERT (I), UPDATE (U), and DELETE (D). Part of its code is shown below and is fairly easy to follow:

object ModifyKindSetTrait {
  /**
   * An empty [[ModifyKindSetTrait]] which doesn't contain any [[ModifyKind]].
   */
  val EMPTY = new ModifyKindSetTrait(ModifyKindSet.newBuilder().build())

  /**
   * Insert-only [[ModifyKindSetTrait]].
   */
  val INSERT_ONLY = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)

  /**
   * A modify [[ModifyKindSetTrait]] that contains all change operations.
   */
  val ALL_CHANGES = new ModifyKindSetTrait(ModifyKindSet.ALL_CHANGES)

  /**
   * Creates an instance of [[ModifyKindSetTrait]] from the given [[ChangelogMode]].
   */
  def fromChangelogMode(changelogMode: ChangelogMode): ModifyKindSetTrait = {
    val builder = ModifyKindSet.newBuilder
    changelogMode.getContainedKinds.foreach {
      case RowKind.INSERT => builder.addContainedKind(ModifyKind.INSERT)
      case RowKind.DELETE => builder.addContainedKind(ModifyKind.DELETE)
      case _ => builder.addContainedKind(ModifyKind.UPDATE) // otherwise updates
    }
    new ModifyKindSetTrait(
  }
}
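The mapping done by fromChangelogMode can be sketched in plain Java as follows. This is only an illustration under assumptions: RowKindSketch, ModifyKindSketch, and ModifyKindMapping are hypothetical names defined here so the example compiles without Flink on the classpath.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical mirrors of RowKind and ModifyKind, not the Flink classes.
enum RowKindSketch { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

enum ModifyKindSketch { INSERT, UPDATE, DELETE }

final class ModifyKindMapping {
    // Collapses the four row kinds into three modify kinds:
    // both UPDATE_BEFORE and UPDATE_AFTER map to UPDATE.
    static Set<ModifyKindSketch> fromRowKinds(Set<RowKindSketch> rowKinds) {
        Set<ModifyKindSketch> result = EnumSet.noneOf(ModifyKindSketch.class);
        for (RowKindSketch kind : rowKinds) {
            switch (kind) {
                case INSERT:
                    result.add(ModifyKindSketch.INSERT);
                    break;
                case DELETE:
                    result.add(ModifyKindSketch.DELETE);
                    break;
                default: // UPDATE_BEFORE or UPDATE_AFTER
                    result.add(ModifyKindSketch.UPDATE);
                    break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // An upsert-style changelog: +I, +U, -D.
        System.out.println(fromRowKinds(EnumSet.of(
                RowKindSketch.INSERT, RowKindSketch.UPDATE_AFTER, RowKindSketch.DELETE)));
    }
}
```

Note that this set deliberately loses the distinction between -U and +U; that distinction is carried by a separate trait.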


object UpdateKindTrait {
  /**
   * An [[UpdateKindTrait]] that describes the node doesn't provide any kind of updates
   * as a provided trait, or requires nothing about kind of updates as a required trait.
   *
   * <p>It also indicates that the [[ModifyKindSetTrait]] of current node doesn't contain
   * [[ModifyKind#UPDATE]] operation.
   */
  val NONE = new UpdateKindTrait(UpdateKind.NONE)

  /**
   * An [[UpdateKindTrait]] that describes the node produces update changes just as a
   * single row of [[org.apache.flink.types.RowKind#UPDATE_AFTER]]
   */
  val ONLY_UPDATE_AFTER = new UpdateKindTrait(UpdateKind.ONLY_UPDATE_AFTER)

  /**
   * An [[UpdateKindTrait]] that describes the node produces update changes consists of
   * a row of [[org.apache.flink.types.RowKind#UPDATE_BEFORE]] and
   * [[org.apache.flink.types.RowKind#UPDATE_AFTER]].
   */
  val BEFORE_AND_AFTER = new UpdateKindTrait(UpdateKind.BEFORE_AND_AFTER)

  /**
   * Returns ONLY_UPDATE_AFTER [[UpdateKindTrait]] if there is update changes.
   * Otherwise, returns NONE [[UpdateKindTrait]].
   */
  def onlyAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {
    val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {
      UpdateKind.ONLY_UPDATE_AFTER
    } else {
      UpdateKind.NONE
    }
    new UpdateKindTrait(updateKind)
  }

  /**
   * Returns BEFORE_AND_AFTER [[UpdateKindTrait]] if there is update changes.
   * Otherwise, returns NONE [[UpdateKindTrait]].
   */
  def beforeAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {
    val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {
      UpdateKind.BEFORE_AND_AFTER
    } else {
      UpdateKind.NONE
    }
    new UpdateKindTrait(updateKind)
  }

  /**
   * Creates an instance of [[UpdateKindTrait]] from the given [[ChangelogMode]].
   */
  def fromChangelogMode(changelogMode: ChangelogMode): UpdateKindTrait = {
    val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
    val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
    (hasUpdateBefore, hasUpdateAfter) match {
      case (true, true) => BEFORE_AND_AFTER
      case (false, true) => ONLY_UPDATE_AFTER
      case (true, false) =>
        throw new IllegalArgumentException("Unsupported changelog mode: " +
          ChangelogPlanUtils.stringifyChangelogMode(Some(changelogMode)))
      case (false, false) => NONE
    }
  }
}

RelTrait compatibility


boolean satisfies(RelTrait trait);

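It determines whether this RelTrait is compatible with another RelTrait, that is, whether T1 satisfies the constraints of T2. Clearly, if T1 is the same as T2, or T1 is stricter than T2, the method returns true; otherwise it returns false. For example, for RelCollation, (ORDER BY a, b) satisfies (ORDER BY a) holds, but the reverse does not.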
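The ORDER BY example can be sketched as a prefix check on the list of sort keys. This is a simplified model, not Calcite's RelCollation implementation: CollationSketch is a hypothetical class, and sort direction and null ordering are ignored.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simplification of a collation: just an ordered list of column names.
final class CollationSketch {
    // A provided ordering satisfies a required one when the required
    // keys form a prefix of the provided keys.
    static boolean satisfies(List<String> provided, List<String> required) {
        return provided.size() >= required.size()
                && provided.subList(0, required.size()).equals(required);
    }

    public static void main(String[] args) {
        List<String> byAB = Arrays.asList("a", "b");
        List<String> byA = Arrays.asList("a");
        System.out.println(satisfies(byAB, byA)); // prints true
        System.out.println(satisfies(byA, byAB)); // prints false
    }
}
```

Data sorted by (a, b) is necessarily sorted by a, which is why the stricter trait satisfies the looser one and not the other way round.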


override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
  case other: ModifyKindSetTrait =>
    // it's satisfied when modify kinds are included in the required set,
    // e.g. [I,U] satisfy [I,U,D]
    //      [I,U,D] not satisfy [I,D]
    this.modifyKindSet.getContainedKinds.forall(other.modifyKindSet.contains)
  case _ => false
}


override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
  case other: UpdateKindTrait =>
    // should totally match
    other.updateKind == this.updateKind
  case _ => false
}

Next we can move on to the relevant logic in the Blink Planner.
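The two satisfies() rules above boil down to a subset test and an equality test. A minimal sketch, assuming hypothetical local enums ModifyKind and UpdateKind and a hypothetical helper class TraitSatisfaction (none of these are the planner's own classes):

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-ins for the planner's enums.
enum ModifyKind { INSERT, UPDATE, DELETE }

enum UpdateKind { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

final class TraitSatisfaction {
    // ModifyKindSet rule: every provided kind must be contained in the required set.
    static boolean modifyKindsSatisfy(Set<ModifyKind> provided, Set<ModifyKind> required) {
        return required.containsAll(provided);
    }

    // UpdateKind rule: the two values must match exactly.
    static boolean updateKindSatisfies(UpdateKind provided, UpdateKind required) {
        return provided == required;
    }

    public static void main(String[] args) {
        Set<ModifyKind> iu = EnumSet.of(ModifyKind.INSERT, ModifyKind.UPDATE);
        Set<ModifyKind> id = EnumSet.of(ModifyKind.INSERT, ModifyKind.DELETE);
        Set<ModifyKind> iud = EnumSet.allOf(ModifyKind.class);
        System.out.println(modifyKindsSatisfy(iu, iud)); // prints true
        System.out.println(modifyKindsSatisfy(iud, id)); // prints false
    }
}
```

The asymmetry matters: a node that produces fewer kinds of changes can always feed a consumer that accepts more, while the update kind has no such ordering and must be agreed on exactly.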


The Blink Planner uses an optimization program named FlinkChangelogModeInferenceProgram to infer ChangelogMode information for each StreamPhysicalRel and to check that the resulting ModifyKindSetTrait and UpdateKindTrait are compatible between upstream and downstream nodes. The main logic has two steps:

// step1: satisfy ModifyKindSet trait
val physicalRoot = root.asInstanceOf[StreamPhysicalRel]
val rootWithModifyKindSet = SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR.visit(
  physicalRoot,
  // we do not propagate the ModifyKindSet requirement and requester among blocks
  // set default ModifyKindSet requirement and requester for root
  ModifyKindSetTrait.ALL_CHANGES,
  "ROOT")

// step2: satisfy UpdateKind trait
val rootModifyKindSet = getModifyKindSet(rootWithModifyKindSet)
// use the required UpdateKindTrait from parent blocks
val requiredUpdateKindTraits = if (rootModifyKindSet.contains(ModifyKind.UPDATE)) {
  if (context.isUpdateBeforeRequired) {
    Seq(UpdateKindTrait.BEFORE_AND_AFTER)
  } else {
    // update_before is not required, and input contains updates
    // try ONLY_UPDATE_AFTER first, and then BEFORE_AND_AFTER
    Seq(UpdateKindTrait.ONLY_UPDATE_AFTER, UpdateKindTrait.BEFORE_AND_AFTER)
  }
} else {
  // there is no updates
  Seq(UpdateKindTrait.NONE)
}
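The choice of candidate UpdateKindTraits in step 2 can be restated as a small decision function. The sketch below only mirrors the branching shown above; UpdateKindOption and RequiredUpdateKinds are hypothetical names, and the two boolean parameters stand in for rootModifyKindSet.contains(ModifyKind.UPDATE) and context.isUpdateBeforeRequired.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the planner's UpdateKind enum.
enum UpdateKindOption { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

final class RequiredUpdateKinds {
    // Returns the candidates in the order in which they would be tried.
    static List<UpdateKindOption> candidates(boolean rootHasUpdate, boolean updateBeforeRequired) {
        if (!rootHasUpdate) {
            // No updates at all: nothing to require.
            return Collections.singletonList(UpdateKindOption.NONE);
        }
        if (updateBeforeRequired) {
            return Collections.singletonList(UpdateKindOption.BEFORE_AND_AFTER);
        }
        // Prefer the cheaper ONLY_UPDATE_AFTER, fall back to BEFORE_AND_AFTER.
        return Arrays.asList(UpdateKindOption.ONLY_UPDATE_AFTER, UpdateKindOption.BEFORE_AND_AFTER);
    }

    public static void main(String[] args) {
        System.out.println(candidates(true, false)); // prints [ONLY_UPDATE_AFTER, BEFORE_AND_AFTER]
    }
}
```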


def visit(
    rel: StreamPhysicalRel,
    requiredTrait: ModifyKindSetTrait,
    requester: String): StreamPhysicalRel = rel match {
  case sink: StreamPhysicalSink =>
    val name = s"Table sink '${sink.tableIdentifier.asSummaryString()}'"
    val queryModifyKindSet = deriveQueryDefaultChangelogMode(sink.getInput, name)
    val sinkRequiredTrait = ModifyKindSetTrait.fromChangelogMode(
      sink.tableSink.getChangelogMode(queryModifyKindSet))
    val children = visitChildren(sink, sinkRequiredTrait, name)
    val sinkTrait =
    // ignore required trait from context, because sink is the true root
    sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]

  case sink: StreamPhysicalLegacySink[_] =>
    // ......

  case deduplicate: StreamPhysicalDeduplicate =>
    // deduplicate only support insert only as input
    val children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY)
    val providedTrait = if (!deduplicate.keepLastRow && !deduplicate.isRowtime) {
      // only proctime first row deduplicate does not produce UPDATE changes
      ModifyKindSetTrait.INSERT_ONLY
    } else {
      // other deduplicate produce update changes
      ModifyKindSetTrait.ALL_CHANGES
    }
    createNewNode(deduplicate, children, providedTrait, requiredTrait, requester)

  case agg: StreamPhysicalGroupAggregate =>
    // agg support all changes in input
    val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)
    val inputModifyKindSet = getModifyKindSet(children.head)
    val builder = ModifyKindSet.newBuilder()
      .addContainedKind(ModifyKind.INSERT)
      .addContainedKind(ModifyKind.UPDATE)
    if (inputModifyKindSet.contains(ModifyKind.UPDATE) ||
        inputModifyKindSet.contains(ModifyKind.DELETE)) {
      builder.addContainedKind(ModifyKind.DELETE)
    }
    val providedTrait = new ModifyKindSetTrait(
    createNewNode(agg, children, providedTrait, requiredTrait, requester)

  case tagg: StreamPhysicalGroupTableAggregateBase =>
    // ......

  case agg: StreamPhysicalPythonGroupAggregate =>
    // ......

  case window: StreamPhysicalGroupWindowAggregateBase =>
    // WindowAggregate and WindowTableAggregate support insert-only in input
    val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
    val builder = ModifyKindSet.newBuilder()
      .addContainedKind(ModifyKind.INSERT)
    if (window.emitStrategy.produceUpdates) {
      builder.addContainedKind(ModifyKind.UPDATE)
    }
    val providedTrait = new ModifyKindSetTrait(
    createNewNode(window, children, providedTrait, requiredTrait, requester)

  case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank =>
    // WindowAggregate and WindowRank support insert-only in input
    val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
    val providedTrait = ModifyKindSetTrait.INSERT_ONLY
    createNewNode(rel, children, providedTrait, requiredTrait, requester)

  case limit: StreamPhysicalLimit =>
    // ......

  case _: StreamPhysicalRank | _: StreamPhysicalSortLimit =>
    // ......

  case sort: StreamPhysicalSort =>
    // ......

  case cep: StreamPhysicalMatch =>
    // ......

  case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
       _: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate =>
    // ......

  case join: StreamPhysicalJoin =>
    // ......

  case windowJoin: StreamPhysicalWindowJoin =>
    // ......

  case temporalJoin: StreamPhysicalTemporalJoin =>
    // ......

  case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
       _: StreamPhysicalLookupJoin | _: StreamPhysicalExchange |
       _: StreamPhysicalExpand | _: StreamPhysicalMiniBatchAssigner |
       _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction =>
    // transparent forward requiredTrait to children
    val children = visitChildren(rel, requiredTrait, requester)
    val childrenTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
    // forward children mode
    createNewNode(rel, children, childrenTrait, requiredTrait, requester)

  case union: StreamPhysicalUnion =>
    // ......

  case normalize: StreamPhysicalChangelogNormalize =>
    // ......

  case ts: StreamPhysicalTableSourceScan =>
    // ScanTableSource supports produces updates and deletions
    val providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
    createNewNode(ts, List(), providedTrait, requiredTrait, requester)

  case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
       _: StreamPhysicalValues =>
    // ......

  case scan: StreamPhysicalIntermediateTableScan =>
    // ......

  case _ =>
    throw new UnsupportedOperationException(
      s"Unsupported visit for ${rel.getClass.getSimpleName}")
}



private def createNewNode(
    node: StreamPhysicalRel,
    children: List[StreamPhysicalRel],
    providedTrait: ModifyKindSetTrait,
    requiredTrait: ModifyKindSetTrait,
    requestedOwner: String): StreamPhysicalRel = {
  if (!providedTrait.satisfies(requiredTrait)) {
    val diff = providedTrait.modifyKindSet.minus(requiredTrait.modifyKindSet)
    val diffString = diff.getContainedKinds
      .toList.sorted // for deterministic error message
      .map(_.toString.toLowerCase)
      .mkString(" and ")
    // creates a new node based on the new children, to have a more correct node description
    // e.g. description of GroupAggregate is based on the ModifyKindSetTrait of children
    val tempNode = node.copy(node.getTraitSet, children).asInstanceOf[StreamPhysicalRel]
    val nodeString = tempNode.getRelDetailedDescription
    throw new TableException(
      s"$requestedOwner doesn't support consuming $diffString changes " +
      s"which is produced by node $nodeString")
  }
  val newTraitSet =
  node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel]
}
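How the error text is assembled can be reproduced in isolation. The following is a sketch under assumptions, not the planner's code: ChangeKind and TraitCheck are hypothetical names, and the violation is returned as an Optional message instead of being thrown as a TableException.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for ModifyKind, declared in the order INSERT, UPDATE, DELETE.
enum ChangeKind { INSERT, UPDATE, DELETE }

final class TraitCheck {
    // Returns the error message when the provided kinds are not all accepted,
    // or an empty Optional when the requirement is satisfied.
    static Optional<String> describeViolation(
            String requester,
            String producer,
            Set<ChangeKind> provided,
            Set<ChangeKind> required) {
        if (required.containsAll(provided)) {
            return Optional.empty();
        }
        // diff = provided minus required; an EnumSet iterates in declaration order,
        // which keeps the message deterministic.
        EnumSet<ChangeKind> diff = EnumSet.noneOf(ChangeKind.class);
        diff.addAll(provided);
        diff.removeAll(required);
        String diffString = diff.stream()
                .map(kind -> kind.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.joining(" and "));
        return Optional.of(requester + " doesn't support consuming " + diffString
                + " changes which is produced by node " + producer);
    }

    public static void main(String[] args) {
        System.out.println(describeViolation(
                "X", "Y",
                EnumSet.allOf(ChangeKind.class),
                EnumSet.of(ChangeKind.INSERT)).orElse("satisfied"));
    }
}
```

With a producer offering [I,U,D] and a requester accepting only [I], the difference is [U,D], which yields exactly the "update and delete changes" wording from the start of the article.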


SELECT userId, COUNT(DISTINCT orderId)
FROM (
  SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime [ASC | DESC]) AS rn
    FROM rtdw_ods.kafka_order_done_log /*+ OPTIONS('scan.startup.mode'='latest-offset') */
  ) WHERE rn = 1
)
GROUP BY userId, TUMBLE(procTime, INTERVAL '5' SECOND);

Experimenting shows that if deduplication keeps the first row (ORDER BY procTime ASC), the statement runs normally. But if it keeps the last row (ORDER BY procTime DESC), the following exception is thrown:

Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalGroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[LastRow], key=[suborderid], order=[PROCTIME])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:389)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:166)
    ......
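The difference between the two variants follows from the Deduplicate and GroupWindowAggregate branches of visit() shown earlier. A minimal sketch of that reasoning, using hypothetical names (Modify, DedupThenWindow) rather than the planner's classes:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for ModifyKind.
enum Modify { INSERT, UPDATE, DELETE }

final class DedupThenWindow {
    // Mirrors the Deduplicate branch: only proctime first-row deduplication is insert-only.
    static Set<Modify> deduplicateProvides(boolean keepLastRow, boolean isRowtime) {
        if (!keepLastRow && !isRowtime) {
            return EnumSet.of(Modify.INSERT);
        }
        return EnumSet.allOf(Modify.class);
    }

    // Mirrors the GroupWindowAggregate branch: the input must be insert-only.
    static boolean groupWindowAggregateAccepts(Set<Modify> input) {
        return EnumSet.of(Modify.INSERT).containsAll(input);
    }

    public static void main(String[] args) {
        // ORDER BY procTime ASC: keep first row on processing time.
        System.out.println(groupWindowAggregateAccepts(deduplicateProvides(false, false))); // prints true
        // ORDER BY procTime DESC: keep last row on processing time.
        System.out.println(groupWindowAggregateAccepts(deduplicateProvides(true, false))); // prints false
    }
}
```

Keeping the last row means each newer row must replace the previously emitted one, so the deduplication node can no longer be insert-only, and the tumbling window aggregate downstream rejects it.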





SatisfyModifyKindSetTraitVisitor#visit() method: in the StreamPhysicalGroupWindowAggregateBase branch, change the requiredChildrenTrait argument of visitChildren from ModifyKindSetTrait.INSERT_ONLY to ModifyKindSetTrait.ALL_CHANGES, indicating that it accepts all change types.

SatisfyUpdateKindTraitVisitor#visit() method: append | _: StreamPhysicalGroupWindowAggregateBase to the end of the condition of the third branch, indicating that it accepts UpdateKindTrait.BEFORE_AND_AFTER (for retract streams) and UpdateKindTrait.NONE (for append-only streams). Correspondingly, remove both _: StreamPhysicalGroupWindowAggregate | _: StreamPhysicalGroupWindowTableAggregate from the condition of the fourth branch.


The End

Flink Forward Asia 2021 Online takes place over the next two days~


