irpas技术客

SeaTunnel 在 oppo 的特征平台实践 | ETL 平台数据处理集成_Apache SeaTunnel

未知 4642

今天的分享包含以下几点:

背景&需求

为什么是 SeaTunnel

ETL 平台集成实践

作者简介?

01业务背景和需求痛点

业务背景

推搜广场景下存在大量的数据同步和特征处理需求。推荐搜索广告业务涉及图中几个模块,以特征为基础的特征服务,上层支持了机器学习、召回引擎和预估引擎。召回引擎和预估引擎支撑着更上层的推荐引擎业务的召回、粗排、精排、重排,最终产出结果。这是推搜广的主要业务流程,其中有些细小差别,但大体相似。

对于推荐系统,物料数据是推荐系统要推荐的内容,包括视频、文章或商品等。推荐系统的主要数据包括用户行为日志、服务端日志、物料数据、实时特征快照等数据,我们首先会接入 kafka 中,分两个流,一是同步到hdfs作为离线数据支持离线用户画像、物品画像、离线行为特征等离线特征数据的计算;二是 Kafka 中的数据经过实时的Flink或 Storm 处理,进行特征正负样本拼接、日志拼接和特征计算等,生成实时用户画像、物料动态画像、用户序列特征、实时快照特征等实时流特征数据。实时和离线特征通过特征注册存储到 redis、mongodb、parker、cassandra 等存储中通过特征服务对接到上层应用。

当用户向推荐系统发起一个请求时,首先触发推荐系统召回。召回有多种类型,协同类召回是基于物物相似 itemcf,人人相似 usercf,人物矩阵分解等;向量化是把一个内容或者物品通过向量化embedding 的方式表达出来,再计算相似度;池子召回,是热点池和精品池或者运营池等进行推荐;模型召回是基于一些模型算法挖掘出来的、对用户推荐的候选集数据,进入召回阶段。

召回阶段可能存在 5000 篇视频或文章,这些数据进入粗排。粗排是对召回的数据通过预估引擎进行一次粗粒度的物料筛选,筛选出 5000 中可能的1000 篇。预估引擎利用了机器学习的一些模型,进行预估和打分。打分后会进行排序。进入精排后会输入更多特征数据,包括交叉特征等,进行更细粒度的筛选、排序和打分,前面的 1000 篇可能会剩下 50 篇或更少。这个结果会进一步进行重排,重排有多重手段,像将一些内容必须插入到某个位置的调整,还包括同类文章数据按规则打散减少同质化内容、提升用户体验,也会对推荐内容做去重复等操作。完成重排后输出结果。

可以看到机器学习、召回引擎以及预估引擎都是以数据和特征为基础的,这些业务场景下有大量数据处理。数据处理主要是特征计算,而计算过程中也需要将产生的数据模型同步到对应存储中,这就是我们业务场景中数据同步需求的来源。整套系统支持了 10-20 个业务,整体数据同步的需求较大。

痛点和目标

业务多,任务碎片化。一些任务部署在调度系统中,一些任务是以 Crontab 形式配置的,开发人员维护同步任务困难,且没有上线前后的串联关系。数据同步和数据处理需求量大,人力有限,同步任务开发和部署零散,有 Spark、Flink 任务也有脚本,开发人员为了维护多个同步任务,同时还需要熟悉打包、编译、上线流程,维护流程难以统一化。且数据同步任务和数据处理存在烟囱式开发的问题,难以通用化,消耗人力物力。

我们需要让数据处理和同步任务标准化、对处理和同步任务进行统一管理,希望能将数据处理和同步抽象成工具化的产品,让数据处理和同步的能力通用化,可被复用。同时让数据处理和同步工具可以有普适性,能够产出一些低学习成本、高开发效率的工具达到减少重复劳动、提升效率的效果。

流程统一

为了解决痛点、达到目标,我们首先进行了数据处理和同步任务开发部署流程的统一。这里以样本拼接为例,样本拼接是我们业务中重要的一环,分为离线和近线。样本拼接主要指取得用户当时的一些特征快照数据,给予用户对这个推荐结果的一个正负反馈,如是否点击、是否曝光、是否下载,把这些数据作为样本输入到训练模型的样本中。我们的样本拼接主要做正负样本。

