实践教程丨如何在边缘基础设施上运行故障预测解决方案?

共 4883字,需浏览 10分钟

 ·

2020-12-12 09:15

    作者简介

Janakiram MSV是Janakiram & Associates的首席分析师,也是国际信息技术学院的兼职教师。他也是Google Qualified Developer、亚马逊认证解决方案架构师、亚马逊认证开发者、亚马逊认证SysOps管理员和微软认证Azure专业人员。


Janakiram是云原生计算基金会(CNCF)的大使,也是首批Kubernetes认证管理员和Kubernetes认证应用开发者之一。他曾在微软、AWS、Gigaom Research等知名公司工作。


之前的文章中,我讨论了Rancher的轻量级Kubernetes发行版K3s、Calico和Portworx如何成为运行在边缘的现代AI和物联网系统的基础。在本文中我们将设计和部署一个解决方案以运行在这一基础设施上。


我们将基于监控涡轮机的风扇的假设场景来构建一个预测性维护解决方案,以检测风扇的异常情况。该架构利用了各种开源的云原生技术,可以作为设计和构建IoT/边缘解决方案的参考架构。


问题陈述


我们将设计和部署一个解决方案,该解决方案可以从多个风扇摄取遥测数据,并在故障发生之前使用实时流(real-time stream)来预测故障。该解决方案运行在低端机器(如英特尔NUC)的边缘基础设施上。在本文中,我们将使用前文搭建的基础设施(基于K3s、Calico以及Portwox),它们提供了Kubernetes集群的核心组件。


解决方案架构


连接到风扇的传感器提供了当前的转速、振动、温度和噪音水平等数据。这些遥测数据流和每个风扇的设备ID一起作为预测性维护解决方案的输入。



Mosquitto是一款使用广泛的开源MQTT broker,它将作为传感器的网关以及平台的集中式消息broker。传感器将遥测数据摄入Mosquitto broker的fan/messages类别下方。


以下是每个风扇发布到MQTT主题的有效载荷:



预测器微服务和风扇发布的数据在同一个遥测频道,它会从中读取数据。对于每个入站数据点,它都会调用异常检测服务,并将结果发布到一个单独的MQTT主题中,即fan/anomaly


import timeimport requestsimport randomimport datetimeimport jsonimport osimport paho.mqtt.client as mqtt broker_address = os.getenv('MQTT_HOST')dev_topic = os.getenv('MQTT_DEV_TOPIC')pred_topic = os.getenv('MQTT_PREDICT_TOPIC')scoring_url=os.getenv('SCORING_URL')d={} client = mqtt.Client("pdm")client.connect(broker_address) def on_message(mosq, obj, msg):    rotation=json.loads(msg.payload)["rotation"]    temperature=json.loads(msg.payload)["temperature"]    vibration=json.loads(msg.payload)["vibration"]    sound=json.loads(msg.payload)["sound"]    telemetry=[rotation,temperature,vibration,sound]    data={"params":telemetry}        response = requests.post(scoring_url, json=data)    fault=json.loads(response.text)["fault"]     d["deviceID"]=json.loads(msg.payload)["deviceID"]    d["fault"]=fault     payload = json.dumps(d, ensure_ascii=False)    print(payload)    client.publish(pred_topic,payload)    def on_subscribe(mosq, obj, mid, granted_qos):    print("Subscribed: " + str(mid) + " " + str(granted_qos)) client.on_message = on_messageclient.on_subscribe = on_subscribeclient.connect(broker_address)client.subscribe(dev_topic, 0) while True:    client.loop()


SCORING_URL是异常检测推理服务的一个端点。通过Flask网络服务暴露了一个在TensorFlow中训练的深度学习模型。


下面是预测服务发布到MQTT主题的有效载荷:



训练异常检测模型


用一个超过20000个数据点的历史数据集来训练异常检测模型。



从数据集中观察到,在故障发生前的几个小时,风扇的转速会降低并伴随着振动、声音、温度值的增加。


