原来,字节跳动的实时数据湖是这样落地的!
共 13699字,需浏览 28分钟
·
2022-09-17 21:35
导读:今天分享的主题是实时数据湖在字节跳动的实践,将围绕下面四点展开:
对实时数据湖的解读
在落地实时数据湖的过程中遇到的一些挑战和应对方式
结合场景介绍实时数据湖在字节内部的一些实践案例
数据湖发展的一些规划
数据湖的概念是比较宽泛的,不同的人可能有着不同的解读。这个名词诞生以来,在不同的阶段被赋予了不同的含义。
数据湖的概念最早是在Hadoop World大会上提出的。当时的提出者给数据湖赋予了一个非常抽象的含义,他认为它能解决数据集市面临的一些重要问题。其中最主要的两个问题是:首先,数据集市只保留了部分属性,只能解决预先定义好的问题;另外,数据集市中反映细节的原始数据丢失了,限制了通过数据解决问题。
从解决问题的角度出发,希望有一个合适的存储来保存这些明细的、未加工的数据。因此在这个阶段,人们对数据湖的解读更多是聚焦在中心化的存储之上。
不同的云厂商也把自己的对象产存储产品称为数据湖。比如AWS在那个阶段就强调数据湖的存储属性,对应的就是自家的对象存储S3。在Wiki的定义中也是强调数据湖是一个中心化存储,可以存海量的不同种类的数据。但是当对象存储满足了大家对存储海量数据的诉求之后,人们对数据湖的解读又发生了变化。
第二阶段,对数据湖的解读更多是从开源社区和背后的商业公司发起的。比如Databricks 作为一个云中立的产品,它将云厂商的这个对象存储称为 data lakes storage,然后把自己的重心聚焦在如何基于一个中心化的存储构建一个数据分析、数据科学和机器学习的数据湖解决方案,并且把这个方案称之为lake。他们认为在这个中心化的存储之上构建事务层、索引层、元数据层,可以解决数据湖上的可靠性、性能和安全的问题。
与此同时,Uber最初也将Hudi对外称为一个事务型的数据湖,名字实际上也是由 Hadoop Updates and Incrementals缩写而来,最早也是被用于解决Uber内部离线数据的合规问题。现在他们更倾向的定义是一个流式数据湖平台,Iceberg也常常被人们纳入数据湖的讨论。尽管Ryan Blue一直宣称 Iceberg 是一个Open Table Format。这三者有一些共同点,一个是对 ACID的支持,引入了一个事务层,第二是对 streaming 和 batch的同等支持,第三就是聚焦在如何能更快地查询数据。国内也有人将 Hudi、Iceberg、Delta Lake称为数据湖的三剑客。
讲完了业界的解读,来看一下字节跳动对数据湖的解读。我们是结合字节的业务场景来解读的。通过实践总结,我们发现数据湖需要具备六大能力:
高效的并发更新能力
因为它能够改变我们在 Hive 数仓中遇到的数据更新成本高的问题,支持对海量的离线数据做更新删除。
智能的查询加速
用户使用数据湖的时候,不希望感知到数据湖的底层实现细节,数据湖的解决方案应该能够自动地优化数据分布,提供稳定的产品性能。
批流一体的存储
数据湖这个技术出现以来,被数仓行业给予了厚望,他们认为数据湖可以最终去解决一份存储流批两种使用方式的问题,从而从根本上提升开发效率和数据质量。
统一的元数据和权限
在一个企业级的数据湖当中,元数据和权限肯定是不能少的。同时在湖仓共存的情况下,用户不希望元数据和权限在湖仓两种情况下是割裂的。
极致的查询性能
用户对于数据湖的期望就是能够在数据实时入湖的同时还能做到数据的秒级可视化。
AI + BI
数据湖数据的对外输出,不只局限于BI,同时AI也是数据湖的一等公民,数据湖也被应用在了字节的整个推荐体系,尤其是特征工程当中。实时数据湖其实是数据湖之上,更加注重数据的实时属性或者说流属性的一个数据湖发展方向。当然,正如业界对于数据湖的解读一直在演变,我们对数据湖的解读也不会局限于以上场景和功能。
接下来介绍数据湖落地的挑战和应对。字节内部的数据湖最初是基于开源的数据湖框架Hudi构建的,选择Hudi,最简单的一个原因是相比于 Iceberg 和 Delta Lake,Hudi原生支持可扩展的索引系统,能够帮助数据快速定位到所在的位置,达到高效更新的效果。
在尝试规模化落地的过程中,我们主要遇到了四个挑战:数据难管理、并发更新弱、更新性能差,以及日志难入湖。
接下来会一一介绍这些挑战背后出现的原因以及我们应对的策略。
1. 数据难管理
下图是一个典型的基于中心化存储构建数仓机器学习和数据科学的架构。这里将加工过后的数据保存在数仓中,通过数仓的元数据进行组织。数据科学家和机器学习框架都会直接去这个中心化的存储中获取原始数据。因此在这个中心化存储之上的数据对用户来说是完全分散的,没有一个全局的视图。
为了解决这个数据难管理的问题,Databricks 提出了一个Lakehouse 的架构,就是在存储层之上去构建统一的元数据缓存和索引层,所有对数据湖之上数据的使用都会经过这个统一的一层。这和我们的目标很相似,但是现实比较残酷,我们面临的是海量存量数据,这些存量数据不管是数据格式的迁移,还是使用方式的迁移,亦或是元数据的迁移,都意味着巨大的投入。因此在很长一段时间里,我们都会面临数仓和数据湖共存这样一个阶段。在这一阶段,两者的连通性是用户最为关心的。
我们在数据湖和数仓之上,构建了一层统一的元数据层,这层元数据层屏蔽了下层各个系统的元数据的异构性,由统一的元数据层去对接 BI 工具,对接计算引擎,以及数据开发、治理和权限管控的一系列数据工具。而这一层对外暴露的 API 是与 Hive 兼容的。
尽管 Hive 这个引擎已经逐渐被其他更新的计算引擎代替了,比如Spark、Presto、Flink,但是它的源数据管理依旧是业界的事实标准。另外,一些云厂商即使选择构建了自己的元数据服务,也都同时提供了和 HMS 兼容的元数据查询接口,各个计算引擎也都内置了Hive Catalog 这一层。
解决了上层的访问统一的问题,但依旧没有解决数据湖和数仓元数据本身的异构问题。这个异构问题是如何导致的呢?为什么Hive Matestore 没有办法去满足元数据管理的这个诉求?
这就涉及到数据湖管理元数据的特殊性。以Hudi为例,作为一个典型的事务型数据湖,Hudi使用时间线 Timeline 来追踪针对表的各种操作。比如commit compaction clean,Timeline 类似于数据湖里的事务管理器,记录对表的更改情况。这些更改或事务记录了每次更新的操作是发生在哪些文件当中,哪些文件为新增,哪些文件失效,哪些数据新增,哪些数据更新。
总结下来,数据湖是通过追踪文件来管理元数据。管理的力度更细了,自然也就避免了无效的读写放大,从而提供了高效的更新删除、增量消费、时间旅行等一系列的能力。但这其实也意味着另外一个问题,就是一个目录中可以包含多个版本的文件,这与 Hive 管理元数据的方式产生了分歧,因为 Hive Metastore 是通过目录的形式来管理元数据的,数据更新也是通过覆盖目录来保证事务。
由于对元信息的管理力度不同,基于 Hive Metastore的元数据管理其实是没有办法实现数据湖刚刚提到的一系列能力。针对这个问题,Hudi社区的解决方案是使用一个分布式存储来管理这个 Timeline 。Timeline 里面记录了每次操作的元数据,也记录了一些表的 schema 和分区的信息,通过同步到 Hive Metastore 来做元数据的展示。这个过程中我们发现了三个问题。
第一个问题是分区的元数据是分散在两个系统当中的,缺乏 single source of true。
第二个问题是分区的元数据的获取需要从 HDFS 拉取多个文件,没有办法给出类似于 HMS 这样的秒级访问响应。服务在线的数据应用和开发工具时,这个延迟没有办法满足需求。
第三个问题是读表的时候需要拉取大量的目录和 Timeline 上记录的表操作对应的元数据进行比对,找出最新的这个版本包含的文件。元数据读取本身就很重,并且缺乏裁剪能力,这在近实时的场景下带来了比较大的overhead。
Hudi Metastore Server 融合了 Hive Metastore 和 Hudi MetaData管理的优势。
首先,Hudi Metastore Server 提供了多租户的、中心化的元数据管理服务,将文件一级的元数据保存在适合随机读写的存储中,让数据湖的元数据不再分散在多个文件当中,满足了single source of true。
其次,Hudi Metastore Server 针对元数据的查询,尤其是一些变更操作。比如Job position 提供了与 Hive Metastore完全兼容的接口,用户在使用一张数据湖上的表的时候,享受到这些增加的高效更新、删除、增量消费等能力的同时,也能享受到一张 Hive 表所具备的功能,例如通过Spark、Flink、Presto查询,以及在一些数据开发工具上在线的去获取到元数据以及一些分区 TTL清理的能力。
此外,Hudi Metastore Server还解决了一个关键性的问题,就是多任务并发更新弱的问题。
2. 并发更新弱
我们最早是基于Hudi社区的0.7版本的内核进行研发的,当时Hudi的Timeline中的操作必须是完全顺序的,每一个新的事务都会去回滚之前未完成的事务,因此无法支持并发写入。后续社区也实现了一个并发写入的方案,整体是基于分布式锁实现的,并且只支持了Spark COW表的并发写,并不适用于 Flink 或者实时的MOR表。但是多任务的并发写入是我们内部实践当中一个非常通用的诉求。因此我们在Hudi Metastore Server的Timeline之上,使用乐观锁去重新实现了这个并发的更新能力。同时我们这个并发控制模块还能支持更灵活的行列级别并发写策略,为后续要介绍到的实时数据关联的场景的落地提供了一个可能。
除了多任务的并发写入之外,我们在单个 Flink 任务的并发写入也遇到了瓶颈。由于Hudi设计之初严重依赖Spark。0.7.0的版本才刚刚支持Flink。不管是在稳定性还是在功能上都和 Spark On Hudi有非常大的差距。因此在进行高QPS入湖的情况下,我们就遇到了单个Flink任务的扩展性问题。
我们通过在Flink的 embedding term server上支持对当前进行中的事务元信息进行一下缓存,大幅提升了单个任务能够并发写入的文件量级,基本上是在80倍的量级。结合分区级别的并发写入,我们整体支撑了近千万QPS的数据量的增量入湖。
下一步的并发问题是批流并发冲突的问题。批流并发冲突问题类似于一个我们在传统数据湖中遇到的场景,就是有一连串的小事务和一个周期比较长的长事务,如果这两者发生冲突,应该如何处理。
如果让短事务等长事务完成之后再进行,那对一个实时的链路来说,意味着数据的可见性变低了。同时如果在等待过程中失败了,还会有非常高的fail over成本。但是如果我们让这个长事务失败了,成本又会很高,因为这个长事务往往需要耗费更多的资源和时间。而在批流并发冲突的这个场景下,最好是两个都不失败,但这从语义上来讲又不符合我们认知中的隔离级别。
为了解决批流冲突的问题,我们的思路是提供更灵活的冲突检查和数据合并策略。最基础的就是行级并发。首先两个独立的writer写入的数据在物理上是隔离的,借助文件系统的租约机制也能够保证对于一个文件同时只有一个writer。所以这个冲突实际上不是发生在数据层面的,而是发生在元数据层面。那数据的冲突与否,就可以交由用户来定义。很多时候入湖的数据实际上并不是一个现实中正在发生的事情,而是一个现实操作的回放。比如图中的这个场景,我们假设删除的作业是针对一个特定的 Snapshot。即使有冲突,我们可以认为整个删除的过程是瞬时完成的,后续的新事物可以追加发生在这次删除作业之后。
第二是列级并发。比如接下来在实践实际案例中,我们要介绍的这个实时数据关联场景,每个writer实际上只是根据主键去更新部分的列。因此这些数据其实在行级别看起来是冲突的,但是从列的角度来看是完全不冲突的。配合我们的一些确定性索引,数据能被写入到同一个文件组中,这样就不会出现一致性的问题。
最后是冲突合并。假如两个数据真的是在行级别和列级别都发生了冲突,那真的只能通过 fail 掉一个事务才能完成吗?我觉得是不一定的,这里我们受到了git的启发。假如两次 commit冲突了,我们可以提供merge值的策略,比如数据中带有时间戳,在合并时就可以按照时间戳的先后顺序来做合并。
3. 更新性能差
我们最早选择基于Hudi也是因为可扩展的索引系统,通过这个索引系统可以快速地定位到需要跟新的文件。这带来了三点好处:
一个是避免读取不需要的文件;二是避免更新不必要的文件;三是避免将更新的数据和历史的数据做分布式关联,而是通过提前将文件分好组的方式直接在文件组内进行合并。
在早期的落地过程当中,我们尝试尽可能复用Hudi的一些原生能力,比如Boom Filter index。但是随着数据规模的不停增长,当达到了千亿的量级之后,upsert的数据随着数据量的增长逐渐放缓,到了数千亿的量级后,消费的速度甚至赶不上生产者的速度。即使我们去为它扩充了资源,而这时的数据总量其实也只是在 TB 级别。我们分析了每个文件组的大小,发现其实文件组的大小也是一个比较合理的值,基本上是在0.5g到1g之间。进一步分析,我们发现随着数据量的增长,新的导入在通过索引定位数据的这一步花费的时间越来越长。
根本原因是Bloom Filter存在假阳性,一旦命中假阳性的case,我们就需要把整个文件组中的主键链读取上来,再进一步判断这个数据是否已经存在。通过这种方式来区分这个到底是 update 还是 insert。upsert本身就是update和insert两个操作的结合,如果发现相同组件数据不存在,就进行insert。如果存在,我们就进行 update。而 Bloom Filter由于假阳性的存在,只能加速数据的insert而没有办法去加速update。这就和我们观察到的现象很一致。因为这个 pipeline 在运行初期,大部分数据都是第一次入湖,是insert操作,因此可以被索引加速。但是规模达到一定量级之后,大部分数据都是更新操作,没有办法再被索引加速。为了解决这个问题,我们急需一个更稳定更高效的索引。
Bloom Filter索引的问题,根因是读取历史数据进行定位,导致定位的时间越来越长。那有没有什么办法是无需读历史数据,也可以快速定位到数据所在位置呢?我们想到了类似于 Hive的bucket,也就是哈希的方法来解决这个问题。
Bucket Index原理比较简单,整个表或者分区相当于是一张哈希表,文件名中记录的这个哈希值,就相当于哈希表中这个数组的值。可以根据这个数据中的主键哈希值快速定位到文件组。一个文件组就类似于哈希表中的一个链表,可以将数据追加到这个文件组当中。Bucket Index成功地解决了流式更新性能的问题。由于极低的定位数据的成本,只要设置了一个合适的bucket桶大小,就能解决导入性能的问题,将流式更新能覆盖的场景从 TB 级别扩展到了百 TB 级别。除了导入的性能,Bucket Index 还加速了数据的查询,其中比较有代表性的就是 bucket Pruning和bucket join。
当然这种索引方式也遇到了扩展性的问题,用户需要提前一步做桶数的容量规划,给一个比较安全的值,避免单个桶扩大,以便应对接下来的数据增长。在数据倾斜的场景下,为了让倾斜值尽可能分散在不同的bucket,会将bucket的数量调到很大。而每个bucket平均大小很小,会带来大量的小文件,给文件系统带来冲击的同时也会带来查询侧性能下滑和写入侧的资源浪费。同时在一线快速增长的业务,很难对容量有一个精准的预估。如果估算少了,数据量飞速增长,单个的bucket的平均大小就会很大,这就会导致写入和查询的并发度不足,影响性能。如果估算多了,就会和倾斜的场景一样出现大量的小文件。整体的rehash又是一个很重的运维操作,会直接影响业务侧对数据的生产和使用。因此不管从业务的易用性出发,还是考虑到资源的使用率和查询的效率,我们认为兼具高效导入和查询性能,也能支持弹性扩展的索引系统是一个重要的方向。
这时我们想到了可扩展hash这个数据结构。利用这个结构,我们可以很自然地做桶的分裂和合并,让整个bucket的索引从手动驾驶进化到自动驾驶。在数据写入的时候,也可以快速地根据现有的总数,推断出最深的有效哈希值的长度,通过不断地对 2 的桶深度次方进行取余的方式,匹配到最接近的分桶写入。我们将Bucket Index这个索引贡献到了社区,已在Hudi的0.11版本对外发布。
4. 日志难入湖
本质原因也是因为Hudi的索引系统。因为这个索引系统要求数据按照组件聚集,一个最简单的方式就是把这个组件设成UUID,但这样就会带来性能上的问题以及资源上的浪费。因此我们在Hudi之内实现了一套新的机制,我们认为是无索引,即绕过Hudi的索引机制,做到数据的实时入湖。同时因为没有主键,Upsert 的能力也失效了。我们提供了用更通用的 update 能力,通过shuffle hash join和 broadcast join 去完成数据实时更新。
接下来详细介绍实时数据湖在字节的实践场景。电商是字节发展非常快速的业务之一,数据增长非常快,这也对数仓的建设提出了较高的要求。目前电商业务数据还是典型的lambda架构,分为是离线数仓和实时数仓建设。在实际场景中,lambda架构的问题相信大家都已经比较了解了,我就不多做赘述了。这次的场景介绍是围绕一个主题,通过数据湖来构建实时数仓,使实时数据湖切入到实时数仓的建设当中。这不是一蹴而就的,是分阶段一步一步渗透到实时数仓的建设当中,而实时数据湖的终极目标也是在存储侧形成一个真正意义上的批流一体的架构。
我们切入的第一个阶段是实时数据的近实时可见可测。
坦白说,在实时数据湖的落地初期,对于数据湖是否能在实时数仓中真正胜任,大家都是存疑的。因此最早的切入点也比较保守,用在数据的验证环节。在电商的实时数仓中,由于业务发展快,上游系统变更,以及数据产品需求都非常多。导致实时数仓开发周期短,上线变更频繁。当前这个实时的数据的新增字段和指标逻辑变更,或者在任务重构优化时,都要对新版本的作业生成的指标进行验证。验证的目标主要有两点,一是原有指标,数据是否一致,二是新增指标的数据是否合理。
在采用数据湖的方案之前,数据湖的验证环节需要将结果导入到Kafka然后再dump到 Hive,进行全量数据校验。这里存在的一个问题就是数据无法实时或者近实时可见可检的,基本上都是一个小时级的延迟。在很多紧急上线的场景下,因为延时的问题,只能去抽测数据进行测试验证,就会影响数据质量。实时数据湖的方案,是通过将实时数据低成本的增量导入到数据湖中,然后通过Presto进行查询,然后进行实时计算汇总,计算的结果做到近实时的全面的可见可测。
当然在这个阶段中,我们也暴露出了很多数据湖上易用性的问题。业务侧的同学反馈最多的问题就是数据湖的配置过于复杂。比如要写一个数据湖的任务,Hudi自身就存在十多个参数需要在写入任务中配置。这增加了业务侧同学的学习成本和引擎侧同学的解释成本。同时还需要在Flink SQL里定义一个sync table 的DDL,写一个完整的 schema,很容易会因为页的顺序或者拼写错误导致任务失败。
我们借助了Hudi Metastore Server 的能力,封装了大量的参数。同时使用Flink Catalog的能力,对Meta Server进一步封装,让用户在配置一个 Fink SQL任务的时候,从最初的写DDL配置十多个参数,到现在只要写一条 create table like的语句,配置一张临时表,用户对这种方式的接受度普遍是比较高的。
第二个阶段,也就是第二个应用场景是数据的实时入湖和实时分析。
数据湖可以同时满足高效的实时数据增量导入和交互式分析的需求,让数据分析师可以自助搭建看板,同时也可以进行低成本的数据回刷,真正做到一份数据批流两种使用方式。在这个阶段,由于数据实际上已经开始生产了,用户对于数据入湖的稳定性和查询性能都有很高的要求。我们通过将Compaction任务与实时导入任务拆分,首先解决了资源抢占导致的入湖时效性比较低的问题,同时设计了compaction service,负责compaction任务的调度,整个过程对业务侧同学完全屏蔽。我们在服务层面也对报警和监控进行了加强,能够做到先于业务去发现问题,处理问题,进一步提升了任务的稳定性,也让我们的使用方能够更有信心地去使用实时数据湖。
在查询的优化上面,我们优化了读文件系统的长尾问题,支持了实时表的列裁剪。同时我们对Avro日志进行了短序列化和序列化的case by case的优化,还引入了列存的 log进一步提升查询性能。除了实时数据分析之外,这种能力还可以用于机器学习。在特征过程当中,有些label是可以快速地从日志中实时获取到的。比如对一个视频点了个赞,和特征是可以关联上的。
有些label的生成则是长周期的,比如在抖音上买了一个东西,或者把一个东西加入购物车,到最后的购买,这整个链路是很长的,可能涉及到天级别或者周级别的一个不定周期。但是在这两种情况下,它的特征数据基本上都是相同的,这也使底层的存储有了批流两种使用方式的诉求,以往都是通过冗余的存储和计算来解决的。通过数据湖可以将短周期的特征和标签实时地入湖,长周期的每天做一次调度,做一个批式入湖,真正能做到一份数据去适用多个模型。
第三个阶段的应用场景是数据的实时多维汇总。
在这个阶短最重要的目标是实时数据的普惠。因为很多的实时数据使用方都是通过可视化查询或者是数据服务去消费一个特定的汇总数据,而这些重度汇总过后的实时数据使用率相对来说是比较低的。因此我们和数仓的同学共同推进了一个实时多维汇总的方案落地。数仓的同学通过实时计算引擎完成数据的多维度的轻度汇总,并且实时地更新入湖。下游可以灵活地按需获取重度汇总的数据,这种方式可以缩短数据链路,提升研发效能。
在实际的业务场景中,对于不同的业务诉求,又可以细分成三个不同的子场景。
第一个场景是内部用户的可视化查询和报表这一类场景。它的特点是查询频率不高,但是维度和指标的组合灵活,同时用户也能容忍数秒的延迟。在这种场景下,上层的数据应用直接调用底层的 Presto 引擎行为实时入库的数据进行多维度的重度聚合之后,再做展现。
另外一个主要的场景就是面向在线的数据产品。这种场景对高查询频率、低查询延迟的诉求比较高,但是对数据可见性的要求反而不那么高。而且,经过重度汇总的数据量也比较小,这就对数据分析工具提出了比较大的挑战。因此在当前阶段,我们通过增加了一个预计算链路来解决。
下面一个问题,多维重度汇总的多维计算结果是从我们湖里批量读出来,然后定时地去写入 KV存储,由存储去直接对接数据产品。从长期来看,我们下一步计划就是对实时数据湖之上的表去进行自动地构建物化视图,并且加载进缓存,以此来兼顾灵活性和查询性能,让用户在享受这种低运维成本的同时,又能满足低延低查询延迟、高查询频率和灵活使用的诉求。
第四个典型的场景是实时数据关联。数据的关联在数仓中是一个非常基础的诉求,数仓的同学需要将多个流的指标和维度列进行关联,形成一张宽表。但是使用维表join,尤其是通过缓存加速的方式,数据准确性往往很难保障。而使用多流join 的方式又需要维持一个大状态,尤其是对于一些关联周期不太确定的场景,稳定性和准确性之间往往很难取舍。
基于以上背景,我们的实时数据湖方案通过了这个列级的并发写入和确定性的索引。我们支持多个流式任务并发地去写入同一张表中,每个任务只写表中的部分列。数据写入的 log 件在物理上其实是隔离的,每个log文件当中也只包含了宽表中的部分列,实际上不会产生互相影响。再异步地通过compaction任务定期的对之前对log数据进行合并,在这个阶段对数据进行真正的实际的关联操作。通过这种方式,提供一个比较稳定的性能。使用这一套方案,实时关联用户也不用再关注状态大小和TTL该如何设置这个问题了,宽表的数据也可以做到实时可查。
最后一个阶段是实时数据湖的终极阶段,目前仍在探索中。我们只在部分场景开启了验证。在这个架构里面,数据可以从外部的不同数据源中实时或者批量的入湖和出湖,而流批作业完成湖内的数据实时流转,形成真正意义上的存储层批流一体。
同时在这套架构中,为了解决实时数据湖从分钟级到秒级的最后一公里,我们在实时引擎与数据湖的表之间增加了一层数据加速服务。在这层数据加速服务之上,多个实时作业可以做到秒级的数据流转,而这个服务也会解决频繁流式写入频繁提交导致的小文件问题,为实时数据的交互查询进一步提速。
除此之外,由于流批作业的特性不同,批计算往往会需要更高的瞬时吞吐。因此这些批计算任务也可以直接读写底层的池化文件系统,做到极强的扩展性,真正意义上做到批流写入的隔离,批作业的写入不会受限于加速服务的带宽。在这个批流一体的架构中,数据湖之上的用户,不管是SQL查询,还是BI 、AI ,都可以通过一个统一的 table format 享受到数据湖之上数据的开放性。
未来规划主要聚焦于三个维度:功能层面的规划,开源层面的规划,以及商业化输出相关的一些规划。
1. 功能层面
首先是功能维度,我们认为一个更智能的实时数据湖的加速系统是我们最重要的目标之一。
元数据层面的加速
数据湖托管了文件级别的元数据,元数据的数据量相比数仓有了几个量级的增长,但同时也给我们带来了一些优化的机会。比如我们未来计划将查询的谓词直接下推到元数据系统当中,让这个引擎在scan阶段无需访问系统,直接去跳过无效文件来提升查询的性能。
数据的加速
当前的实时数据湖由于其 serverless 架构对文件系统的重度依赖,在生产实践中还是处于分钟级,秒级依旧处于验证阶段。那我们接下来计划将这个数据湖加速服务不断地去打磨成熟,用来做实时数据的交换和热数据的存储,以解决分钟级到秒级的最后一公里问题。智能加速层面临的最大的挑战是批流数据写入的一致性问题,这也是我们接下来重点要解决的问题。例如在这种端到端的实时生产链路中,如何在提供秒级延时的前提下解决类似于跨表事务的问题。
索引加速
通过bucket, zorder等一系列的主键索引,进一步提升数据湖之上的数据的查询性能,过滤掉大量的原始数据,避免无效的数据交换。同时我们接下来也会非常注重二级索引的支持,因为二级索引的支持可以延伸湖上数据的更新能力,从而去加速非主线更新的效率。
智能优化
我们接下来会通过一套表优化服务来实现智能优化,因为对于两个类似的查询能否去提供一个稳定的查询性能,表的数据分布是一个关键因素。从用户的角度来看,用户只要查询快、写入快,像类似于compaction或clustering、索引构建等一系列的表优化的方式,只会提升用户的使用门槛。我们的计划是通过一个智能的表优化服务分析用户的查询特征,同时监听这个数据湖上数据的变化,自适应地触发这个表的一系列优化操作,可以做到在用户不需要了解过多细节的情况下,做到智能的互加速。
2. 开源层面
第二个维度是开源贡献。我们现在一直在积极地投入到Hudi的社区贡献当中,参与了多个Hudi的核心feature的开发和设计。其中Bucket index是我们合入到社区的第一个核心功能,而当下我们也在同时贡献着多个重要的功能,比如最早提到的解决数据难管理的Hudi MetaStore Server,我们已经贡献到社区了,去普惠到开源社区。因为我们发现Hudi MetaStore Server不止解决我们在生产实践中遇到的问题,也是业界普遍遇到的一个问题。现在也在跟Hudi社区的PMC共同探讨数据湖的元数据管理系统制定标准。
其它一些功能我们也计划分两个阶段贡献到社区。比如 RPC 42,将我们的湖表管理服务与大家共享,长期来看能够做到数据湖上的表的自动优化。还有 Trino 和Presto DB 的 Hudi Connector,目前也是在和Hudi背后的生态公司共同推进投入到开源社区当中。
3. 商业化输出
当前在火山引擎之上,我们将内部的数据湖技术实践同时通过LAS和EMR这两个产品向外部企业输出。其中LAS湖仓一体分析服务是一个整体面向湖仓一体架构的Serverless数据处理分析服务,提供一站式的海量数据存储计算和交互分析能力,完全兼容 Spark、Presto和Flink生态。同时这个产品具备了完整的字节内部的实时数据湖的成熟能力,能够帮助企业轻松完成湖仓的构建和数据价值的洞察。
另外一个产品 EMR 是一个Stateless的云原生数仓,100%开源兼容,在这个产品当中也会包含字节数据湖实践中一些开源兼容的优化,以及一些引擎的企业级增强,以及云上便捷的运维能力。
最后,欢迎大家关注字节跳动数据平台公众号,在这里有非常多的技术干货、产品动态和招聘信息。
05
Q:可扩展性的 Bucket Index 具体是怎么做的?
A:可控扩展性的Bucket Index其实是把哈希值的 String 用一个字典树的思路去解决。我们把它当成一个一个的 bit ,比如说当我们把两个 bucket 合并了之后,我们就可以少用一个 bit,如果我们把一个 bucket 分裂之后,就会增加一个 bit。
然后这里面其实主要是两点,一个是查询层我们怎么去识别它到底属于哪个 bucket。这个我们是可以通过一个当前的桶数算出一个最大的这个哈希深度。然后我们去对哈希值和这个桶的深度的N次方去进行取余。如果取余能匹配上,就说明这个桶是存在的。如果匹配不上,我们就把这个深度减1,然后再进行取余,直到能匹配上为止,这个是在写入的时候。
第二个就是在查询层面,我们会找一个合理的并行度,比如说我们这个桶的深度可能是6,但是这个6的文件占的数量特别少,那我们可能就再把它减少一位。然后从整个查询的这个角度来看,我们减少一位的话,这个数据分布其实应该是更为合理的。我们把文件先分好组,让每个 task 去拿到对应的一个特定的哈希值上的一个文件。
还有一个就是当数据真正发生这merge 和 split 的时候,这个阶段我们是如何处理的?这个阶段其实这样的,当一个文件发生分裂的时候,它原始的数据是不用动的。我们可以认为它就是一个引用,因为我们匹配到了新的file group。我们可以找到之前它引用的原生没有扩容的这个bucket,然后我们依旧还是可以去把这个数据拿到,并且在这个没有扩容的file group上,我们可以套一层 hash filter ,然后可以保证这个数据不会有重复。最后我们异步地去做一个 clustering这个时候真正地去对数据物理上面去完成一个历史数据的重分布。
Q:这边对数据湖的应用主要是实时数仓吗?
A:实时数仓是我们非常重要的一个落地场景。这次为什么着重介绍实时数仓,也是这次的这个整体的 topic 是字节跳动实时数据湖的引用。这个数据湖在我们内部其实也会用于离线数仓,可能也会用于推荐系统,很多场景都会有相应的一个应用。
Q:感觉schema on read的这种特性的实践和预期并不一致。
A:其实是这样的,schema on read目前的实践整体来说是比较少的,但是其实我们是有一些预期的。我可以大概讲一下我的理解,首先我们在数据入湖的时候,对数据的期望还是它要是结构化的。但是我们schema on read的核心可能不是说去支持这种类似于非结构化或者说是没有办法去结构化的数据,我们的核心可能是要去支持数据的一个灵活的演变能力。那这里面其实有几种思路。
第一种思路的话就是我们在表的schema层,去做一个灵活演变的支持。第二个思路也非常的类似于 git 的思路,就是我们的这个用户其实对同一份数据它有不同视图的需求。我们可以把这个数据以git 的思路去把它做成分支。每个人在同一份数据上面,有一个自己的数据的视图,这个我认为可能也是 schema on read 的下一个重要的发展方向,我们可能有一张表,这张表每个人他看到的这个视图可能是不一样的。然后每个人可以往自己的视图里头去加上一些自己想要的数据。这个在实际的业务场景中其实也是存在的。比如说一个实时数据,它进来的时候,它可能这个指标不是很全的,但是我们有些指标可能是需要在这个离线加工完之后再回灌进去。那这样的话,其实这一张表对用户呈现的就是两个视图。那我们接下来可能要做的就是如何去解决这个不同视图之间的这个隔离的问题。不管是存储上面的这个隔离,还是权限上面的隔离,还是元数据上面的隔离。
Q:数据湖里面是否还需要考虑类似数仓的分层架构,如果需要的话是如何实现的?
A:这主要取决于上层用户如何使用数据湖,目前来看实际依旧还是有分层架构的,但是从底层来看,不管用户是否分层,数据湖提供的能力是一样的。
关于作者:杨诗旻,字节跳动数据平台,数据湖团队技术负责人。