AI预测:基于流计算Oceanus (Flink) 实现病症的实时预测
一、方案描述
(一)概述
(二)方案架构
流计算Oceanus(Flink)
智能钛机器学习平台(TI-ONE)
智能钛弹性模型服务(TI-EMS)
消息队列CKafka
云数据仓库ClickHouse
对象存储(COS)
二、前置准备
(一)创建私有网络VPC
(二)创建流计算Oceanus集群
流计算Oceanus是大数据产品生态体系的实时化分析利器,是基于Apache Flink构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算Oceanus以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
在流计算Oceanus控制台[2]的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。VPC及子网使用刚刚创建好的网络。创建完后Flink的集群如下:
(三)创建CKafka实例
(四)创建COS实例
(五)创建ClickHouse集群
# Download the ClickHouse client RPM packages
# NOTE(review): repo.yandex.ru has since been deprecated in favor of
# packages.clickhouse.com — verify the URLs before running.
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm
# Install the client
rpm -ivh *.rpm
# Log in to the ClickHouse cluster over the TCP port; the IP address is shown in the console
clickhouse-client -hxxx.xxx.xxx.xxx --port 9000
-- Create the database on every node of the default cluster.
CREATE DATABASE IF NOT EXISTS testdb ON CLUSTER default_cluster;

-- Result table for model predictions. CollapsingMergeTree uses the `Sign`
-- column (+1 = state row, -1 = cancel row) to collapse duplicate states.
CREATE TABLE testdb.model_predict_result_1 ON CLUSTER default_cluster (
    res String,
    Sign Int8
)
ENGINE = ReplicatedCollapsingMergeTree(
    '/clickhouse/tables/{layer}-{shard}/testdb/model_predict_result_1',
    '{replica}',
    Sign
)
ORDER BY res;
(六)注册开通TI-ONE服务
智能钛机器学习平台是为AI工程师打造的一站式机器学习服务平台,为用户提供从数据预处理、模型构建、模型训练、模型评估到模型服务的全流程开发及部署支持。
单击【前往访问管理】,页面将跳转至访问管理控制台。
单击【同意授权】,即可创建服务预设角色并授予智能钛机器学习平台相关权限。
(七)注册开通TI-EMS服务
智能钛弹性模型服务(Tencent Intelligence Elastic Model Service,TI-EMS)是具备虚拟化异构算力和弹性扩缩容能力的无服务器化在线推理平台。
三、方案实现
(一)离线模型训练
数据集介绍
离线模型训练
(二)实时特征工程
创建Source
-- Datagen source: simulates real-time patient-record feature data
-- (nine integer features of the breast-cancer dataset, each in [0, 10]).
CREATE TABLE random_source (
ClumpThickness INT,
UniformityOfCellSize INT,
UniformityOfCellShape INT,
MarginalAdhsion INT,
SingleEpithelialCellSize INT,
BareNuclei INT,
BlandChromation INT,
NormalNucleoli INT,
Mitoses INT
) WITH (
'connector' = 'datagen',
'rows-per-second'='1', -- rows generated per second
'fields.ClumpThickness.kind'='random', -- unbounded random values
'fields.ClumpThickness.min'='0', -- minimum random value
'fields.ClumpThickness.max'='10', -- maximum random value
'fields.UniformityOfCellSize.kind'='random', -- unbounded random values
'fields.UniformityOfCellSize.min'='0', -- minimum random value
'fields.UniformityOfCellSize.max'='10', -- maximum random value
'fields.UniformityOfCellShape.kind'='random', -- unbounded random values
'fields.UniformityOfCellShape.min'='0', -- minimum random value
'fields.UniformityOfCellShape.max'='10', -- maximum random value
'fields.MarginalAdhsion.kind'='random', -- unbounded random values
'fields.MarginalAdhsion.min'='0', -- minimum random value
'fields.MarginalAdhsion.max'='10', -- maximum random value
'fields.SingleEpithelialCellSize.kind'='random', -- unbounded random values
'fields.SingleEpithelialCellSize.min'='0', -- minimum random value
'fields.SingleEpithelialCellSize.max'='10', -- maximum random value
'fields.BareNuclei.kind'='random', -- unbounded random values
'fields.BareNuclei.min'='0', -- minimum random value
'fields.BareNuclei.max'='10', -- maximum random value
'fields.BlandChromation.kind'='random', -- unbounded random values
'fields.BlandChromation.min'='0', -- minimum random value
'fields.BlandChromation.max'='10', -- maximum random value
'fields.NormalNucleoli.kind'='random', -- unbounded random values
'fields.NormalNucleoli.min'='0', -- minimum random value
'fields.NormalNucleoli.max'='10', -- maximum random value
'fields.Mitoses.kind'='random', -- unbounded random values
'fields.Mitoses.min'='0', -- minimum random value
'fields.Mitoses.max'='10' -- maximum random value
);
创建Sink
-- Kafka sink: publishes the generated feature rows as CSV records.
CREATE TABLE `KafkaSink` (
ClumpThickness INT,
UniformityOfCellSize INT,
UniformityOfCellShape INT,
MarginalAdhsion INT,
SingleEpithelialCellSize INT,
BareNuclei INT,
BlandChromation INT,
NormalNucleoli INT,
Mitoses INT
) WITH (
'connector' = 'kafka', -- options: 'kafka', 'kafka-0.11'; pick the matching built-in connector
'topic' = 'topic-decision-tree-predict-1', -- replace with the topic you want to produce to
'properties.bootstrap.servers' = '172.28.28.211:9092', -- replace with your Kafka bootstrap address
'properties.group.id' = 'RealTimeFeatures', -- required: a group ID must be specified
'format' = 'csv'
);
编写业务SQL
-- Forward every generated feature row to Kafka. The columns are listed
-- explicitly (rather than SELECT *) so that a schema change in either
-- table fails loudly instead of silently shifting CSV fields.
INSERT INTO `KafkaSink`
SELECT
    ClumpThickness,
    UniformityOfCellSize,
    UniformityOfCellShape,
    MarginalAdhsion,
    SingleEpithelialCellSize,
    BareNuclei,
    BlandChromation,
    NormalNucleoli,
    Mitoses
