关于MQTT协议
最近一段时间乱糟糟的,发生了好多事情,公众号也暂停了一段时间,不过嘛,状态总要调整过来的,人嘛,每个月都有那么几天嘛,所以加油~
今天我们来讲讲MQTT,其实对于产品而言,严格来讲只要懂得应用场景就可以了,但是我依然愿意更深度的去学习和分析这些更偏技术侧的东西,原因也很简单,不管是MQTT、HTTP还是其他的什么,这些存在了很久且成熟度很高的技术其实是非常值得借鉴的,他的结构分层,他的运作逻辑,他的异常处理机制都能给我们在产品设计方面的启发,另外清晰的掌握产品和技术的边界也是很重要的,我们要明白哪些是可以用技术手段解决的,哪些是用产品手段解决的,这对于做纯软件的产品经理似乎显得不是那么重要,但是对于物联网或者人工智能的产品而言,就很重要了,好了我们开始吧。
一、什么是MQTT
1、MQTT
MQTT协议(Message Queue Telemetry Transport,消息队列遥测传输协议)是IBM于1999年为了一个通过卫星网络连接输油管道的项目开发的。MQTT具备实现简单、支持三种Qos级别、轻量、占带宽低、可保持持续回话、可传输任意类型数据等特点,这让它非常适合计算能力有限、网络带宽低、信号不稳定的远程设备,所以它成为了物联网系统事实上的网络协议标准。
2、MQTT的特性
简单来说,MQTT具备以下的特性:
基于TCP协议的应用层协议; (MQTT协议是运行在应用层的协议,底层使用TCP协议进行数据传输,整个协议栈运行在IP网络上,这一点不同于我们经常提到的LoRaWAN,他们是在物理层/链路层的协议,他们解决的是设备如何接入互联网的问题。)
采用C/S架构;
使用订阅/发布模式;(采用订阅/发布模式将消息的发送方和接受方解耦,这种模式更好的契合了物联网的场景,具备更好的可扩展性)
提供3种消息的QoS (3种Qos是指三种安全模式,保证只多一次、最少一次、只有一次,可兼容不同场景的需求。)
收发消息都是异步的,发送方不需要等待接收方应答。
二、MQTT协议详解
1、MQTT协议的通信模型
我们先来看看MQTT协议的通信模型:
可以看到在MQTT协议中,发布方和订阅房并没有直接连接,而是通过了中间的角色进行了一次中转,在MQTT协议中,我们通常将中间的中转角色成为Broker,发送消息和接收消息的订阅方成为Client。
我们一起来看看信息交互的流程:
(1)发布方和订阅方都建立了到Broker的TCP连接;
(2)订阅方告知Broker他要订阅的主题,主题我们成为Topic;
(3)发布方将消息发送到Broker,并指定相应的Topic;
(4)Broker接收到消息以后,检查都有哪些订阅方订阅了这个Topic,然后将消息发送到这些订阅方;
(5)订阅方从Broker获取到这些信息;
(6)如果订阅方此时与Broker处于中断中泰,Broker也可以先保存这条消息,当订阅方在线时,将消息发送给订阅方。
2、MQTT Client
任何终端,无论是嵌入式设备,还是服务器,只要运行了MQTT协议的库或者代码并连接了MQTT Broker,我们都称其为MQTT Client。Publisher(发布方)和Subscriber(订阅方)都属于Client。一个Client是Pushlisher还是Subscriber,只取决于该Client当前的状态——是在发布消息还是在订阅消息。当然,一个Client可以同时是Publisher和Subscriber。
在大多数情况下,我们不需要自己按照MQTT的协议规范来实现一个MQTT Client,因为MQTT Client库在很多语言中都有实现,包括Android、Arduino、Ruby、C、C++、C#、Go、iOS、Java、JavaScript以及.NET等。
3、MQTT Broker
搭建一个完整的MQTT协议环境,除了需要MQTT Client外,我们还需要一个MQTT Broker。如前文所述,Broker负责接收Publisher的消息,并将消息发送给相应的Subscriber,它是整个MQTT协议订阅/发布的核心。在实际应用中,一个MQTT Broker还应该提供如下功能:
可以对Client的接入进行授权,并对Client进行权限控制;
可以横向扩展,比如集群,满足海量的Client接入;
有较好的扩展性,可以比较方便地接入现有业务系统;
易于监控,满足高可用性。
4、Client与Broker的连接
4.1 建立连接
Client在可以发布和订阅消息之前,必须先连接到Broker,看看这个流程图:
(1)Client向Broker发送一个CONNECT数据包;
(2)Broker在收到Client的CONNECT数据包后,如果允许Client接入,则回复一个CONNACK包,该CONNACK包的返回码为0,表示MQTT协议连接建立成功;如果不允许Client接入,也回复一个CONNACK包,该CONNACK包的返回码为一个非0的值,用来标识接入失败的原因,然后断开底层的TCP连接。
4.2 关闭连接
MQTT协议的连接关闭可以由Client或Broker两者任意一方发起:Cliennt主动关闭连接:Client主动关闭连接的流程非常简单,只需要向Broker发送一个DISCONNECT数据包就可以了。如果Client没有发送DISCONNECT就断开了连接,那么Broker会认为Client是异常中断的,此时就会向在连接的时候指定的遗愿主题发布遗愿消息。
Broker主动关闭连接:MQTT协议规定Broker在没有收到Client的DISCONNECT数据包之前都应该保持和Client的连接,只有Broker在Keepalive的时间间隔里,没有收到Client的任何MQTT协议数据包时才会主动关闭连接。一些Broker的实现在MQTT协议上做了一些拓展,支持Client的连接管理,可以主动断开和某个Client的连接。Broker主动关闭连接之前不需要向Client发送任何MQTT协议数据包,直接关闭底层的TCP连接就可以。
5、订阅与发布
5.1 订阅一个主题
ClientB想要接收ClientA发布到某个主题的消息,就必须先向Broker订阅这个主题,订阅一个主题的流程如图:
(1)Client向Broker发送一个SUBSCRIBE数据包,其中包含Client想要订阅的主题以及其他参数。
(2)Broker收到SUBSCRIBE数据包后,向Client发送一个SUBACK数据包作为应答。
具体的数据内容我们就不详细看了,对产品经理而言,知道这些足够了。
5.2 取消订阅
Subscriber也可以取消对某些主题的订阅。取消订阅的流程如图:
(1)Client向Broker发送一个UNSUBSCRIBE数据包,其中包含Client想要取消订阅的主题。
(2)Broker收到UNSUBSCRIBE数据包后,向Client发送一个UNSUBACK数据包作为应答。
6.MQTT中的Qos
作为最初用来在网络带宽窄、信号不稳定的环境下传输数据的协议,MQTT协议设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,MQTT协议还提供了3种不同层次的QoS。
6.1 Qos0
At most once,至多一次,表示发送方发送一条消息,接收方最多能收到一次,也就是说Sender尽力向接收方发送消息,如果发送失败,则放弃。
Qos0所消耗的资源最少,但是可能存在丢包,我们思考一个问题,对于一个上报数据很频繁的传感器,即使丢掉一两条数据,问题也不大,这种场景下Qos0很显然是适用的。
6.2 Qos1
At least once,至少一次,表示发送方发送一条消息,接收方至少能收到一次,也就是说发送方向接收方发送消息,如果发送失败,发送方会继续重试,直到接收方收到消息为止,但是因为重传的原因,接收方可能会收到重复的消息。
流程是这样的:
(1)发送方向接收方发送一个带有消息数据的PUBLISH数据包,并在本地保存这个PUBLISH数据包。
(2)接收方在收到PUBLISH数据包后,向发送方发送一个PUBACK数据包,PUBACK数据包没有消息体,只在可变头中有一个包标识(Packet Identifier),和它收到的PUBLISH数据包中的Packet Identifier一致。
(3)发送方在收到PUBACK数据包之后,根据PUBACK数据包中的Packet Identifier找到本地保存的PUBLISH数据包,然后丢弃,这样一次消息发送完成。
(4)如果发送方在一段时间内没有收到PUBLISH数据包对应的PUBACK数据包,那么它会将PUBLISH数据包中的DUP标识设为1(代表的是重新发送PUBLISH数据包),然后重新发送该PUBLISH数据包。重复这个流程,直到发送方收到PUBACK数据包,然后执行第3步。
需要注意的是需要对数据进行去重,去重方式也很简单,只要在消息内包含消息的ID就行,通常用的是UUID。
6.3 Qos2
Exactly once,确保只有一次,表示发送方发送一条消息,接收方确保能收到且只收到一次,也就是说发送方尽力向接收方发送消息,如果发送失败,会继续重试,直到接收方收到消息为止,同时保证接收方不会因为消息重传而收到重复的消息。
QoS2使用2套请求/应答流程(一个4段的握手)来确保接收方收到来自发送方的消息,且不重复。
(1)发送方发送QoS值为2的PUBLISH数据包,假设该数据包中Packet Identifier为P,并在本地保存该PUBLISH数据包。
(2)接收方收到PUBLISH数据包后,在本地保存PUBLISH数据包的Packet Identifier为P,并回复发送方一个PUBREC数据包。PUBREC数据包可变头中的Packet Identifier为P,没有消息体。
(3)当发送方收到PUBREC数据包后,它就可以安全地丢掉初始的Packet Identifier为P的PUBLISH数据包,同时保存该PUBREC数据包,并回复接收方一个PUBREL数据包。PUBREL数据包可变头中的Packet Identifier为P,没有消息体;如果发送方在一定时间内没有收到PUBREC数据包,它会把PUBLISH数据包的DUP标识设为1,重新发送该PUBLISH数据包。
(4)当接收方收到PUBREL数据包时,它可以丢弃掉保存的PUBLISH数据包的Packet Identifier P,并回复发送方一个PUBCOMP数据包。PUBCOMP数据包可变头中的Packet Identifier为P,没有消息体。
(5)当发送方收到PUBCOMP数据包,它会认为数据包传输已完成,会丢掉对应的PUBREC数据包。如果发送方在一定时间内没有收到PUBCOMP数据包,则会重新发送PUBREL数据包。
可以看到,Qos2也是最安全,但是效率最低的一种Qos等级。
7.Retained
关于Retained我们可以这样来理解,想象一个场景,你有一个温度传感器,它每3个小时向一个主题发布当前温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候才能收到温度消息?没错,和你想的一样,它必须等到3个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。这时候我们就可以用到Retained消息来解决这个问题。
Retained消息是指在PUBLISH数据包中将Retain标识设为1的消息,Broker收到这样的PUBLISH数据包以后,将为该主题保存这个消息,当一个新的订阅者订阅该主题时,Broker会马上将这个消息发送给订阅者。Retain消息有如下特点:
一个Topic只能有一条Retained消息,发布新的Retained消息将覆盖旧的Retained消息;
如果订阅者使用通配符订阅主题,那他会收到所有匹配主题的Retained消息;
只有新的订阅者才会收到Retained消息,如果订阅者重复订阅一个主题,那么在每次订阅的时候都会被当作新的订阅者,然后收到Retained消息。
8.LWT
LWT全称为Last Will and Testament,也就是我们在连接Broker时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等。顾名思义,当Broker检测到Client非正常地断开连接时,就会向Client的遗愿主题中发布一条消息。遗愿的相关设置是在建立连接时,在CONNECT数据包里面指定的。LWT配合Retained可以用于监测设备的连接状态:
(1)Client在连接时指定Will Topic为“client/status”,遗愿消息为“offline”,Will Retain=1;
(2)Client在连接成功后向同一个主题“client/status”发布一个内容为“online”的Retained消息。那么,订阅者在任何时候订阅“client/status”,都会获取Client当前的连接状态。
9.Keepalive
在生产环境下,特别是物联网这种无人值守的设备比较多的情况下,我们都希望设备能够自动从错误中恢复过来,比如在网络故障恢复以后,设备能够自动重新连接Broker。而Keepalive的存在就是为了实时监测设备的连接状态,以及时的完成保活。在建立起连接的时候我们会传递一个Keepalive的参数,单位为秒。MQTT协议规定,在1.5倍Keeplive的时间间隔内,如果Broker没有收到来自Cilent的任何数据包,那么就认为已经断开了。但是我们不能保证每一个1.5倍Keepalive中都刚好有数据传输,因此MQTT协议中设计了PINGREQ/PINGRESP数据包,在Cilent与Broker之间没有数据传输的时候,可以通过该机制传输非常小的数据内容来满足Keepalive的约定。值得注意的是,Keepalive还有这样的特性:
(1)如果在一个Keepalive时间间隔内,Client和Broker有过数据包传输,比如PUBLISH数据包,那Client就没有必要再使用PINGREQ数据包了,在网络资源比较紧张的情况下这点很重要;
(2)Keepalive的值是由Client指定的,不同的Client可以指定不同的值;
(3)Keepalive的最大值为18个小时12分15秒;
(4)Keepalive的值如果设为0的话,代表不使用Keepalive机制。