指标统计:基于流计算Oceanus(Flink) 实现实时UVPV统计
导语 | 最近梳理了一下如何用Flink来实现实时的UV、PV指标的统计,并和公司内微视部门的同事交流。然后针对该场景做了简化,并发现使用Flink SQL来实现这些指标的统计会更加便捷。
一、解决方案描述
(一)概述
(二)方案架构及优势
本地数据中心(IDC)的自建Kafka集群
私有网络VPC
专线接入/云联网/VPN连接/对等连接
流计算Oceanus (Flink)
云数据库Redis
二、前置准备
(一)创建私有网络VPC
(二)创建Oceanus集群
流计算Oceanus是大数据产品生态体系的实时化分析利器,是基于Apache Flink构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算Oceanus以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
在Oceanus控制台的【集群管理->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。VPC及子网使用刚刚创建好的网络。创建完后Flink的集群如下:
(三)创建Redis集群
Redis控制台:https://console.cloud.tencent.com/redis#/
(四)配置自建Kafka集群
修改自建Kafka集群配置
advertised.listeners=PLAINTEXT://10.1.0.10:9092
advertised.host.name=PLAINTEXT://10.1.0.10:9092
修改后重启Kafka集群。
注意:若在云上使用到自建的zookeeper地址,也需要将zk配置中的hostname修改ip地址形式。
模拟发送数据到topic
Kafka客户端
./bin/kafka-console-producer.sh --broker-list 10.1.0.10:9092 --topic uvpv-demo
"record_type":0, "user_id": 2, "client_ip": "100.0.0.2", "product_id": 101, "create_time": "2021-09-08 16:20:00"} {
"record_type":0, "user_id": 3, "client_ip": "100.0.0.3", "product_id": 101, "create_time": "2021-09-08 16:20:00"} {
"record_type":1, "user_id": 2, "client_ip": "100.0.0.1", "product_id": 101, "create_time": "2021-09-08 16:20:00"} {
使用脚本发送
(五)打通自建IDC集群到腾讯云网络通信
专线接入
https://cloud.tencent.com/document/product/216适用于本地数据中心IDC与腾讯云网络打通。
云联网
https://cloud.tencent.com/document/product/877适用于本地数据中心IDC与腾讯云网络打通,也可用于云上不同地域间私有网络VPC打通。
VPN连接
https://cloud.tencent.com/document/product/554适用于本地数据中心IDC与腾讯云网络打通。
对等连接+NAT网关
对等连接:
https://cloud.tencent.com/document/product/553
NAT网关:
https://cloud.tencent.com/document/product/552适合云上不同地域间私有网络VPC打通,不适合本地IDC到腾讯云网络。
三、方案实现
(一)业务目标
网站的独立访客数量UV。Oceanus处理后在Redis中通过set类型存储独立访客数量,同时也达到了对同一访客的数据去重的目的。
网站商品页面的点击量PV。Oceanus处理后在Redis中使用list类型存储页面点击量。
转化率(转化率=成交次数/点击量)。Oceanus处理后在Redis中用String存储即可。
(二)源数据格式
Kafka内部采用json格式存储,数据格式如下:
# 浏览记录
{
"record_type":0, # 0 表示浏览记录
"user_id": 6,
"client_ip": "100.0.0.6",
"product_id": 101,
"create_time": "2021-09-06 16:00:00"
}
# 购买记录
{
"record_type":1, # 1 表示购买记录
"user_id": 6,
"client_ip": "100.0.0.8",
"product_id": 101,
"create_time": "2021-09-08 18:00:00"
}
(三)编写Flink SQL作业
定义Source
CREATE TABLE `input_web_record` (
`record_type` INT,
`user_id` INT,
`client_ip` VARCHAR,
`product_id` INT,
`create_time` TIMESTAMP,
`times` AS create_time,
WATERMARK FOR times AS times - INTERVAL '10' MINUTE
) WITH (
'connector' = 'kafka', -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector
'topic' = 'uvpv-demo',
'scan.startup.mode' = 'earliest-offset',
--'properties.bootstrap.servers' = '82.157.27.147:9092',
'properties.bootstrap.servers' = '10.1.0.10:9092',
'properties.group.id' = 'WebRecordGroup', -- 必选参数, 一定要指定 Group ID
'format' = 'json',
'json.ignore-parse-errors' = 'true', -- 忽略 JSON 结构解析异常
'json.fail-on-missing-field' = 'false' -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
);
定义Sink
-- UV sink
CREATE TABLE `output_uv` (
`userids` STRING,
`user_id` STRING
) WITH (
'connector' = 'redis',
'command' = 'sadd', -- 使用集合保存uv(支持命令:set、lpush、sadd、hset、zadd)
'nodes' = '192.28.28.217:6379', -- redis连接地址,集群模式多个节点使用'',''分隔。
-- 'additional-key' = '
', -- 用于指定hset和zadd的key。hset、zadd必须设置。 'password' = 'yourpassword'
);
-- PV sink
CREATE TABLE `output_pv` (
`pagevisits` STRING,
`product_id` STRING,
`hour_count` BIGINT
) WITH (
'connector' = 'redis',
'command' = 'lpush', -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)
'nodes' = '192.28.28.217:6379', -- redis连接地址,集群模式多个节点使用'',''分隔。
-- 'additional-key' = '
', -- 用于指定hset和zadd的key。hset、zadd必须设置。 'password' = 'yourpassword'
);
-- 转化率 sink
CREATE TABLE `output_conversion_rate` (
`conversion_rate` STRING,
`rate` STRING
) WITH (
'connector' = 'redis',
'command' = 'set', -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)
'nodes' = '192.28.28.217:6379', -- redis连接地址,集群模式多个节点使用'',''分隔。
-- 'additional-key' = '
', -- 用于指定hset和zadd的key。hset、zadd必须设置。 'password' = 'yourpassword'
);
业务逻辑
-- 加工得到 UV 指标,统计所有时间内的 UV
INSERT INTO output_uv
SELECT
'userids' AS `userids`,
CAST(user_id AS string) AS user_id
FROM input_web_record ;
-- 加工并得到 PV 指标,统计每 10 分钟内的 PV
INSERT INTO output_pv
SELECT
'pagevisits' AS pagevisits,
CAST(product_id AS string) AS product_id,
SUM(product_id) AS hour_count
FROM input_web_record WHERE record_type = 0
GROUP BY
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id,
user_id;
-- 加工并得到转化率指标,统计每 10 分钟内的转化率
INSERT INTO output_conversion_rate
SELECT
'conversion_rate' AS conversion_rate,
CAST( (((SELECT COUNT(1) FROM input_web_record WHERE record_type=0)*1.0)/SUM(a.product_id)) as string)
FROM (SELECT * FROM input_web_record where record_type = 1) AS a
GROUP BY
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id;
(四)结果验证
userids: 存储UV
pagevisits: 存储PV
conversion_rate: 存储转化率,即购买商品次数/总页面点击量。
四、总结
流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓
👇点击下方「阅读原文」,了解腾讯云流计算Oceanus更多信息~