FROM `random_source`;
选择Connector
查询数据
(三)实时预测
启动模型服务
公网调用模型测试
单击右侧【更多】>【调用】,创建公网调用地址。
启动控制台,新建data.json文件,在某一文件夹下运行如下代码:
# 请将 <访问地址>/<密钥> 替换为实际的 IP 地址/密钥
curl -H "Content-Type: application/json" \
-H "x-Auth-Token: <密钥>" \
-X POST <访问地址>/v1/models/m:predict -d @data.json
{"instances" : [{"_c0": 3, "_c1": 2, "_c2": 3, "_c3": 0, "_c4": 0, "_c5": 2, "_c6": 1, "_c7": 0, "_c8": 1}]}
{"predictions": [{"pmml(prediction)":"1","probability(0)":"0.47058823529411764","probability(1)":"0.5294117647058824","prediction":"1.0","label":"1"}]}
通过流计算Oceanus调用模型服务
本地代码开发、调试。
进入流计算Oceanus控制台[2],单击左侧【依赖管理】新建依赖并上传JAR包。
进入【作业管理】页面,创建JAR作业,选择之前创建好的流计算Oceanus集群。
单击【开发调试】指定相应的主程序包和主类,点击【作业调试】,【内置Connector】选择flink-connector-clickhouse和flink-connector-kafka。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.util.Collector;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.json.JSONObject;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Streaming job: reads CSV feature rows from Kafka, calls a TI-EMS model
 * inference HTTP endpoint per record, and writes the JSON prediction result
 * into ClickHouse via the Flink Table API.
 *
 * NOTE(review): the generic type parameters in the original listing were
 * stripped by HTML extraction (e.g. {@code DataStream<String>}); they are
 * restored here so the class compiles.
 */
public class OnlinePredict {

    public static final Logger logger = LoggerFactory.getLogger(OnlinePredict.class);

