AI预测:基于流计算Oceanus (Flink) 实现病症的实时预测

云加社区

共 16794字,需浏览 34分钟

 ·

2021-11-24 08:40


一、方案描述


(一)概述


近年来,人工智能的风潮为医疗行业带来一场全新革命,AI 在辅助诊断、疾病预测、疗法选择等方面发挥着重要作用。机器学习领域的特征选择和有监督学习建模方法越来越多地用于疾病预测和辅助诊断,常用的算法如决策树、随机森林、逻辑回归等。

乳腺癌是目前发病率仅次于肺癌的常见癌症,机器学习算法能够分析已有的临床乳腺癌数据,得到与乳腺癌发病关系最密切的特征,这能够极大地帮助医生进行早期诊断,及时拯救患者。

本方案结合智能钛机器学习平台(TI-ONE)、智能钛弹性模型服务(TI-EMS)、腾讯云流计算Oceanus(Flink)、消息队列CKafka、云数据仓库ClickHouse、对象存储(COS)针对乳腺癌预测案例使用决策树分类算法实现全流程解决方案,包括离线模型训练、实时特征工程及实时在线预测功能。




(二)方案架构


首先由TI-ONE进行离线模型训练,将模型文件存放在COS上,然后由TI-EMS将模型文件封装成一个PMML模型服务供流计算Oceanus调用。流计算 Oceanus利用Datagen Connector模拟实时生成特征数据后存放在CKafka上,之后流计算Oceanus取CKafka的特征数据经过数据转换传入到TI-EMS的PMML模型服务中调用决策树分类模型并返回预测结果,最后将预测结果存储在ClickHouse中。



涉及产品列表:


  • 流计算Oceanus(Flink)

  • 智能钛机器学习平台(TI-ONE)

  • 智能钛弹性模型服务(TI-EMS)

  • 消息队列CKafka

  • 云数据仓库ClickHouse

  • 对象存储(COS)



二、前置准备


(一)创建私有网络VPC


私有网络(VPC)是一块您在腾讯云上自定义的逻辑隔离网络空间,在构建流计算Oceanus、CKafka、COS、ClickHouse集群等服务时选择的网络建议选择同一个VPC,网络才能互通。否则需要使用对等连接、NAT网关VPN等方式打通网络。私有网络VPC创建步骤请参考帮助文档[1]


(二)创建流计算Oceanus集群


流计算Oceanus是大数据产品生态体系的实时化分析利器,是基于Apache Flink构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算Oceanus以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。


流计算Oceanus控制台[2]的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。VPC及子网使用刚刚创建好的网络。创建完后Flink的集群如下:




(三)创建CKafka实例


进入CKafka控制台[3],选择左侧【实例列表】,单击【新建】进行购买,注意【地域】需选择VPC所在地域,VPC选择及子网选择之前创建的VPC 和子网。新建成功后,单击实例进入实例详情页面,单击【topic管理】新建topic。


(四)创建COS实例


进入COS控制台[4],选择左侧【存储桶列表】,单击【创建存储桶】,【所属地域】选择VPC所在地域,具体操作细节可参考COS控制台快速入门[5]


(五)创建ClickHouse集群


进入ClickHouse控制台[6],单击【新建集群】创建ClickHouse集群,注意地域、可用区和网络的选择。创建成功之后选择一台与其同VPC的CVM进入,在该CVM下下载ClickHouse客户端,创建数据库和表。具体操作可参考ClickHouse快速入门[7]



# 下载 ClickHouse-Client 命令wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpmwget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm
# 安装客户端rpm -ivh *.rpm
# 使用 tcp 端口登陆 ClickHouse 集群,IP 地址可通过控制台查看clickhouse-client -hxxx.xxx.xxx.xxx --port 9000


