「硬刚Doris系列」Doris高级用法

程序源代码

共 8998字,需浏览 18分钟

 ·

2022-05-28 00:40

点击上方蓝色字体,选择“设为星标”
回复"面试"获取更多惊喜

😄轻戳有惊喜:全网最全大数据面试提升手册!

1.1 添加Rollup

Rollup 可以理解为 Table 的一个物化索引结构。物化 是因为其数据在物理上独立存储,而 索引 的意思是,Rollup可以调整列顺序以增加前缀索引的命中率,也可以减少key列以增加数据的聚合度。

以下举例说明。

原表table1的Schema如下:

+----------+-------------+------+-------+---------+-------+
| Field   | Type       | Null | Key   | Default | Extra |
+----------+-------------+------+-------+---------+-------+
| siteid   | int(11)     | No   | true | 10     |       |
| citycode | smallint(6) | No   | true | N/A     |       |
| username | varchar(32) | No   | true |         |       |
| pv       | bigint(20) | No   | false | 0       | SUM   |
| uv       | bigint(20) | No   | false | 0       | SUM   |
+----------+-------------+------+-------+---------+-------+

对于 table1 明细数据是 siteid, citycode, username 三者构成一组 key,从而对 pv 字段进行聚合;如果业务方经常有看城市 pv 总量的需求,可以建立一个只有 citycode, pv 的rollup。

ALTER TABLE table1 ADD ROLLUP rollup_city(citycode, pv);

1.2 Broadcast/Shuffle Join

系统默认实现 Join 的方式,是将小表进行条件过滤后,将其广播到大表所在的各个节点上,形成一个内存 Hash 表,然后流式读出大表的数据进行Hash Join。但是如果当小表过滤后的数据量无法放入内存的话,此时 Join 将无法完成,通常的报错应该是首先造成内存超限。

如果遇到上述情况,建议显式指定 Shuffle Join,也被称作 Partitioned Join。即将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join。这个对内存的消耗就会分摊到集群的所有计算节点上。

Doris会自动尝试进行 Broadcast Join,如果预估小表过大则会自动切换至 Shuffle Join。注意,如果此时显式指定了 Broadcast Join 也会自动切换至 Shuffle Join。

使用 Broadcast Join(默认):

mysql> select sum(table1.pv) from table1 join table2 where table1.siteid = 2;
+--------------------+
| sum(`table1`.`pv`) |
+--------------------+
|                 10 |
+--------------------+
1 row in set (0.20 sec)

使用 Broadcast Join(显式指定):

mysql> select sum(table1.pv) from table1 join [broadcast] table2 where table1.siteid = 2;
+--------------------+
| sum(`table1`.`pv`) |
+--------------------+
|                 10 |
+--------------------+
1 row in set (0.20 sec)

使用 Shuffle Join:

mysql> select sum(table1.pv) from table1 join [shuffle] table2 where table1.siteid = 2;
+--------------------+
| sum(`table1`.`pv`) |
+--------------------+
|                 10 |
+--------------------+
1 row in set (0.15 sec)

1.3 Colocation Join

1.3.1 名词解释
  • FE:Frontend,Doris 的前端节点。负责元数据管理和请求接入。

  • BE:Backend,Doris 的后端节点。负责查询执行和数据存储。

  • Colocation Group(CG):一个 CG 中会包含一张及以上的 Table。在同一个 Group 内的 Table 有着相同的 Colocation Group Schema,并且有着相同的数据分片分布。

  • Colocation Group Schema(CGS):用于描述一个 CG 中的 Table,和 Colocation 相关的通用 Schema 信息。包括分桶列类型,分桶数以及副本数等。

1.3.2 原理

doris 除了支持Broadcast/Shuffle Join 之外,Colocation Join更是一大特色。Colocation Join 功能,是将一组拥有相同 CGS 的 Table 组成一个 CG。并保证这些 Table 对应的数据分片会落在同一个 BE 节点上。使得当 CG 内的表进行分桶列上的 Join 操作时,可以通过直接进行本地数据 Join,减少数据在节点间的传输耗时。