离线样本拼接首先经过 Spark 完成样本拼接和特征抽取后,结果存储到 HDFS,对接离线模型训练完成离线处理。近线样本拼接通过Flink对实时日志流数据进行处理,完成样本拼接和特征抽取后放入 Kafka,最终对接增量训练模型后完成近线处理。这里的实现是两套代码,接口和 API 不完全相同,由不同人维护,维护成本高。两套系统,分别存储的数据容易出问题,离线近线两套系统数据容易出现不一致问题,对最终模型训练和实验效果有一定影响。

在此基础上,我们统一了处理流程,实时(近线)和离线均用Flink处理数据,维护同一套引擎代码。通过 Flink 进行实时流样本拼接、特征抽取,得到的样本数据存储到 Iceberg 数据湖。Iceberg 对接离线和增量模型训练,进行数据处理。这套方案统一存储、减少数据冗余,避免了特征不一致问题发生。使用一套计算和存储引擎,函数复用,提升了效率。

结构统一

首先我们做了样本结构化。我们把输入到模型的前置特征数据基于不同类型做了分解。

图中第一部分是业务单元,业务单元主要指用户ID、物料 ID、时间戳,这些是用户请求后的快照数据。第二部实时特征,是用户请求的那一时刻的状态,比如那一时刻对某个兴趣的上下文的那个实时特征。此外还有一些离线特征。

分解前,业务单元呀、实时特征呀、离线特征都是统一通过引擎去输入,dump 到 Kafka,然后再去做特征的样本拼接和数据处理的。但离线特征这一部分的特征很多是静态的,不会频繁变更。如果每次都走流式计算这些离线特征,数据量会特别大。而且数据重复传输,可能一些数据在模型里面根本不会用到。我们做分解后,实时特征实时地请求,离线特征进行填补,减少了数据冗余,整个样本的数据也更结构化。

除样本的结构化外,我们对于样本内的特征也做了标准化,即存储格式的标准化,底层存储用pb格式序列化。数据中的 byte、string、Int64List、FeatureValueList 特征数据等。经过统一后,数据可以跨业务地复用、屏蔽底层细节。

功能模块化

在结构统一的基础上,我们进一步做了特征的生产-存储-服务全流程标准化。并统一了API、对接上下游屏蔽底层细节,让跨业务特征数据共享挖掘更大价值。

我们的特征中心对接了实时和离线数据源,进行特征生产后,将特征注册到特征存储,然后由特征服务统一对接预估模型服务或用作召回数据。

我们的特征服务支持在线计算,在线计算主要支持用户行为序列,近期特征和一些画像特征中需要统计一些基础数据的场景。如最近 20 条的某个类目的峰值、占比、CTR,可以通过在线的方式实时计算,这样能够增加业务的灵活性。

在这个结构上,我们的特征生产是基于 SeaTunnel 做的。

02 为什么用 SeaTunnel

SeaTunnel 构建在 Spark 和 Flink 上,借助Flink、Spark 能够满足大数据量实时离线, 高性能的同步和处理能力。用户不需要关注细节,通过配置化、插件的方式,配合 SQL,就可以快速部署数据同步应用到生产中。

SeaTunnel 处理流程高度抽象,逻辑清晰。SeaTunnel 对接 Hadoop、Kafka、ES、Clickhouse 等数据源,经过 Source 输入,Source 对接 Transform,Transform 中进行数据逻辑处理,包括数据过滤等。数据处理完成后对接Sink 将数据输出到目标数据源中。

图中是一个 SeaTunne 任务的开发配置,env 部分配置可配置任务的并行度和任务的优化参数、checkpoint 路径和频率等。Source 部分可以配置 FakeSourceStream 数据源,做测试使用。FakeSourceStream 可以配置数据表和字段名称。Transform 部分从 source 读取数据,可配置多个数据处理的 SQL 和临时表名。这里配置的Sink 是 ConsoleSink,是输出到控制台,方便观察和调试。

SeaTunnel 基于 java SPI 技术,非常便于扩展。下图是整个顶层接口的设计,实线表示了继承关系,白色的虚线是依赖关系。顶层是基于 Plugin,在 Plugin 的基础上涉及了BaseSource、BaseSink 和 BaseTransform。下面还有 BaseFlinkSource、BaseFlinkSink 和BaseFlinkTransform 三个高度抽象的处理流程。继承这些接口,实现自己的逻辑即可。

