基于Flink快速开发实时TopN程序最简单的思路

程序源代码

共 5196字,需浏览 11分钟

 ·

2020-09-29 14:16

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

11be74a037024c28e98ac74e6f2f02e3.webp

a32bc2717e887d7fc9be7390db996a96.webp

大数据技术与架构点击右侧关注,大数据开发领域最强公众号!

d44dce709aff2e8ab28bc5a36d96cdb4.webp

大数据真好玩点击右侧关注,大数据真好玩!21483bfb68dc01820d10e9dad3916056.webp


TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜。流式的TopN可以使业务方在内存中按照某个统计指标(如出现次数)计算排名并快速出发出更新后的排行榜。

我们以统计词频为例展示一下如何快速开发一个计算TopN的flink程序。

Flink支持各种各样的流数据接口作为数据的数据源,本次demo我们采用内置的socketTextStream作为数据数据源。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作为时间语义
DataStream text = env.socketTextStream(hostName, port); //监听指定socket端口作为输入

与离线wordcount类似,程序首先需要把输入的整句文字按照分隔符split成一个一个单词,然后按照单词为key实现累加。

DataStream> ds = text
.flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2类型
private static final class LineSplitter implements
FlatMapFunction> {

@Override
public void flatMap(String value, Collector> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");

// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}
DataStream> wcount = ds
.keyBy(0) //按照Tuple2的第一个元素为key,也就是单词
.window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20)))
//key之后的元素进入一个总时间长度为600s,每20s向后滑动一次的滑动窗口
.sum(1);// 将相同的key的元素第二个count值相加

全局TopN

数据流经过前面的处理后会每20s计算一次各个单词的count值并发送到下游窗口。

  DataStream> ret = wcount
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
//所有key元素进入一个20s长的窗口(选20秒是因为上游窗口每20s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
.process(new TopNAllFunction(5));//计算该窗口TopN

windowAll是一个全局并发为1的特殊操作,也就是所有元素都会进入到一个窗口内进行计算。

private static class TopNAllFunction
extends
ProcessAllWindowFunction, Tuple2, TimeWindow> {

private int topSize = 10;

public TopNAllFunction(int topSize) {
// TODO Auto-generated constructor stub

this.topSize = topSize;
}

@Override
public void process(
ProcessAllWindowFunction, Tuple2, TimeWindow>.Context arg0,
Iterable> input,
Collector> out) throws Exception {
// TODO Auto-generated method stub

TreeMap> treemap = new TreeMap>(
new Comparator() {

@Override
public int compare(Integer y, Integer x) {
// TODO Auto-generated method stub
return (x < y) ? -1 : 1;
}

}); //treemap按照key降序排列,相同count值不覆盖

for (Tuple2 element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) { //只保留前面TopN个元素
treemap.pollLastEntry();
}
}

for (Entry> entry : treemap
.entrySet()) {
out.collect(entry.getValue());
}

}

}

分组TopN

在部分场景下,用户希望根据不同的分组进行排序,计算出每个分组的一个排行榜。

  wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分组
.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口统计上游数据
.process(new TopNFunction(5)) //分组TopN统计
private static class TupleKeySelectorByStart implements
KeySelector, String> {

@Override
public String getKey(Tuple2 value) throws Exception {
// TODO Auto-generated method stub
return value.f0.substring(0, 1); //取首字母做key
}
}
/**
*
*针对keyby window的TopN函数,继承自ProcessWindowFunction
*
*/
private static class TopNFunction
extends
ProcessWindowFunction, Tuple2, String, TimeWindow> {

private int topSize = 10;

public TopNFunction(int topSize) {
// TODO Auto-generated constructor stub
this.topSize = topSize;
}

@Override
public void process(
String arg0,
ProcessWindowFunction, Tuple2, String, TimeWindow>.Context arg1,
Iterable> input,
Collector> out) throws Exception {
// TODO Auto-generated method stub

TreeMap> treemap = new TreeMap>(
new Comparator() {

@Override
public int compare(Integer y, Integer x) {
// TODO Auto-generated method stub
return (x < y) ? -1 : 1;
}

});

for (Tuple2 element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) {
treemap.pollLastEntry();
}
}

for (Entry> entry : treemap
.entrySet()) {
out.collect(entry.getValue());
}
}
}

上面的代码实现了按照首字母分组,取每组元素count最高的TopN方法。

嵌套TopN

全局topN的缺陷是,由于windowall是一个全局并发为1的操作,所有的数据只能汇集到一个节点进行 TopN 的计算,那么计算能力就会受限于单台机器,容易产生数据热点问题。

解决思路就是使用嵌套 TopN,或者说两层 TopN。在原先的 TopN 前面,再加一层 TopN,用于分散热点。例如可以先加一层分组 TopN,第一层会计算出每一组的 TopN,而后在第二层中进行合并汇总,得到最终的全网TopN。第二层虽然仍是单点,但是大量的计算量由第一层分担了,而第一层是可以水平扩展的。

e2d417e9371692fca39820ebd2b0b616.webpd2c067f596ba19533cc9a419995fd6db.webp

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。编辑|冷眼丶微信公众号|import_bigdata


欢迎点赞+收藏+转发朋友圈素质三连



文章不错?点个【在看】吧! ?

浏览 29
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报