为了使得 Table 能够有相同的数据分布,同一 CG 内的 Table 必须保证以下属性相同:

  • 分桶列和分桶数
    分桶列,即在建表语句中 DISTRIBUTED BY HASH(col1, col2, ...) 中指定的列。分桶列决定了一张表的数据通过哪些列的值进行 Hash 划分到不同的 Tablet 中。同一 CG 内的 Table 必须保证分桶列的类型和数量完全一致,并且桶数一致,才能保证多张表的数据分片能够一一对应的进行分布控制。
  • 副本数
    同一个 CG 内所有表的所有分区(Partition)的副本数必须一致。如果不一致,可能出现某一个 Tablet 的某一个副本,在同一个 BE 上没有其他的表分片的副本对应。

同一个 CG 内的表,分区的个数、范围以及分区列的类型不要求一致。

1.3.3 举例说明
CREATE TABLE `tbl1` (
    `k1` date NOT NULL COMMENT "",
    `k2` int(11) NOT NULL COMMENT "",
    `v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
    PARTITION p1 VALUES LESS THAN ('2019-05-31'),
    PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);
CREATE TABLE `tbl2` (
    `k1` datetime NOT NULL COMMENT "",
    `k2` int(11) NOT NULL COMMENT "",
    `v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);

查看查询计划,如果 Colocation Join 生效,则 Hash Join 节点会显示 colocate: true。

DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+----------------------------------------------------+
| Explain String                                     |
+----------------------------------------------------+
| PLAN FRAGMENT 0                                    |
|  OUTPUT EXPRS:`tbl1`.`k1` |                        |
|   PARTITION: RANDOM                                |
|                                                    |
|   RESULT SINK                                      |
|                                                    |
|   2:HASH JOIN                                      |
|   |  join op: INNER JOIN                           |
|   |  hash predicates:                              |
|   |  colocate: true                                |
|   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
|   |  tuple ids: 0 1                                |
|   |                                                |
|   |----1:OlapScanNode                              |
|   |       TABLE: tbl2                              |
|   |       PREAGGREGATION: OFF. Reason: null        |
|   |       partitions=0/1                           |
|   |       rollup: null                             |
|   |       buckets=0/0                              |
|   |       cardinality=-1                           |
|   |       avgRowSize=0.0                           |
|   |       numNodes=0                               |
|   |       tuple ids: 1                             |
|   |                                                |
|   0:OlapScanNode                                   |
|      TABLE: tbl1                                   |
|      PREAGGREGATION: OFF. Reason: No AggregateInfo |
|      partitions=0/2                                |
|      rollup: null                                  |
|      buckets=0/0                                   |
|      cardinality=-1                                |
|      avgRowSize=0.0                                |
|      numNodes=0                                    |
|      tuple ids: 0                                  |
+----------------------------------------------------+

1.4 动态分区

1.4.1 原理

在某些使用场景下,用户会将表按照天进行分区划分,每天定时执行例行任务,这时需要使用方手动管理分区,否则可能由于使用方没有创建分区导致数据导入失败,这给使用方带来了额外的维护成本。

在实现方式上, FE会启动一个后台线程,根据fe.conf中dynamic_partition_enable 及 dynamic_partition_check_interval_seconds参数决定该线程是否启动以及该线程的调度频率。每次调度时,会在注册表中读取动态分区表的属性,并根据动态分区属性动态添加及删除分区。

1.4.2 举例说明

建表时,可以在 PROPERTIES 中指定以下dynamic_partition属性,表示这个表是一个动态分区表。

CREATE TABLE example_db.dynamic_partition
(
k1 DATE,
k2 INT,
k3 SMALLINT,
v1 VARCHAR(2048),
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1)
(
PARTITION p20200321 VALUES LESS THAN ("2020-03-22"),
PARTITION p20200322 VALUES LESS THAN ("2020-03-23"),
PARTITION p20200323 VALUES LESS THAN ("2020-03-24"),
PARTITION p20200324 VALUES LESS THAN ("2020-03-25")
)
DISTRIBUTED BY HASH(k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
 );

创建一张动态分区表,指定开启动态分区特性,以当天为2020-03-25为例,在每次调度时,会删除分区上界小于 2020-03-22 的分区,为了避免删除非动态创建的分区,动态删除分区只会删除分区名符合动态创建分区规则的分区,例如分区名为a1, 则即使分区范围在待删除的分区范围内,也不会被删除。同时在调度时会提前创建今天以及以后3天(总共4天)的分区(若分区已存在则会忽略),分区名根据指定前缀分别为p20200325 p20200326p20200327 p20200328,每个分区的分桶数量为32。同时会删除 p20200321 的分区。