除了 Source、Sink、Transform 外还有Runtime,Runtime 是封装了整个运行环境。Flink 封装了 BatchEnv、StreamEnv。除此之外还有 Execution,Execution 串联了整个 Job 的执行流程。

整体结构清晰明了,插件实现起来很容易。

SeaTunnel 已经支持多种数据源,在此基础上减少了造轮子的情况发生。SeaTunnel 社区已经支持了 Doris、Redis、MongoDB、Hive、MySQL、TiDB、ElasticSearch、Clickhouse 等数据源。

03 ETL平台集成

ETL 特征生产处理平台是基于 SeaTunnel 进行的二次开发,构建在 flink 之上。

ETL 集成平台同 SeaTunnel 一样插件化,非常便于扩展与集成;

SeaTunnel 已经支持了好多数据源,不需要从头开始造轮子;

ETL 平台提供了配置化的方式,便于上手,用户不需要编写代码和了解底层细节和 API,就可以完成一个流任务或批任务的开发;

借助 Flink 的状态机制和实时处理的特性,非常适合窗口统计类实时运算的操作,非常切合推荐业务场景;

图中是 ETL 特征生产处理平台的架构,有相对独立的监控管理模块,Flink 引擎之上有数据输入层、数据处理层和数据输出层。

监控包括配置管理和任务管理,质量监控是负责配置质质量监控、告警。任务编排针对离线任务的依赖关系的管理。元数据是管理如Kafka输入的数据源的信息。此外还有血缘依赖。数据输入层接入了Hdfs、MQ、Kafka、HBase 等数据源。数据处理层可以对数据进行标准化处理,支持 SQL、DataSet/DataStream、数据格式转换和数据压缩等处理。数据输出层支持了 Scylladb、Hdfs、Redis、Kafka 等。平台的应用场景包含数据同步、数据处理、特征计算、样本拼接,都是基于底层的模块系统。

下面会详细介绍 ETL 的任务是怎么跑起来,又是怎么开发的。

组件单元分为三块:plugin、SQL 和 UDF。Plugin 是 Source、Transformer 插件,SQL 可做逻辑处理,我们也封装了很多具有通用性的UDF,和根据业务场景定制化的 UDF。结合几个模块即可生成一个 Job 的配置文件。Job 既可以是批任务也可以是流处理。整个 Job 的配置文件生成后会进行任务的管理编排,调度系统 oflow 负责编排批任务、ostream 负责编排流处理任务。oflow 会管理任务的上下游依赖关系,重试次数、报警等。ostream 是 Flink 运行的环境,管理任务参数配置等。对任务进行编排后会对任务进行提交,支持编排任务在 Yarn 和 k8s 平台进行运行。

ETL平台Job开发流程是怎样的?

首先我们采用了插件参数化管理的模式。以 Kafka Source为例,如下图所示,可以配置KafkaTableSchema 的实例名、schema、消费者信息和字段信息等。

建好 Source 后,对于不了解的新手,可以通过平台提供的托拉拽方式完成一个任务的编辑。同时借助 DAG 图的能力,能够非常容易地理解整个作业的流程。\

对整个流程比较熟悉的用户则可以直接编辑配置,或者通过复制编辑、修改部分配置,就可以快速完成一个 job 构建。

配置与插件互通,用户可以根据情况灵活选择和编辑。DAG 图可以辅助用户检查是否存在依赖错误的问题。

如何确保一个任务正确并且稳定的开发和上线运行呢?我们通过三个环节进行把控:配置检验,检查是否配置中已存在错误;逻辑调试,判断是否存在逻辑错误,如 SQL 中的字段错误或命名问题,都可以在这一步被发现;线上监控是在任务上线后保障任务稳定运行。

如何将错误控制在提交之前?首先我们会对配置进行校验。保证组成配置是合理的,SQL 是没有语法的错误的。如果肉眼观察能难发现错误,我们会对复杂 SQL 通过 SQL 解析快速定位到具体的编写错误。

第二是逻辑校验,对任务进行调试。任务调试需要数据,我们提供了两种方式采集数据,一种是线上采样,另一种是样例数据上传。线上采样有时过于随机,不满足需求,这时用户可以上传样例数据造出需要的样本,观察下游数据产出是否通过测试。