    public static void main(String[] args) throws Exception {
        // Kafka configuration is read from a properties file on the classpath.
        final ParameterTool parameterTool = ParameterTool
                .fromPropertiesFile(OnlinePredict.class.getResourceAsStream("/KafkaSource.properties"));

        // Instantiate the runtime environment (Blink planner, streaming mode).
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // Checkpointing: default interval 30s; keep externalized checkpoints on cancel.
        streamEnv.enableCheckpointing(parameterTool.getLong("flink.stream.checkpoint.interval", 30_000));
        streamEnv.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Restart strategy: up to 4 restarts, 10 seconds apart.
        streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10_000));

        // source -> transform (one HTTP inference call per record) -> sink
        DataStream<String> stringResult = streamEnv
                .addSource(buildKafkaSource(parameterTool))
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String paramInput = inputDataTransfer(value);
                        String outputData = sendHttpData(paramInput);
                        out.collect(outputData);
                    }
                });

        Table tableResult = tableEnv.fromDataStream(stringResult);
        tableEnv.createTemporaryView("resultSink", tableResult);

        // ClickHouse sink table; 'table.collapsing.field' maps to the Sign
        // column of the CollapsingMergeTree target table.
        tableEnv.executeSql("CREATE TABLE `CKSink` (\n" +
                " res STRING,\n" +
                " PRIMARY KEY (`res`) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'clickhouse',\n" +
                " 'url' = 'clickhouse://172.28.1.138:8123',\n" +
                " 'database-name' = 'testdb',\n" +
                " 'table-name' = 'model_predict_result_1',\n" +
                " 'table.collapsing.field' = 'Sign'\n" +
                ")");
        tableEnv.executeSql("insert into CKSink select * from resultSink");
    }

    /**
     * Builds the Kafka consumer source from the supplied configuration.
     *
     * @param parameterTool holds kafka.source.* keys (servers, topic, group id, offset reset)
     * @return a FlinkKafkaConsumer that deserializes records as plain strings
     */
    public static SourceFunction<String> buildKafkaSource(ParameterTool parameterTool) throws Exception {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, parameterTool.get("kafka.source.bootstrap.servers"));
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, parameterTool.get("kafka.source.auto.offset.reset", "latest"));
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, parameterTool.get("kafka.source.group.id"));
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                parameterTool.get("kafka.source.topic"),
                new SimpleStringSchema(),
                properties);
        // Resume from the committed group offsets (falls back to auto.offset.reset).
        consumer.setStartFromGroupOffsets();
        return consumer;
    }

    /**
     * Converts a CSV record into the TI-EMS request payload.
     * Output format: {"instances" : [{"_c0": 3, "_c1": 2, ..., "_c8": 1}]}
     *
     * @param value a comma-separated row of numeric features
     * @return the JSON request body expected by the model service
     */
    public static String inputDataTransfer(String value) {
        String[] input = value.split(",");
        ArrayList<JSONObject> dataListMap = new ArrayList<>();
        JSONObject jsondata = new JSONObject();
        for (int i = 0; i < input.length; i++) {
            // Feature columns are named _c0.._cN, matching the training schema.
            jsondata.put("_c" + i, Double.parseDouble(input[i]));
        }
        dataListMap.add(jsondata);
        String param = "{\"instances\":" + dataListMap.toString() + "}";
        return param;
    }

    /**
     * Calls the TI-EMS online inference service.
     * Response format:
     * {"predictions": [{"pmml(prediction)":"1","probability(0)":"...","probability(1)":"...","prediction":"1.0","label":"1"}]}
     *
     * @param paramJson the JSON request body built by {@link #inputDataTransfer}
     * @return the raw response body on HTTP 200, "error input" on any other
     *         status, or {@code null} if the call threw (logged, not rethrown)
     */
    public static String sendHttpData(String paramJson) throws Exception {
        String data = null;
        try {
            // Replace xx.xx.xx.xx:xxxx with the actual service address (VPC invocation).
            String url = "http://xx.xx.xx.xx:xxxx/v1/models/m:predict";
            HttpClient client = HttpClientBuilder.create().build();
            HttpPost post = new HttpPost(url);
            post.addHeader("Content-type", "application/json");
            post.addHeader("Accept", "application/json");
            // Replace xxxxxxxxxx with the actual secret key (VPC invocation).
            post.addHeader("X-AUTH-TOKEN", "xxxxxxxxxx");
            StringEntity entity = new StringEntity(paramJson, java.nio.charset.Charset.forName("UTF-8"));
            post.setEntity(entity);
            HttpResponse response = client.execute(post);
            if (response.getStatusLine().getStatusCode() == 200) {
                // Success: return the response body as-is.
                HttpEntity resEntity = response.getEntity();
                data = EntityUtils.toString(resEntity);
            } else {
                data = "error input";
            }
            // Original code printed the result twice (print + println); once is enough.
            System.out.println(data);
        } catch (Throwable e) {
            // Best-effort: log and return null rather than failing the stream.
            logger.error("", e);
        }
        return data;
    }
}
# source 配置(请替换为实际的参数)
kafka.source.bootstrap.servers=172.28.28.211:9092
kafka.source.topic=topic-decision-tree-predict-1
kafka.source.group.id=RealTimePredict1
kafka.source.auto.offset.reset=latest
<properties>
    <flink.version>1.11.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-clickhouse</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.3</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20201115</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
四、总结
新版Flink 1.13集群无需用户自己选择内置Connector,平台将自动匹配。
除了使用CKafka及ClickHouse作为数据仓库外,还可以使用Hive、MySQL、PostgreSQL等作为数仓,根据用户实际需求自行选择。
本方案最简化了实时特征工程,用户可以根据自身业务需求采用SQL、JAR、ETL作业的方式完成实时特征工程。
本方案只初始化了一个PMML服务提供流计算Oceanus调用,如遇数据背压情况可增多PMML服务循环调用。
TI-ONE、TI-EMS平台暂时不支持实时训练模型,如需更新模型可以自行编写定时脚本拉取数据在TI-ONE平台训练更新。
五、参考地址