实时计算 | 2021年网易云音乐实时计算平台发展和挑战
网易云音乐实时数仓平台上线以后,经过一年半的发展,整体实时数仓已经初具规模,我们已有实时数仓表300+,运行中的任务数有1200+。其中1000左右的任务是SQL任务, Kafka总出口流量达到到18GB/S,总用户数达到了200+。
数据量和用户的增长也给数据平台的易用性以及稳定性带来了了越来越多的挑战,包含Kafka的稳定性、集群的稳定性、运维工作的挑战以及很多早期的技术债;业务的增长,暴露出了基建的薄弱,也给我们积累了很多平台建设和运维的经验。
我们平台整体的的功能大家可以参考《云音乐实时数仓技术改造以及未来的一些规划》,这里将主要介绍我们最新的一些工作:
“我的任务延迟了,怎么扩容都不行,这是为什么?”
在日常运维工作中这是我们经常遇到的问题,往往也是比较耗费时间的问题。导致这种这种问题的原因有很多,为了解决这个问题,我们做了一些工作来增强我们的运维能力。
IO指标完善
IO问题是导致以上问题经常出现的原因之一,包含消息读取效率、维表JOIN效率、SINK效率等等,第三方存储的性能以及稳定性,直接影响实时任务的稳定性,为了快速定位相关问题,我们添加了很多IO相关Metric指标。
1. Kafka消费侧的一些性能指标
2. 读取反序列化指标
包含:
反序列化的RT
反序列化的错误比例
在Format侧我们开发了一套Format代理,支持在不修改原有format代码的情况下,上报相关metirc指标,忽略错误数据等功能。只要添加属性format.proxy指定代理类就可以支持不同方式的Format封装。
比如我们指定format.proxy=magina,就可以支持上报上述的性能指标;指定format.proxy=ds 就可以支持解析ds封装的日志格式,使用被代理的Format解析DS中的Body部分,不需要单独为DS封装的日志格式开发Format,且同样会上报性能相关指标,支持忽略错误消息等功能。
3. 维表JOIN相关指标
在维表JOIN侧, 我们添加了:
数据查询的响应时间
本地缓存的命中率
查询发生重试的比例
成功JOIN上的数据的比例等
5. 数据写入的一些性能指标
数据序列化的RT
数据写入外部数据源的平均响应时间等
整套IO相关指标的实现,我们全部是在Flink Connector的顶层接口做了一些公共的封装,重构了相关Connector的代码,只要按照我们自己的接口实现Connector,无需关心细节指标的上报,这些指标都会自动化的上报出来。
Kafka分区问题
Kafka分区的限制也是经常导致我们程序性能无法扩展的原因,出于Exactly Once的实现、读取性能、以及读取稳定性的考虑,Flink采用主动拉取的方式读取Kafka消息,这种方式限制了我们读取Kafka消息的任务数,大大限制我们任务性能的扩张能力,以下面这个case为例:
SET 'table.exec.state.ttl' = '1h';
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '10s';
SET 'table.exec.mini-batch.size' = '100000';
INSERT INTO music_kudu_online.music_kudu_internal.ads_ab_rtrs_user_metric_hour
SELECT
from_unixtime(`timestamp`, 'yyyy-MM-dd') as dt,
from_unixtime(`timestamp`, 'HH') as `hour`,
os, sceneid, parent_exp, `exp`, exp_type, userid,
count(1) pv
FROM iplay_ods.ods_rtrs_ab_log
INNER JOIN abtest_online.abtest.abtest_sence_metric_relation
FOR SYSTEM_TIME AS OF user_metric.proctime
ON ods_rtrs_ab_log.sceneid = abtest_sence_metric_relation.sceneid
GROUP BY from_unixtime(`timestamp`, 'yyyy-MM-dd'),
from_unixtime(`timestamp`, ‘HH’),
os, sceneid, parent_exp, `exp`, exp_type, userid
这是一个实时全聚合任务,在原始的FLINK中这段SQL执行的DAG大概是这样的:
假如我们读取的流表ods_rtrs_ab_log有5个分区,我们的SQL任务有七个并发,因为受到Kafka分区数的影响,加上FLINK本身作业链的优化,我们的消息的读取、维表JOIN、MINI BATCH的操作全部受到了Kafka分区的影响,无法扩展,特别是对于维表JOIN这种IO操作来说,任务的并发度严重影响了整体程序的性能,这个时候我只能通过扩容Kafka的分区数来提升性能。
但是这种操作非常重,而且很有可能会影响其它读取这张流表的任务;为了解决这个问题,我们对Kafka的Connector做了一些改造,支持通过配置多添加一步Shuffle操作,比如在上面的配置当中我们添加了配置:
'connector.rebalance.keys' = 'sceneid,parent_exp,userid'
消息会在读取以后按照sceneid,parent_exp,userid等字段进行hash分片,这样大大提高了整体程序的性能扩展性,而且通过指定字段的keyBy操作,可以大大提高维表JOIN缓存的命中率,提高MINI BATCH的性能和效率。
除了以上配置以外,我们还支持添加随机的Rebalance操作、Rescale操作以及解析行为的拆解,来进一步提升整体程序性能的扩展,这里需要注意的是额外Shuffle操作,会带来更多线程和网络开销,在配置这些操作的同时需要同时关注机器的负载情况,添加额外的Shuffle操作虽然能提升程序的扩展性,但是由于额外网络和线程开销,如果机器本身性能不行的话,很有可能会适得其反,在相同的资源情况下性能变得更差,这点需要根据自己程序以及环境情况进行配置。
Kafka使用优化
随着流量的飞速增长Kafka的稳定性也是我们面临的主要难题,包括Kafka的机柜带宽问题、跨机房带宽问题、Kafka扩缩容的抖动问题、还有Kafka本身配置问题等等,基本上大家能遇到的问题我们都遇到了,为了解决以上问题我们做了以下工作:
1. 开发镜像服务,解决带宽问题,保障高优先级任务
我们通过FLINK自己开发了一套镜像服务,在不同的机房模块间分别部署了一套Kafka集群,通过镜像服务同步两套Kafak集群的数据,主Kafka提供给比较重要P0级别的实时任务,其它不是特别重要的任务读取镜像集群的数据。
我们通过Yarn Label技术,通过不同队列的选择来控制任务所在的机房,来减少跨机房带宽的消耗,为了方便用户切换不同的Kafka集群,我们在Flink流表侧也做了一些改造,支持一张流表同时挂载多个Kafka集群,只要通过简单的配置就可以随意切换Kafka集群,经过一轮任务整理和切换,Kafka带宽使用情况有了大大的改善:
2. Kafka监控完善
在日常的工作中,我们发现很多开发对Kafka本身并不太了解,运维由于经验的不足在初期对整体Kafka的管控也不是那么的严格,导致在使用上有很多问题。所以我们整合了音乐内部的Kafka监控服务的数据,结合我们平台的任务血缘,开发了自己的一套Kafka监控服务。
目前这套系统整体还比较初级,除了关联了Kafka、流表、和任务之间的关系以外,我们还对以下这几种情况做了主动监控:
Kafka Topic的分区数的合理性,主要监控消息队列分区数过少或者过多的情况,主要是过少的情况,防止因为分区数过小,下游任务处理性能跟不上的问题;
Kafka分区数据生产均衡问题:防止因为Kafka本身分区数据的不均衡导致下游任务处理性能不行的问题;
Kafka分区数据消费均衡问题:防止因为Kafka本身分区发生变化,而下游任务因为没有开启分区感知,导致一些数据没有消费到等问题;
流量激增和激降报警:关键队列流量报警,保障实时数据的质量。
Kafka版本升级:为了解决本身Kafka扩容的稳定性问题、资源隔离问题,通过我们音乐公共技术团队,在Kafka 2.X版本基础上做了一些二次开发工作,将Kafka整个服务做了平台化的支持,支持了Topic的平滑扩所容,支持资源隔离。
类似YARN的LAEBL技术,支持针对不同的TOPIC划分不同region的机器,完善的消息镜像服务,且支持offset的复制;统一的Kafka运维监控平台,此部分内容后续文章会详细介绍。
3. 分区流表技术建设
实时数仓上线以后,我们发现以下几种情况非常影响程序的稳定性以及流表的易用性:
(1)很多时候我们只需要一张流表中1%的数据,但是因为没有办法按需读取,所以我们必须消耗大量的资源去解析读取另外99%的数据,导致了大量的资源带宽的消耗,浪费了大量的资源,而且本身SQL的开发方式本身没有办法按需解析日志,导致我们必须完整的解析出每一条消息,这就导致进一步的计算资源的消耗。
(2)当我们按照经验和业务,将大的TOPIC拆分成很多小的TOPIC时,一张表变成了很多小表,使用者又必须有很多的经验知识去了解这些schema完全相同的小表中分别包含了哪些消息,易用性很差,这样的设计也不符合数仓的整体设计逻辑,以后如果要做批流表统一元数据的时候,整体也变得不太可能
在离线场景下我们很有很多手段来解决以上问题,减少不必要的IO,如数据的分桶、存储有序的数据利用Parquet的下推查询的能力、做分区表等手段都可以解决以上问题。但是实时表的Case下在现有的公开的方案中好像并没有什么好的方法;所以为了解决以上问题,我们开发了流表的分区方案,整体和HIVE表的分区实现思想差不多:
我们使用Flink Table Souce提供的SupportsFilterPushDown的接口实现了一套自己的实时流表分区方案,一个分区对应一个topic,通过用户的查询条件下推过滤掉没有必要的分区,从而减少没有必要的数据的读取;目前已经上线了第一版,初步拆分了云音乐曝光日志,顺便还尝试使用AVRO的数据格式代替以前的JSON格式,实践下来优化效果明显:
(1)使用AVRO格式格式基本都能带来至少30+%的的带宽优化,消息解析性能相对音乐的原始日志格式的解析性能提升一倍.
(2)使用分区流表,我们初步迁移了了4个曝光日志的消费任务,已经节省了7台物理机,平均节省计算和带宽资源75%以上。
虽然这些都是比较极端的Case,但是从这些例子我们可以预计分区流表技术全面铺开以后,使用得到的话,绝对是一个能带来质变的优化。
数据实时化一直是我们云音乐数据平台团队数仓建设的一个比较大的目标,在这个目标的背后批流一体也是我们绕不开一个“名词”、“概念”、“技术”、或者是个“产品”。在正式开始分享我们的工作以前,首先分享下我有一次在电梯间遇到算法同学,然后和算法同学发生的对话:
算法:你们的批流一体什么时候上线?我们等着用呢?
我: 你们目前的诉求是什么呢?
算法:我们现在很多实时指标都是自己开发,没法在离线以后直接使用现成数仓数据。
从这段对话我们可以看出,算法同学并不是想要什么批流一体的技术,他们想要的是实时的现成的可用的数仓数据,来提升他们的开发效率,批流一体的背后,不同角色的业务方的诉求是什么呢?
对于运营、产品、老板、分析师们来说:
他们想要看到的是准确的实时的可分析的报表数据,关键点在于可分析上。当结果数据发生异常波动时,我们得有实时的明细数据提供分析查询,来调查发生异常波动的原因。当老板有一些新的想法,想对现成的报表做下二次分析时,我们得有能力提供明细的可分析的数据来做分析给出结果。
以实时日活统计来说,我们常用的手段是将用户ID存储的Redis这样KV存储当中来做去重,或者近似去重,然后计算得出实时的日活数据,但是当日活发生异常波动时,因为Reids的数据不是可分析的。所以我们很难快速给出原因,也没法在当天做分析,这种方案和结果显然是不合格的。
对于数仓开发来说:
统一实时/离线数仓元数据管理、统一模型、统一存储,减少数仓运维建设成本,提升整体数仓的易用性;
统一开发代码,统一一套SQL解决离线/实时开发问题,降低开发运维成本,彻底解决因为业务理解不同、逻辑不同导致的实时离线数据结果差异大的问题。
对于算法同学来说:
有实时/离线统一的数仓表可以可以用使用,统一模型,降低业务理解的门槛,提升整体数仓数据的易用性,方便好用的数仓元数据管理服务,方便算法同学进行二次的特征开发工作,提升模型的开发效率。提供准确实时可分析的算法模型效果数据,提升算法同学模型迭代的效率
整体总结下来批流一体的目标主要包含三个方面:
统一代码:一套SQL完成实时和离线的相关业务的开发需求;
统一数仓元数据:一张表可以同时提供离线读和实时读,统一模型的批流一体的数仓;
实时的报表数据:这与统一数仓元数据不同,产品报表数据需要提供秒级的实时的结果的查询能力,而统一数仓数据往往只需要实时的存储即可,对OLAP查询的效率,并没有报表数据并没有那么敏感。
1. 统一代码
由于实时SQL本身并没有特别的成熟,很多在离线场景下很容易实现的逻辑,在实时场景下要么是不能实现,要么是稳定性有问题。
目前业界都还在探索当中,阿里目前主要的方式的是使用FLINK一套引擎解决实时离线统一SQL的问题,但是目前也都是在实践,在上层ADS层业务逻辑实现上通过底层数仓的建设屏蔽掉一些实时SQL能力的问题,做到产品报表开发上统一一套SQL。这也是我们未来可以尝试的方向,除了在上层报表开发上尝试统一SQL以外,我们在统一代码这一块也做了一些工作和规划:
(1)统一UDF,集成升级平台框架到FLINK1.12新版本,统一离线实时统一套UDF;
(2)统一元数据管理:在FlinkSQL侧我们继承元数据中心服务,提供catalog.db.table这样的数据读取和写入方式,为了统一元数据,同样我们对SparkSQL做了二次的封装,同样和元数据中心做了集成,实现了以catalog.db.table这样形式的异构数据源之间的读取和写入。
场景化的配置式的批流一体的统一实现,对于一些简单业务逻辑的场景,我们后续会开发场景化的批流一体的实现。如批流一体的索引任务、批流一体的ETL清洗平台等等,这块由于资源问题,目前还在规划中。
批流一体SQL统一的在目前的技术下,还有一个比较大的前提是本身日志的复杂程度,这个涉及到本身日志埋点规范性和完整性,实时计算不像离线,可以将大量归因逻辑, 关联逻辑放在数据侧进行处理,抛开合理性和成本问题,很多工作在离线场景下是可以做的。
但是在实时场景,本身对性能和稳定性都非常的敏感,如果将大量的逻辑都放在数据侧进行处理,本身就会带来很多不能实现的问题、实现起来成本高的问题、很多稳定性、以及数据延迟的问题。如果打点做不好,整个实时数仓建设都是问题,所以云音乐也启动了曙光打点项目和有数团队合作,彻底重构云音乐各个产品的打点的实现,提升和完善打点的规范性和准确性,降低实时数仓的开发成本问题。
2. 统一数仓元数据
目前业界主要有两类方案:
第一种是建设批流映射层的方案,目前阿里公开的方案的就是这种方案,比较适合已经有了实时数仓和离线数仓的老产品,在不改动原有数仓的情况下,构建统一映射层视图,通过视图的方式提供一体化的使用体验,整体的原理参考下图:
第二种方案是构建一种新的元数据系统,一套schema下同时挂载多种存储,如HDFS、Kafka等,在写入数据时同时写入,在读取场景下时,根据读取方式的不同,选择相应的合适的存储,目前网易数帆有数产品团队开发的Arctic采用的就是这种方案:
整体思路是封装icberg和Kafka以及Hbase等多种存储,在不同场景下使用不同的存储,另外arctic还在iceberg的基础上做了很多二次开发,来解决DWS数据的更新问题,提供类似Hudi的CopyOnWrite以及MergeOnRead等功能,用来解决Flink本身用来做全聚合的稳定性问题。目前云音乐已经在一些新的业务场景做了试用,已经上线了几十张的的批流一体表,大家如果想进一步了解arctic可以找网易数帆有数实时计算团队了解,在此不过多描述。
3. 实时的报表数据
提供实时的报表数据主要依赖OLAP引擎和存储,存储侧需要有需要有在提供实时的数据更新能力的同时,还需要有提供秒级别数据的查询能力,很多时候没有办法把将结果直接写到到存储中。因为数据报表本身很多灵活性的查询,如果直接将结果写到存储中, 就需要类似Kylin那种实时的Cube能力,这对开发以及Flink本身计算的压力太大, 本身也会带来很多资源的和存储的浪费,稳定性问题以及开发工作量的问题也会很多,数据的二次分析能力也会很局限;所以在这一层我们需要OLAP引擎提供至少百亿级别的数据的秒级延迟的查询的能力,目前我们主要的方案采用的存储有Kudu和Clickhouse两种,以我们老版本的ABTest为例,我们采用的方案如下:
对于实时的最新的小时维度以及天维度的结果我们通过Impala及时读取Kudu数据关联出最新的结果;对于历史的一天以前天维度数据或者两个小时以前小时维度的数据我们采用Spark预计算好存储在结果表当中,两份数据UNION在一起提供给用户,保障数据结果的时效性,以及整体数据查询的用户体验。
运维工具的完善
实时SQL的发展降低了实时数据统计的开发难度,大大降低了实时数据统计的门槛,一方面由于本身实时SQL的不成熟而且黑盒,另一方面很多同学带着离线SQL的开发经验或者MYSQL类数据库的SQL经验来开发实时任务,这给平台带来了很大的运维压力,所以运维工具相关的建设,任务实时指标的完善是我们未来主要思考的方向之一。
分区流表技术完善
分区流表技术是一个能给云音乐实时平台资源使用,Kafka压力以及数仓建设带来质变的技术,目前我们只是完成了一个初版,未来我们会在分区的动态感知,分区的修改, schema的修改,以及运维监控以及推广上继续完善。
场景化批流一体建设
如批流一体索引任务建设、批流一体ETL工具等, 统一日志清洗规则, 为批流一体数仓打好基础。
批流一体存储探索
调研业界目前的方案, 结合音乐的业务场景, 提供整套解决方案, 降低实时报表的开发门槛, 提升实时报表的开发效率;
批流一体逻辑层建设等。
最后附一张网易数帆有数团队的实时计算解决方案架构图,基于 Apache Flink 构建的高性能、一站式实时大数据处理方案,广泛适用于流式数据处理场景,感兴趣的同学可以点击文末的“阅读原文”详细了解。
大愚,网易云音乐数据平台开发专家,主要负责云音乐实时、离线、机器学习开发平台建设工作。
赠书福利