转速数据的散点图直观地显示了这一点。风扇的转速从正常的平均600转下降到400转。



基于此,我们可以轻松地训练一个简单的TensorFlow逻辑回归模型来预测故障风扇。我们先去掉时间戳和设备ID列。


dataframe = pandas.read_csv("../data/fan.csv", header=None,skiprows=1)del dataframe[0]del dataframe[1]


分离特征和标签后,再将数据集分为训练数据和测试数据。


dataset = dataframe.valuesX = dataset[:,0:4].astype(float)y = dataset[:,4]X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)


然后我们创建一个4层神经网络,做逻辑回归。


model = Sequential()model.add(Dense(60, input_dim=4, activation='relu'))model.add(Dense(30, activation='relu'))model.add(Dense(10, activation='relu'))model.add(Dense(1, activation='sigmoid'))model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])model.fit(X_train, y_train, epochs=250, batch_size=32, verbose=0)


最后,该模型被保存和评估。


model.save("../model")loss, acc = model.evaluate(X_test, y_test, verbose=0)print('Test Accuracy: %.3f' % acc)


保存到磁盘上的TensorFlow模型被推理服务加载,然后对预测器微服务发送的数据进行预测。


时间序列数据和可视化


InfluxDB的实例通过Telegraf与Mosquitto连接。这种配置为我们提供了一个优雅的机制,可以在不写代码的情况下将时间序列数据摄入InfluxDB。


下面是连接Mosquitto和InfluxDB的Telegraf配置:


  [agent]      interval = "10s"      round_interval = true      metric_batch_size = 1000      metric_buffer_limit = 10000      collection_jitter = "0s"      flush_jitter = "0s"      debug = false      quiet = false      hostname = ""      omit_hostname = true     [[outputs.influxdb]]       urls = ["http://influxdb:8086"]      database = "fan"      retention_policy = "autogen"      precision = "s"      timeout = "5s"     [[outputs.file]]    files = ["stdout"]    data_format = "influx"     [[inputs.mqtt_consumer]]      servers = ["tcp://mosquitto:1883"]      qos = 0       topics = [        "fan/#"      ]       insecure_skip_verify = true      client_id = ""      data_format = "json"      name_override = "fan"      tag_keys = ["deviceID"]      json_string_fields = ["rotation","temperature","vibration","sound","fault"]


现在可以从InfluxDB查询时间序列数据。



最后,我们将Grafana仪表盘连接到InfluxDB,为我们的AIoT解决方案构建一个直观的可视化面板。



在本教程的下一部分,我将讨论部署架构以及基于K3s、Calico和Portworx的存储和网络。保持关注哟!



推荐阅读

1款开源工具,实现自动化升级K3S集群!

K3s+Sysdig,8分钟部署并保护集群安全!

领跑开源创新,K3s入选InfoQ 2020 年度十大开源新锐项目



About k3s


k3s 是首个进入 CNCF 沙箱项目的 K8S 发行版,同时也是当前全球用户量最大的 CNCF 认证轻量级 K8S 发行版。自2019年3月发布以来,备受全球开发者们关注,至今GitHub Star数已超过 14,700,成为了开源社区最受欢迎的边缘计算 K8S 解决方案。截至目前,K3s全球下载量超过100万次,每周平均被安装超过2万次,其中30%的下载量来自中国。


k3s 专为在资源有限的环境中运行 Kubernetes 的研发和运维人员设计,将满足日益增长的在边缘计算环境中运行在 x86、ARM64 和 ARMv7 处理器上的小型、易于管理的 Kubernetes 集群需求。k3s 的发布,为开发者们提供了以“Rancher 2.X + k3s”为核心的从数据中心到云到边到端的 K8S 即服务(Kubernetes-as-a-Service),推动 Kubernetes Everywhere。

扫码添加k3s中文社区助手

加入官方中文技术社区

官网:https://k3s.io

浏览 25
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报