Flink + Iceberg 在去哪儿的实时数仓实践
背景及痛点
Iceberg 架构
痛点一:Kafka 数据丢失
痛点二:近实时 Hive 压力大
Iceberg 优化实践
总结
一、背景及痛点
1. 背景
2. 原架构方案
3. 痛点
Kafka 存储成本高且数据量大。Kafka 由于压力大将数据过期时间设置的比较短,当数据产生反压,积压等情况时,如果在一定的时间内没消费数据导致数据过期,会造成数据丢失。
Flink 在 Hive 上做了近实时的读写支持。为了分担 Kafka 压力,将一些实时性不太高的数据放入 Hive,让 Hive 做分钟级的分区。但是随着元数据不断增加,Hive metadata 的压力日益显著,查询也变得更慢,且存储 Hive 元数据的数据库压力也变大。
二、Iceberg 架构
1. Iceberg 架构解析
数据文件(data files)
Iceberg 表真实存储数据的文件,一般存储在 data 目录下,以 “.parquet” 结尾。
清单文件(Manifest file)
每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)。通过该文件,可过滤掉无关数据,提高检索速度。
快照(Snapshot)
快照代表一张表在某个时刻的状态。每个快照版本包含某个时刻的所有数据文件列表。Data files 存储在不同的 manifest files 里面, manifest files 存储在一个 Manifest list 文件里面,而一个 Manifest list 文件代表一个快照。
2. Iceberg 查询计划
元数据过滤
清单文件包括分区数据元组和每个数据文件的列级统计信息。在计划期间,查询谓词会自动转换为分区数据上的谓词,并首先应用于过滤数据文件。接下来,使用列级值计数,空计数,下限和上限来消除与查询谓词不匹配的文件。
Snapshot ID
每个 Snapshot ID 会关联到一组 manifest files,而每一组 manifest files 包含很多 manifest file。
manifest files 文件列表
每个 manifest files 又记录了当前 data 数据块的元数据信息,其中就包含了文件列的最大值和最小值,然后根据这个元数据信息,索引到具体的文件块,从而更快的查询到数据。
三、痛点一:Kafka 数据丢失
1. 痛点介绍
2. 解决方案
3 .为什么 Iceberg 只能做近实时入湖?
Iceberg 提交 Transaction 时是以文件粒度来提交。这就没法以秒为单位提交 Transaction,否则会造成文件数量膨胀;
没有在线服务节点。对于实时的高吞吐低延迟写入,无法得到纯实时的响应;
Flink 写入以 checkpoint 为单位,物理数据写入 Iceberg 后并不能直接查询,当触发了 checkpoint 才会写 metadata 文件,这时数据由不可见变为可见。checkpoint 每次执行都会有一定时间。
4. Flink 入湖分析
IcebergStreamWriter
主要用来写入记录到对应的 avro、parquet、orc 文件,生成一个对应的 Iceberg DataFile,并发送给下游算子。
IcebergFilesCommitter
为每个 checkpointId 维护了一个 DataFile 文件列表,即 map<Long, List<DataFile>>,这样即使中间有某个 checkpoint 的 transaction 提交失败了,它的 DataFile 文件仍然维护在 State 中,依然可以通过后续的 checkpoint 来提交数据到 Iceberg 表中。
5. Flink SQL Demo
■ 5.1 前期工作
开启实时读写功能
开启 table sql hint 功能来使用 OPTIONS 属性
注册 Iceberg catalog 用于操作 Iceberg 表
CREATE CATALOG Iceberg_catalog WITH (\n" +
" 'type'='Iceberg',\n" +
" 'catalog-type'='Hive'," +
" 'uri'='thrift://localhost:9083'" +
");
Kafka 实时数据入湖
insert into Iceberg_catalog.Iceberg_db.tbl1 \n
select * from Kafka_tbl;
数据湖之间实时流转 tbl1 -> tbl2
insert into Iceberg_catalog.Iceberg_db.tbl2
select * from Iceberg_catalog.Iceberg_db.tbl1
/*+ OPTIONS('streaming'='true',
'monitor-interval'='10s',snapshot-id'='3821550127947089987')*/
■ 5.2 参数解释
monitor-interval
start-snapshot-id
6. 踩坑记录
7. 数据样例
一秒前的数据
一秒后刷新的数据
四、痛点二:
Flink 结合 Hive 的近实时越来越慢
1. 痛点介绍
选用 Flink + Hive 的近实时架构虽然支持了实时读写,但是这种架构带来的问题是随着表和分区增多,将会面临以下问题:
元数据过多
Hive 将分区改为小时 / 分钟级,虽然提高了数据的准实时性,但是 metestore 的压力也是显而易见的,元数据过多导致生成查询计划变慢,而且还会影响线上其他业务稳定。
数据库压力变大
随着元数据增加,存储 Hive 元数据的数据库压力也会增加,一段时间后,还需要对该库进行扩容,比如存储空间。
2. 解决方案
将原先的 Hive 近实时迁移到 Iceberg。为什么 Iceberg 可以处理元数据量大的问题,而 Hive 在元数据大的时候却容易形成瓶颈?
Iceberg 是把 metadata 维护在可拓展的分布式文件系统上,不存在中心化的元数据系统;
Hive 则是把 partition 之上的元数据维护在 metastore 里面(partition 过多则给 mysql 造成巨大压力),而 partition 内的元数据其实是维护在文件内的(启动作业需要列举大量文件才能确定文件是否需要被扫描,整个过程非常耗时)。
五、优化实践
1. 小文件处理
Iceberg 0.11 以前,通过定时触发 batch api 进行小文件合并,这样虽然能合并,但是需要维护一套 Actions 代码,而且也不是实时合并的。
Table table = findTable(options, conf);
Actions.forTable(table).rewriteDataFiles()
.targetSizeInBytes(10 * 1024) // 10KB
.execute();
Iceberg 0.11 新特性,支持了流式小文件合并。
CREATE TABLE city_table (
province BIGINT,
city STRING
) PARTITIONED BY (province, city) WITH (
'write.distribution-mode'='hash'
);
2. Iceberg 0.11 排序
■ 2.1 排序介绍
■ 2.2 排序 demo
insert into Iceberg_table select days from Kafka_tbl order by days, province_id;
3. Iceberg 排序后 manifest 详解
file_path:物理文件位置。
partition:文件所对应的分区。
lower_bounds:该文件中,多个排序字段的最小值,下图是我的 days 和 province_id 最小值。
upper_bounds:该文件中,多个排序字段的最大值,下图是我的 days 和 province_id 最大值。
六、总结
相较于之前的版本来说,Iceberg 0.11 新增了许多实用的功能,对比了之前使用的旧版本,做以下总结:
Flink + Iceberg 排序功能
实时读取数据
实时合并小文件