-- 创建数据库CREATE DATABASE IF NOT EXISTS testdb ON CLUSTER default_cluster;
-- 创建表CREATE TABLE testdb.model_predict_result_1 on cluster default_cluster (res String,Sign Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/model_predict_result_1', '{replica}',Sign) ORDER BY res;



(六)注册开通TI-ONE服务


智能钛机器学习平台是为AI工程师打造的一站式机器学习服务平台,为用户提供从数据预处理、模型构建、模型训练、模型评估到模型服务的全流程开发及部署支持。


进入TI-ONE控制台[8],在弹出的页面上开通【角色授权】。


  • 单击【前往访问管理】,页面将跳转至访问管理控制台。


  • 单击【同意授权】,即可创建服务预设角色并授予智能钛机器学习平台相关权限。



角色授权开通后,返回TI-ONE控制台[8],开通所需地区的后付费计费模式。 具体步骤可参考TI-ONE的官方文档注册与开通服务[9]


(七)注册开通TI-EMS服务


智能钛弹性模型服务(Tencent Intelligence Elastic Model Service,TI-EMS)是具备虚拟化异构算力和弹性扩缩容能力的无服务器化在线推理平台。


角色授权:进入TI-EMS控制台[10],参考上面步骤进行【角色授权】   

创建专用资源组:TI-EMS平台目前提供公共资源组和专用资源组两种模式,关于两种模式的优缺点可参见官网文档资源组管理[11]。本例子通过流计算Oceanus调用TI-EMS服务,需打通相对应的VPC,因此需选用专用资源组。关于专用资源组的开通方式可参见官网文档资源组管理[11]


三、方案实现


本文通过TI-ONE平台,利用决策树算法搭建乳腺癌预测模型(决策树分类模型),将模型结果保存在COS上 (用户也可以自己在本地训练完成后将训练好的模型文件保存在本地或者COS,之后通过TI-EMS创建模型服务配置即可调用)。然后由流计算Oceanus模拟生成实时特征数据,以CSV格式存储在 CKafka,再通过流计算Oceanus取CKafka的特征数据作为入参,结合TI-EMS进行乳腺癌模型的实时调用,预测结果保存在ClickHouse中。

(一)离线模型训练


  • 数据集介绍


本次任务我们采用公开的乳腺癌数据集[12],该数据集共包含569个样本,其中357个阳性(y=1)样本,212个阴性(y=0)样本;每个样本有32个特征,但本次实验中选取其中10个特征。数据信息及模型训练流程请参考TI-ONE最佳实践乳腺癌预测[13]

数据集具体字段信息如下:


数据集具体内容抽样展示如下(前9列:特征,第10列:标签):


  • 离线模型训练



模型训练:进入TI-ONE控制台[8],点击左侧【工程列表】,单击【新建工程】,【COS Bucket】选择之前创建好的COS。进入【工作流编辑页面】,按需拖拽对应的输入、算法、输出等模块到右侧页面即可快速构建一个完整的模型训练框架,具体构建方法可参考官网文档使用可视化建模构建模[14]。 当然,用户也可以自行编写代码上传到【Notebook】页面进行模型训练,具体请参考官网使用Notebook构建模型[15],另外也可以使用TI SDK构建模型[16]



模型效果:运行成功后,右键单击【二分类任务评估】>【评估指标】,即可查看模型效果。


模型保存:右键单击模型文件(【决策树分类】左侧小圆圈),点击【模型操作】>【保存到模型仓库】,保存成功后返回【模型仓库】页面,查看保存的模型服务。



(二)实时特征工程


本示例基于流计算Oceanus SQL作业生成,使用Datagen连接器模拟生成实时特征数据,并将结果以CSV格式存储在CKafka中,供之后进行模型调用。用户可以根据实际业务情况自行选择SQL、ETL、JAR作业方式进行实时特征数据的输出。

  • 创建Source


-- random source 用于模拟患者病历实时特征数据
CREATE TABLE random_source ( ClumpThickness INT, UniformityOfCellSize INT, UniformityOfCellShape INT, MarginalAdhsion INT, SingleEpithelialCellSize INT, BareNuclei INT, BlandChromation INT, NormalNucleoli INT, Mitoses INT ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', -- 每秒产生的数据条数 'fields.ClumpThickness.kind'='random', -- 无界的随机数 'fields.ClumpThickness.min'='0', -- 随机数的最小值 'fields.ClumpThickness.max'='10', -- 随机数的最大值 'fields.UniformityOfCellSize.kind'='random', -- 无界的随机数 'fields.UniformityOfCellSize.min'='0', -- 随机数的最小值 'fields.UniformityOfCellSize.max'='10', -- 随机数的最大值 'fields.UniformityOfCellShape.kind'='random', -- 无界的随机数 'fields.UniformityOfCellShape.min'='0', -- 随机数的最小值 'fields.UniformityOfCellShape.max'='10', -- 随机数的最大值 'fields.MarginalAdhsion.kind'='random', -- 无界的随机数 'fields.MarginalAdhsion.min'='0', -- 随机数的最小值 'fields.MarginalAdhsion.max'='10', -- 随机数的最大值 'fields.SingleEpithelialCellSize.kind'='random', -- 无界的随机数 'fields.SingleEpithelialCellSize.min'='0', -- 随机数的最小值 'fields.SingleEpithelialCellSize.max'='10', -- 随机数的最大值 'fields.BareNuclei.kind'='random', -- 无界的随机数 'fields.BareNuclei.min'='0', -- 随机数的最小值 'fields.BareNuclei.max'='10', -- 随机数的最大值 'fields.BlandChromation.kind'='random', -- 无界的随机数 'fields.BlandChromation.min'='0', -- 随机数的最小值 'fields.BlandChromation.max'='10', -- 随机数的最大值 'fields.NormalNucleoli.kind'='random', -- 无界的随机数 'fields.NormalNucleoli.min'='0', -- 随机数的最小值 'fields.NormalNucleoli.max'='10', -- 随机数的最大值 'fields.Mitoses.kind'='random', -- 无界的随机数 'fields.Mitoses.min'='0', -- 随机数的最小值 'fields.Mitoses.max'='10' -- 随机数的最大值);

  • 创建Sink


 CREATE TABLE `KafkaSink` (    ClumpThickness             INT,    UniformityOfCellSize       INT,    UniformityOfCellShape      INT,    MarginalAdhsion            INT,    SingleEpithelialCellSize   INT,    BareNuclei                 INT,    BlandChromation            INT,    NormalNucleoli             INT,    Mitoses                    INT ) WITH (     'connector' = 'kafka',                                  -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置  Connector     'topic' = 'topic-decision-tree-predict-1',              -- 替换为您要消费的 Topic     'properties.bootstrap.servers' = '172.28.28.211:9092',  -- 替换为您的 Kafka 连接地址     'properties.group.id' = 'RealTimeFeatures',             -- 必选参数, 一定要指定 Group ID     'format' = 'csv' );

  • 编写业务SQL


 INSERT INTO `KafkaSink` SELECT * FROM `random_source`

  • 选择Connector


点击【作业参数】,在【内置Connector】选择flink-connector-kafka,点击【保存】>【发布草稿】运行作业。

  • 查询数据


进入CKafka控制台[3],选择相应的CKafka示例进入,单击【topic管理】,选择对应的topic,查询写入数据。




(三)实时预测


本示例基于流计算Oceanus JAR作业方式演示,首先将存储在CKafka的特征数据提取出来,经过简单的数据格式转换发送到TI-EMS服务进行模型调用,并将返回结果存储在ClickHouse中。本示例使用单一的在线推理服务,用户可根据自己实际需求做负载均衡。

  • 启动模型服务


进入TI-ONE控制台[8],点击左侧【模型仓库】,选择对应的模型服务单击【启动模型服务】,【资源组】选择之前创建好的专用资源组。创建成功之后返回TI-EMS控制台[10],在左侧的【模型服务】>【在线推理】页面查看所创建的模型服务。


  • 公网调用模型测试


  • 单击右侧【更多】>【调用】,创建公网调用地址。



  • 启动控制台,新建data.json文件,在某一文件夹下运行如下代码:


# 请将 <访问地址>/<密钥> 替换为实际的 IP 地址/密钥curl -H "Content-Type: application/json" \-H "x-Auth-Token: <密钥>" \-X POST <访问地址>/v1/models/m:predict -d @data.json

data.json数据格式如下:

{"instances" : [{"_c0": 3, "_c1": 2, "_c2": 3, "_c3": 0, "_c4": 0, "_c5": 2, "_c6": 1, "_c7": 0, "_c8": 1}]}

模型调用返回结果如下:

{"predictions": [{"pmml(prediction)":"1","probability(0)":"0.47058823529411764","probability(1)":"0.5294117647058824","prediction":"1.0","label":"1"}]}

  • 通过流计算Oceanus调用模型服务


除了可以使用公网调用模型外,还可以使用VPC方式调用模型。本小节着重介绍如何使用流计算Oceanus JAR作业的方式调用模型进行实时预测。


  • 本地代码开发、调试。


  • 进入流计算Oceanus控制台[2],单击左侧【依赖管理】新建依赖并上传JAR包。 


  • 进入【作业管理】页面,创建JAR作业,选择之前创建好的流计算Oceanus集群。 


  • 单击【开发调试】指定相应的主程序包和主类,点击【作业调试】,【内置Connector】选择flink-connector-clickhouse和flink-connector-kafka。


ClickHouse数据查询


Java代码如下

import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.table.api.Table;import org.apache.flink.util.Collector;import org.apache.http.HttpEntity;import org.apache.http.HttpResponse;import org.apache.http.client.HttpClient;import org.apache.http.client.methods.HttpPost;import org.apache.http.entity.StringEntity;import org.apache.http.impl.client.HttpClientBuilder;import org.apache.http.util.EntityUtils;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.json.JSONObject;import org.slf4j.LoggerFactory;import org.slf4j.Logger;import java.util.ArrayList;import java.util.Properties;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class OnlinePredict {
public static final Logger logger = LoggerFactory.getLogger(OnlinePredict.class);
public static void main(String[] args) throws Exception { // kafka配置参数解析 final ParameterTool parameterTool = ParameterTool .fromPropertiesFile(OnlinePredict.class.getResourceAsStream("/KafkaSource.properties")); // 实例化运行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
// checkpoint配置 streamEnv.enableCheckpointing(parameterTool.getLong("flink.stream.checkpoint.interval", 30_000)); streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 重启策略 streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10_000)); // source、transfer、sink DataStream stringResult = streamEnv.addSource(buildKafkaSource(parameterTool)) .flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { String paramInput = inputDataTransfer(value); String outputData = sendHttpData(paramInput); out.collect(outputData); } });
Table tableResult = tableEnv.fromDataStream(stringResult); tableEnv.createTemporaryView("resultSink",tableResult);
tableEnv.executeSql("CREATE TABLE `CKSink` (\n" + " res STRING,\n" + " PRIMARY KEY (`res`) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'clickhouse',\n" + " 'url' = 'clickhouse://172.28.1.138:8123',\n" + " 'database-name' = 'testdb',\n" + " 'table-name' = 'model_predict_result_1',\n" + " 'table.collapsing.field' = 'Sign'\n" + ")");
tableEnv.executeSql("insert into CKSink select * from resultSink");
}
// kafka source public static SourceFunction buildKafkaSource(ParameterTool parameterTool) throws Exception { Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, parameterTool.get("kafka.source.bootstrap.servers")); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, parameterTool.get("kafka.source.auto.offset.reset", "latest")); properties.put(ConsumerConfig.GROUP_ID_CONFIG, parameterTool.get("kafka.source.group.id"));
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer( parameterTool.get("kafka.source.topic"), new SimpleStringSchema(), properties); consumer.setStartFromGroupOffsets();
return consumer; }
// kafka 数据格式转换 // 返回数据格式:{"instances" : [{"_c0": 3, "_c1": 2, "_c2": 3, "_c3": 0, "_c4": 0, "_c5": 2, "_c6": 1, "_c7": 0, "_c8": 1}]} public static String inputDataTransfer(String value) { String[] input = value.split(","); ArrayList dataListMap = new ArrayList(); JSONObject jsondata = new JSONObject(); for (int i = 0; i < input.length; i++) { jsondata.put("_c" + i, Double.parseDouble(input[i])); } dataListMap.add(jsondata); String param = "{\"instances\":" + dataListMap.toString() + "}"; return param; }
// TI-EMS 模型在线推理服务调用 // 返回数据格式如下:{"predictions": [{"pmml(prediction)":"1","probability(0)":"0.47058823529411764","probability(1)":"0.5294117647058824","prediction":"1.0","label":"1"}]} public static String sendHttpData(String paramJson) throws Exception { String data = null; try { // 请将 xx.xx.xx.xx:xxxx 替换为实际的 IP 地址,参考 3.2.2 图中所示 创建 VPC 调用 String url = "http://xx.xx.xx.xx:xxxx/v1/models/m:predict"; HttpClient client = HttpClientBuilder.create().build(); HttpPost post = new HttpPost(url);
post.addHeader("Content-type", "application/json"); post.addHeader("Accept", "application/json"); // 请将 xxxxxxxxxx 替换为实际密钥,参考 3.2.2 图中所示 创建 VPC 调用 post.addHeader("X-AUTH-TOKEN", "xxxxxxxxxx");
StringEntity entity = new StringEntity(paramJson, java.nio.charset.Charset.forName("UTF-8")); post.setEntity(entity); HttpResponse response = client.execute(post);
// 判断是否正常返回 if (response.getStatusLine().getStatusCode() == 200) { // 解析数据 HttpEntity resEntity = response.getEntity(); data = EntityUtils.toString(resEntity); } else { data = "error input"; } System.out.print(data); System.out.println(data); } catch (Throwable e) { logger.error("", e); } return data; }
}

Kafka Source参数配置

# source // 请替换为实际的参数kafka.source.bootstrap.servers=172.28.28.211:9092kafka.source.topic=topic-decision-tree-predict-1kafka.source.group.id=RealTimePredict1kafka.source.auto.offset.reset=latest

POM依赖

<properties>    <flink.version>1.11.0flink.version>properties>
<dependencies>
<dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-streaming-java_2.11artifactId> <version>${flink.version}version> <scope>providedscope> dependency>
<dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-connector-kafka_2.11artifactId> <version>${flink.version}version> <scope>providedscope> dependency>
<dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-connector-clickhouseartifactId> <version>${flink.version}version> <scope>providedscope> dependency>
<dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-table-commonartifactId> <version>${flink.version}version> <scope>providedscope> dependency>
<dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-table-api-java-bridge_2.11artifactId> <version>${flink.version}version> <scope>providedscope> dependency>
<dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-table-api-javaartifactId> <version>${flink.version}version> <scope>providedscope> dependency>
<dependency> <groupId>org.apache.httpcomponentsgroupId> <artifactId>httpclientartifactId> <version>4.5.3version> <scope>compilescope> dependency>
<dependency> <groupId>org.jsongroupId> <artifactId>jsonartifactId> <version>20201115version> <scope>compilescope> dependency>
dependencies>


四、总结


  • 新版Flink 1.13集群无需用户自己选择内置Connector,平台将自动匹配。


  • 除了使用CKafka及ClickHouse作为数据仓库外,还可以使用Hive、Mysql、PG等作为数仓,根据用户实际需求自行选择。


  • 本方案最简化了实时特征工程,用户可以根据自身业务需求采用SQL、JAR、ETL作业的方式完成实时特征工程。


  • 本方案只初始化了一个PMML服务提供流计算Oceanus调用,如遇数据背压情况可增多PMML服务循环调用。


  • TI-ONE、TI-EMS平台暂时不支持实时训练模型,如需更新模型可以自行编写定时脚本拉取数据在TI-ONE平台训练更新。



五、参考地址


[1] VPC帮助文档:https://cloud.tencent.com/document/product/215/36515
[2] Oceanus控制台:https://console.cloud.tencent.com/oceanus/job  
[3] CKafka控制台:https://console.cloud.tencent.com/ckafka/overview  
[4] COS控制台:https://console.cloud.tencent.com/cos5  
[5] COS控制台快速入门:https://cloud.tencent.com/document/product/436/38484  
[6] ClickHouse控制台:https://console.cloud.tencent.com/cdwch  
[7] ClickHouse快速入门:https://cloud.tencent.com/document/product/1299/49824  
[8] TI-ONE控制台:https://console.cloud.tencent.com/tione  
[9] 注册与开通TI-ONE服务:https://cloud.tencent.com/document/product/851/39086
[10] TI-EMS控制台:https://console.cloud.tencent.com/tiems/overview  
[11] TI-EMS资源组管理:https://cloud.tencent.com/document/product/1120/38968  
[12] 乳腺癌数据集:https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/  
[13] 乳腺癌预测:https://cloud.tencent.com/document/product/851/35127  
[14] 使用可视化建模构建模型:https://cloud.tencent.com/document/product/851/44432  
[15] 使用Notebook构建模型:https://cloud.tencent.com/document/product/851/44434 
[16] 使用TI SDK构建模型:https://cloud.tencent.com/document/product/851/44435  

流计算Oceanus限量秒杀专享活动火爆进行中↓↓




👇点击「阅读原文」了解腾讯云流计算Oceanus更多信息~
浏览 38
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报