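Flink in Practice | Early Firing of Flink SQL Windows: A Hands-On Walkthrough
Introduction
Windows in production jobs are often large: one hour, one day, even one week. Does that mean results are visible only after the window closes? For a real-time job, output that arrives only at the end of the day or week is of little use; what we need are intermediate results that update continuously, for metrics such as real-time PV and UV. This article shows how to make Flink SQL fire window computation early.
Implementation
Without early firing
The following demo reads from Kafka, runs a window aggregation, and outputs one day's PV and UV.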
-- kafka source
drop table if exists user_log;
CREATE TABLE user_log(
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
proc_time as PROCTIME(),
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH (
'connector' = 'kafka'
,'topic' = 'user_log'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
);
drop table if exists user_log_sink_1;
CREATE TABLE user_log_sink_1
(
wCurrent string
,wStart STRING
,wEnd STRING
,pv bigint
,uv bigint
,primary key(wCurrent,wStart,wEnd) not enforced
) WITH (
-- 'connector' = 'print'
'connector' = 'upsert-kafka'
,'topic' = 'user_log_sink'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'key.format' = 'json'
,'value.format' = 'json'
);
-- window aggregation
insert into user_log_sink_1
select date_format(now(), 'yyyy-MM-dd HH:mm:ss')
,date_format(TUMBLE_START(proc_time, INTERVAL '1' minute), 'yyyy-MM-dd HH:mm:ss') AS wStart
,date_format(TUMBLE_END(proc_time, INTERVAL '1' minute), 'yyyy-MM-dd HH:mm:ss') AS wEnd
,count(1) coun
,count(distinct user_id)
from user_log
group by TUMBLE(proc_time, INTERVAL '1' minute)
Note: for ease of testing, the one-day window is replaced with a one-minute window.
[Figure: job graph]
Job output:
+I[2022-06-01 17:14:00, 2022-06-01 17:13:00, 2022-06-01 17:14:00, 29449, 9999]
+I[2022-06-01 17:15:00, 2022-06-01 17:14:00, 2022-06-01 17:15:00, 29787, 9999]
+I[2022-06-01 17:16:00, 2022-06-01 17:15:00, 2022-06-01 17:16:00, 29765, 9999]
+I[2022-06-01 17:17:00, 2022-06-01 17:16:00, 2022-06-01 17:17:00, 29148, 9999]
+I[2022-06-01 17:18:00, 2022-06-01 17:17:00, 2022-06-01 17:18:00, 30079, 9999]
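Each row above covers exactly one one-minute window and is emitted only once, when that window closes. As a rough illustration of how a timestamp maps to its tumbling window bounds, here is a minimal sketch assuming epoch-millisecond timestamps and no window offset. The class and method names are hypothetical and are not part of Flink.

```java
import java.time.Instant;

// Hypothetical helper, not a Flink class: maps a timestamp to its tumbling window.
class TumblingWindowSketch {

    // Start (inclusive) of the tumbling window that contains timestampMs.
    // Assumes a zero offset, i.e. windows aligned to the epoch.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // End (exclusive) of the same window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long ts = Instant.parse("2022-06-01T17:13:42Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(ts, oneMinute))); // 2022-06-01T17:13:00Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts, oneMinute)));   // 2022-06-01T17:14:00Z
    }
}
```

With a one-day window the same arithmetic applies, which is why nothing is visible until the day is over unless the window fires early.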
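In the DataStream API, early firing can be set up by attaching a trigger to the window, for example:
.windowAll(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)))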
With early firing
TableConfig config = tenv.getConfig();
config.getConfiguration().setBoolean("table.exec.emit.early-fire.enabled",true);
config.getConfiguration().setString("table.exec.emit.early-fire.delay","10s");
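// fire output every 10 seconds
With these two settings, window results are emitted before the window closes.
The job is the same as above; only the two parameters are added. Note that the run logged below set the delay to 5000 (interpreted as milliseconds) rather than 10s, so results appear every 5 seconds.
Job output: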
2022-06-01 17:54:21,031 INFO - add parameter to table config: table.exec.emit.early-fire.enabled = true
2022-06-01 17:54:21,032 INFO - add parameter to table config: table.exec.emit.early-fire.delay = 5000
2022-06-01 17:54:21,032 INFO - add parameter to table config: pipeline.name = test_table_parameter
+I[2022-06-01 17:54:35, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 2527, 2500]
-U[2022-06-01 17:54:40, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 2527, 2500]
+U[2022-06-01 17:54:40, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 5027, 5000]
-U[2022-06-01 17:54:45, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 5027, 5000]
+U[2022-06-01 17:54:45, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 7527, 7500]
-U[2022-06-01 17:54:50, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 7527, 7500]
+U[2022-06-01 17:54:50, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 10079, 9999]
-U[2022-06-01 17:54:55, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 10079, 9999]
+U[2022-06-01 17:54:55, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 12579, 9999]
-U[2022-06-01 17:55:00, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 12579, 9999]
+U[2022-06-01 17:55:00, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 14579, 9999]
+I[2022-06-01 17:55:05, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 2500, 2500]
-U[2022-06-01 17:55:10, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 2500, 2500]
+U[2022-06-01 17:55:10, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 5299, 5000]
... (some intermediate results omitted)
-U[2022-06-01 17:55:55, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 25364, 9999]
+U[2022-06-01 17:55:55, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 27864, 9999]
-U[2022-06-01 17:56:00, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 27864, 9999]
+U[2022-06-01 17:56:00, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 29867, 9999]
+I[2022-06-01 17:56:05, 2022-06-01 17:56:00, 2022-06-01 17:57:00, 2500, 2500]
-U[2022-06-01 17:56:10, 2022-06-01 17:56:00, 2022-06-01 17:57:00, 2500, 2500]
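The changelog above follows a simple pattern: the first firing of a window inserts a row (+I), and each later firing retracts the previously emitted value (-U) and emits the updated one (+U). The sketch below mimics that pattern for a single window's PV count. It is a Flink-free illustration with hypothetical names, not Flink's implementation, and it assumes that a firing with an unchanged count emits nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one window's early-fire output; not a Flink class.
class EarlyFireSketch {
    private long count = 0;        // running PV for the window
    private long lastEmitted = -1; // -1 means nothing has been emitted yet

    // Accumulate n more events into the window.
    void add(long n) {
        count += n;
    }

    // Called on every early-fire timer and once more when the window closes.
    List<String> fire() {
        List<String> out = new ArrayList<>();
        if (lastEmitted < 0) {
            out.add("+I[" + count + "]");       // first result for this window
        } else if (lastEmitted != count) {
            out.add("-U[" + lastEmitted + "]"); // retract the previous result
            out.add("+U[" + count + "]");       // emit the updated result
        }
        // Assumption: an unchanged count produces no output.
        lastEmitted = count;
        return out;
    }

    public static void main(String[] args) {
        EarlyFireSketch window = new EarlyFireSketch();
        window.add(2527);
        System.out.println(window.fire()); // [+I[2527]]
        window.add(2500);
        System.out.println(window.fire()); // [-U[2527], +U[5027]]
    }
}
```

A downstream consumer that only wants the current value per window can ignore the -U records and keep the latest +I or +U for each window.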
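The same aggregation written with the window table-valued function (TVF) syntax, with the early-fire parameters still set:
insert into user_log_sink_1
select date_format(now(), 'yyyy-MM-dd HH:mm:ss')
,date_format(window_start, 'yyyy-MM-dd HH:mm:ss') AS wStart
,date_format(window_end, 'yyyy-MM-dd HH:mm:ss') AS wEnd
,count(user_id) pv
,count(distinct user_id) uv
FROM TABLE(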
TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES )) t1gro
Error:
Exception in thread "main" org.apache.flink.table.api.TableException: Currently, window table function based aggregate doesn't support early-fire and late-fire configuration 'table.exec.emit.early-fire.enabled' and 'table.exec.emit.late-fire.enabled'.
at org.apache.flink.table.planner.plan.utils.WindowUtil$.checkEmitConfiguration(WindowUtil.scala:262)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLocalWindowAggregate.translateToExecNode(StreamPhysicalLocalWindowAggregate.scala:127)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
at org.apache.flink.table.planner.plan.nod