逻辑校验对全流程进行模拟,打印日志,将逻辑错误预先排除。启动调试流程后,经过样本上传或线上采样获得测试数据,进入模拟流程。最后输出结果。调试任务启动后,会对原有的任务配置做更改,在任务中插入埋点、将中间表数据处理进行输出。

图中是逻辑校验过程中打印的日志信息。可以直观地在任务上线前看到错误,防止任务不经过测试就上线污染线上数据。

除了任务上线前校验,我们还对任务进行了监控。任务监控包括 Flink metric 和自定义的指标上报,数据存储到时序数据库中,对接规则告警系统。图中展示了 Grafana 的监控项。任务上线后,基于监控大盘可以非常方便快速地定位和排查问题。

下图是任务监控中作业概况的 Grafana 大盘,可以看到任务运行情况、重启次数和延迟等信息。

业务指标监控会对指标进行收集,服务暴露metrics 函数和 sink 插件上报到时序数据库。配合规则告警系统对数据报警。

ETL 平台支持了数据同步、样本拼接、时序特征、物料和用户画像、窗口统计等场景。这些基础上,我们提供了模板,为数据同步、时序特征等较为固定和可以通用配置的场景提供了模板。我们基于模板功能可以把通用的能力沉淀下来。当用户需要新增数据同步任务时,可以基于模板对主要参数进行修改,快速创建任务完成数据同步任务的开发。

我们的 ETL 平台对 SeaTunne l也进行了优化与改进。我们对所有插件都加入了并发控制,例如一个 Kafka 有 20 个 partition,但下游数据处理逻辑比较复杂,需要 40 并发处理数据。此时如果在代码中统一配置了 40 并发,这些并发可能产生很多大而无用的开销。还可以控制写入 HDFS 的文件数量和文件大小等参数。此外我们对 Sink,Source 插件支持了更多扩展,支持了很多内部数据源。我们的配置支持参数动态配置,这个功能在进行数据回溯时非常有用。我们还开发了大量聚合函数和窗口函数,对状态保存和 Sum、Distinct 等聚合函数进行了优化。有一些函数是定制化的,有的业务有统计一些用户最近实时行为特征序列的需求,统计最近 APP 下载的一个窗口最新的 20条,且同时对 20 条信息去重,同时还要支持分类计数等。这样类场景,如果用 SQL 实现会非常复杂,还需要进行复杂的优化和调优,此时我们会开发比较通用、在性能上经过优化的函数交给用户。

ETL平台规模方面,当前上线的任务量在1400多个,日均处理条目数在100亿以上,日均数据量40T以上。

我们未来规划是引进Alink,Flink Ml等机器学习框架,支持回归以及分类算法、特征工程、窗口计算等与业务契合的能力。还有一个原因的话,就是我们通过这个引进机器学习框架,可以把现在在Spark上运行的任务也迁移到Flink上做统一管控。其次,我们会在批流一体化落地方面持续探索,解决Flink处理批任务的性能问题和起调机制问题。流批一体降低了维护成本,防止数据不一致问题出现或数据丢失,同时让数据回溯更加稳定便捷。此外,我们还计划支持纯SQL模块,Spark SQL和其他SQL迁移来的用户,面对平台中的配置文件的严格的形式会认为使用不够方便,因此我们将对纯SQL进行更好的支持。

- END

Apache SeaTunnel(Incubating)?是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。

仓库地址:?

https://github.com/apache/incubator-seatunnel

网址:

https://seatunnel.apache.org/

Proposal:

https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

Apache SeaTunnel(Incubating) 2.1.0 下载地址:

https://seatunnel.apache.org/download

衷心欢迎更多人加入!

能够进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的路程才刚刚开始,但社区的发展壮大需要更多人的加入。我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:

https://github.com/apache/incubator-seatunnel/issues

贡献代码:

https://github.com/apache/incubator-seatunnel/pulls

订阅社区开发邮件列表 :?

dev-subscribe@seatunnel.apache.org

开发邮件列表:

dev@seatunnel.apache.org

加入 Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-10u1eujlc-g4E~ppbinD0oKpGeoo_dAw

关注 Twitter:?

https://twitter.com/ASFSeaTunnel


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #SeaTunnel # #OPPO #的特征平台实践 #ETL #平台数据处理集成