1.4.3 分区属性参数

dynamic_partition.enable: 是否开启动态分区特性,可指定为 TRUE 或 FALSE。如果不填写,默认为 TRUE。

dynamic_partition.time_unit: 动态分区调度的单位,可指定为 DAY WEEK MONTH,当指定为 DAY时,动态创建的分区名后缀格式为yyyyMMdd,例如20200325。当指定为 WEEK 时,动态创建的分区名后缀格式为yyyy_ww即当前日期属于这一年的第几周,例如 2020-03-25 创建的分区名后缀为 2020_13, 表明目前为2020年第13周。当指定为 MONTH 时,动态创建的分区名后缀格式为 yyyyMM,例如 202003。

dynamic_partition.start: 动态分区的开始时间, 以当天为基准,超过该时间范围的分区将会被删除。如果不填写,则默认为Integer.MIN_VALUE 即 -2147483648。

dynamic_partition.end: 动态分区的结束时间, 以当天为基准,会提前创建N个单位的分区范围。

dynamic_partition.prefix: 动态创建的分区名前缀。

dynamic_partition.buckets: 动态创建的分区所对应的分桶数量。

1.5 支持Bitmap

使用 Roaring Bitmap 数据结构,现场查询时的 IO,CPU,内存,网络资源会显著减少,并且不会随着数据规模线性增加。

CREATE TABLE `pv_bitmap` (
`dt` int,
`page` varchar(10),
`user_id` bitmap bitmap_union
)
AGGREGATE KEY(`dt`, page)
DISTRIBUTED BY HASH(`dt`) BUCKETS 2;
select bitmap_count(bitmap_union(user_id)) from pv_bitmap; 
select bitmap_union_count(user_id) from pv_bitmap;
select bitmap_union_int(id) from pv_bitmap; 
BITMAP_UNION(expr) : 计算两个 Bitmap 的并集,返回值是序列化后的 Bitmap 值 
BITMAP_COUNT(expr) : 计算 Bitmap 的基数值 
BITMAP_UNION_COUNT(expr): 和 BITMAP_COUNT(BITMAP_UNION(expr)) 等价 
BITMAP_UNION_INT(expr) : 和 COUNT(DISTINCT expr) 等价 (仅支持 TINYINT,SMALLINT 和 INT)

1.6 物化视图

物化视图是将预先计算(根据定义好的 SELECT 语句)好的数据集,存储在 Doris 中的一个特殊的表。

物化视图的出现主要是为了满足用户,既能对原始明细数据的任意维度分析,也能快速的对固定维度进行分析查询。

在没有物化视图功能之前,用户一般都是使用 Rollup 功能通过预聚合方式提升查询效率的。但是 Rollup 具有一定的局限性,他不能基于明细模型做预聚合。

物化视图则在覆盖了 Rollup 的功能的同时,还能支持更丰富的聚合函数。所以物化视图其实是 Rollup 的一个超集。

也就是说,之前 ALTER TABLE ADD ROLLUP 语法支持的功能现在均可以通过 CREATE MATERIALIZED VIEW 实现。

create materialized view store_amt as
select store_id, sum(sale_amt) from sales_records group by store_id;

如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)

互联网最坏的时代可能真的来了
我在B站读大学,大数据专业
我们在学习Flink的时候,到底在学习什么?
193篇文章暴揍Flink,这个合集你需要关注一下
Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS
Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点
我们在学习Spark的时候,到底在学习什么?
在所有Spark模块中,我愿称SparkSQL为最强!
硬刚Hive | 4万字基础调优面试小总结
数据治理方法论和实践小百科全书
标签体系下的用户画像建设小指南
4万字长文 | ClickHouse基础&实践&调优全视角解
【面试&个人成长】2021年过半,社招和校招的经验之谈
大数据方向另一个十年开启 |《硬刚系列》第一版完结
我写过的关于成长/面试/职场进阶的文章
当我们在学习Hive的时候在学习什么?「硬刚Hive续集」
浏览 9
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报