MQTT笔记
Overview
写在最前面
最初在思考如何监控网关的连接状态,loraWAN的网关可以承载很多个节点,少则数十,多则上千,如果网关掉线了怎么办?节点就全挂了。
关于设备离线监控,原本想使用定时上报的策略来实现,但觉得既然MQTT定义了连接保活策略,那么是否可以通过监控设备连接来更新网关网关状态?可以的,只是实现的方式有点奇怪。
首先为每个网关设定一个clientID,直接按照loraWAN的标准,取网卡MAC地址,拓展到48位,避免系统使用UUID自动分配,然后使用ElasticSearch收集容器日志,监控Broker状态,每分钟轮询,当监控到匹配的字符串时,获取对应网关ID,更新连接状态,推送消息。
在更深入一点MQTT协议后,又发现其实可以利用MQTT的遗嘱机制,Client启用遗嘱,在连接到Broker时,携带指定的topic与payload,当Client异常断开时(没有使用Disconnect断开连接),Broker会发布遗嘱topic与payload,所以更好一点的机制是:上线时主动发布上线topic,掉线时利用遗嘱机制发布异常下线topic。
实际测试中,前者根据日志查询,实时性比后者高,任何导致重连的因素,如网络不稳定,重连后销毁旧连接,创建新连接等都会记录下来,产生较多的日志和推送,而后者只有在直接拔掉网关的网线或者掉电关机的情况,服务端检测到KeepAlive超时,发布一个异常下线的topic。如果需要考察网络连接的稳定性,可以直接监控Broker的日志,其他的情况使用遗嘱机制足以应对。
在了解MQTT的连接策略过程中看了很多相关资料,学习流程还是和之前一样,看一遍协议文档,然后搭环境开发测试,完了再看协议文档,再开发测试,协议文档更多时候是用来发掘那些实习协议的第三方库的功能的。
主要参考文档资料如下:
- MQTT Version 3.1.1 OASIS Standard:官方协议文档
- MQTT Essentials:来自Hive MQ官网的文章,写得很详细,是本文内容摘取的主要对象
- Mosquitto Documentation:我们需要一个Broker来接收与分发消息
还有一份中文文档,刚刚找到的:MQTT协议中文版
如果想要了解MQTT的负载能力,这里有一份Benchmark of MQTT Servers文档可以参考,mosquitto的单线程的服务,双核4G,保证带宽的配置,QoS为0时,最大可以负载6万个Publisher
本次文章中,Broker使用mosquitto,使用了这一篇文章中构建的容器:Mosquitto TLS Configure,Client使用paho.mqtt.golang库,其他语言的客户端可以在Eclipse Paho项目中找到,这个项目提供了各种版本的MQTT客户端的开源方案。
然后是正文
传输协议
使用TCP或Websockets协议传输,分别使用tcp://host:port或ws://host:port形式的访问,明文传输;
可以通过配置证书,使用TLS加密数据,分别使用tls://host:port或wss://host:port;
Websocket相对原始协议做了一层封装,使用第三方库如Eclipse Paho JavaScript Client,让浏览器可以连接Broker,在线收取和下发消息,可以大大减少适配不同客户端的开发量。
配置mosquitto时,可以开发多个端口设置不同的协议来支持不同类型客户端的接入,建议配置TLS加密。
连接到Broker
在连接到数据库或redis时,都需要配置一些参数,这里以paho.mqtt.golang的ClientOptions为例
1func NewClientOptions() *ClientOptions {
2 o := &ClientOptions{
3 Servers: nil,
4 ClientID: "",
5 Username: "",
6 Password: "",
7 CleanSession: true,
8 Order: true,
9 WillEnabled: false,
10 WillTopic: "",
11 WillPayload: nil,
12 WillQos: 0,
13 WillRetained: false,
14 ProtocolVersion: 0,
15 protocolVersionExplicit: false,
16 TLSConfig: tls.Config{},
17 KeepAlive: 30 * time.Second,
18 PingTimeout: 10 * time.Second,
19 ConnectTimeout: 30 * time.Second,
20 MaxReconnectInterval: 10 * time.Minute,
21 AutoReconnect: true,
22 Store: nil,
23 OnConnect: nil,
24 OnConnectionLost: DefaultConnectionLostHandler,
25 WriteTimeout: 0, // 0 represents timeout disabled
26 MessageChannelDepth: 100,
27 }
28 return o
29}
默认配置中有许多选项,这里挑出几个:
Servers:填写格式为tcp://host:port、ws://host:port、 tcps://host:port、 tls://host:port、 wss://host:port等形式的链接
ClientID:通过规则自行生成ClientID可以用于监控客户端上线和离线,不设置的话,会得到一个自动生成的UUID
Username与Pssword:在服务端配置账号密码后使用,默认不启用
CleanSession:当QoS为1或2,设置为false时,启用持久化连接,Broker会为客户端存储所有订阅的Topic以及未接收的消息,当客户端断线重连后,会接收到这些内容,设置为true时,Broker会忽略这些消息
Will:五个选项分别设置是否启用遗嘱,遗嘱的Topic,Payload以及QoS,在客户端连接到Broker时,Broker会存储这些内容,在客户端异常退出时发布,最后的Retain表示否保留遗嘱信息,当有新的订阅者订阅遗嘱时,会立刻接收到消息
TLSConfig:使用TLS加密时,配置加载根证书
KeepAlive:心跳间隔,默认30秒发送一次维持长连接,期间如果有数据上下行则不发送,可以适当调高
onConnect:当客户端连接上Broker时执行的Block,可以在这里发布上线消息,每次建立起长连接后执行一次
心跳保活
1func keepalive(c *client) {
2 DEBUG.Println(PNG, "keepalive starting")
3
4 receiveInterval := c.options.KeepAlive + (1 * time.Second)
5 pingTimer := timer{Timer: time.NewTimer(c.options.KeepAlive)}
6 receiveTimer := timer{Timer: time.NewTimer(receiveInterval)}
7 pingRespTimer := timer{Timer: time.NewTimer(c.options.PingTimeout)}
8
9 pingRespTimer.Stop()
10
11 for {
12 select {
13 case <-c.stop:
14 DEBUG.Println(PNG, "keepalive stopped")
15 c.workers.Done()
16 return
17 case <-pingTimer.C:
18 sendPing(&pingTimer, &pingRespTimer, c)
19 case <-c.keepaliveReset:
20 DEBUG.Println(NET, "resetting ping timer")
21 pingTimer.Reset(c.options.KeepAlive)
22 case <-c.pingResp:
23 DEBUG.Println(NET, "resetting ping timeout timer")
24 pingRespTimer.Stop()
25 pingTimer.Reset(c.options.KeepAlive)
26 receiveTimer.Reset(receiveInterval)
27 case <-c.packetResp:
28 DEBUG.Println(NET, "resetting receive timer")
29 receiveTimer.Reset(receiveInterval)
30 case <-receiveTimer.C:
31 receiveTimer.SetRead(true)
32 receiveTimer.Reset(receiveInterval)
33 sendPing(&pingTimer, &pingRespTimer, c)
34 case <-pingRespTimer.C:
35 pingRespTimer.SetRead(true)
36 CRITICAL.Println(PNG, "pingresp not received, disconnecting")
37 c.workers.Done()
38 c.internalConnLost(errors.New("pingresp not received, disconnecting"))
39 pingTimer.Stop()
40 return
41 }
42 }
43}
可以看到一共设置了三个timer,pingTimer用于定时发送心跳包,receiveTimer作为一个补救措施,在发送PingReq或上行数据都没有接收到响应的情况下,再次发送一个PingReq,pingRespTimer触发时,表示已经与Broker断开连接,客户端清理本地连接,然后重新发起连接。
上下行数据的情况下,pingTimer与receiveTimer会被不断重置,不会发送PingReq消耗额外的流量;
上行数据后如果没有得到响应,触发接收超时,客户端会尝试发出一个PingReq;
每次发送PingReq时,都会重置pingRespTimer,接收到响应时,关闭pingRespTimer,重置其余两个Timer,未接收到响应时,触发pingRespTimer,表示与Broker连接断开,客户端会清理本地内容,然后重新连接Broker。
半开连接
在客户端看来,如果触发了pingRespTimer,那就已经与Broker断开了连接,需要重连,而在Broker看来,到指定的心跳时间到期前,都认为客户端仍旧保持着连接,期间只要接收到数据,就不会关闭连接,假如将心跳时间定的非常长,在网络不稳定的情况下,可能出现较大量的半开连接。
客户端断线重连成功的情况下,Broker会清理无效的旧连接,并建立一个新连接,这些在日志里都可以看到;
客户端断线重连失败,导致超时的情况下,Broker认为客户段已经掉线,会关闭连接并发布相应的遗嘱内容,可以通过监听遗嘱Topic获取掉线的客户端。
Retain选项
MQTT的消息的发布是广播式的,且只有当前订阅Topic的在线用户才能接收到消息,如果需要让订阅者在订阅Topic后立刻接收到消息,可以为Topic启用retain选项,Broker会为Topic保留一份最新的消息副本,每次发布Topic时如果带有retain的标志,则用它覆盖当前保存的副本。
QoS
MQTT的QoS分为三种0,1,2
QoS 0:至多一次,依赖TCP协议来确保信息到达
QoS 1:至少一次,发送方会保存消息,直到发送的消息被接收方确认回复后才清除
QoS 2:有且只有一次,发送和接收双方会使用四次握手,确保接收双方都得知消息已发送且被接收
QoS表示的是订阅者与Broker以及发布者与Broker间的服务质量,对于同一个Topic,发布者与接收者可以采用不同的QoS,越大的QoS意味更多的传输时间和更多的资源消耗。
MQTT Topic
每个Topic都是一个UTF-8的字符串,以斜杠分隔不同的层次,不能使用空格,例如:
China/Shenzen/NanShan/XiLi
我们可以订阅南山区西丽街道的消息,假如需要订阅南山区全部的消息
China/Shenzen/NanShan/#
订阅中国全部名为南山区的消息
China/+/NanShan
加号和井号分别表示单级匹配与多级匹配
另外可以订阅$SYS开头的Topic,获取Broker系统消息