MQTT笔记

Overview

写在最前面

最初在思考如何监控网关的连接状态,loraWAN的网关可以承载很多个节点,少则数十,多则上千,如果网关掉线了怎么办?节点就全挂了。

关于设备离线监控,原本想使用定时上报的策略来实现,但觉得既然MQTT定义了连接保活策略,那么是否可以通过监控设备连接来更新网关网关状态?可以的,只是实现的方式有点奇怪。

首先为每个网关设定一个clientID,直接按照loraWAN的标准,取网卡MAC地址,拓展到48位,避免系统使用UUID自动分配,然后使用ElasticSearch收集容器日志,监控Broker状态,每分钟轮询,当监控到匹配的字符串时,获取对应网关ID,更新连接状态,推送消息。

在更深入一点MQTT协议后,又发现其实可以利用MQTT的遗嘱机制,Client启用遗嘱,在连接到Broker时,携带指定的topic与payload,当Client异常断开时(没有使用Disconnect断开连接),Broker会发布遗嘱topic与payload,所以更好一点的机制是:上线时主动发布上线topic,掉线时利用遗嘱机制发布异常下线topic。

实际测试中,前者根据日志查询,实时性比后者高,任何导致重连的因素,如网络不稳定,重连后销毁旧连接,创建新连接等都会记录下来,产生较多的日志和推送,而后者只有在直接拔掉网关的网线或者掉电关机的情况,服务端检测到KeepAlive超时,发布一个异常下线的topic。如果需要考察网络连接的稳定性,可以直接监控Broker的日志,其他的情况使用遗嘱机制足以应对。

在了解MQTT的连接策略过程中看了很多相关资料,学习流程还是和之前一样,看一遍协议文档,然后搭环境开发测试,完了再看协议文档,再开发测试,协议文档更多时候是用来发掘那些实习协议的第三方库的功能的。

主要参考文档资料如下:

还有一份中文文档,刚刚找到的:MQTT协议中文版

如果想要了解MQTT的负载能力,这里有一份Benchmark of MQTT Servers文档可以参考,mosquitto的单线程的服务,双核4G,保证带宽的配置,QoS为0时,最大可以负载6万个Publisher

本次文章中,Broker使用mosquitto,使用了这一篇文章中构建的容器:Mosquitto TLS Configure,Client使用paho.mqtt.golang库,其他语言的客户端可以在Eclipse Paho项目中找到,这个项目提供了各种版本的MQTT客户端的开源方案。

然后是正文

传输协议

使用TCP或Websockets协议传输,分别使用tcp://host:portws://host:port形式的访问,明文传输;

可以通过配置证书,使用TLS加密数据,分别使用tls://host:portwss://host:port

Websocket相对原始协议做了一层封装,使用第三方库如Eclipse Paho JavaScript Client,让浏览器可以连接Broker,在线收取和下发消息,可以大大减少适配不同客户端的开发量。

配置mosquitto时,可以开发多个端口设置不同的协议来支持不同类型客户端的接入,建议配置TLS加密。

连接到Broker

在连接到数据库或redis时,都需要配置一些参数,这里以paho.mqtt.golang的ClientOptions为例

 1func NewClientOptions() *ClientOptions {
 2	o := &ClientOptions{
 3		Servers:                 nil,
 4		ClientID:                "",
 5		Username:                "",
 6		Password:                "",
 7		CleanSession:            true,
 8		Order:                   true,
 9		WillEnabled:             false,
10		WillTopic:               "",
11		WillPayload:             nil,
12		WillQos:                 0,
13		WillRetained:            false,
14		ProtocolVersion:         0,
15		protocolVersionExplicit: false,
16		TLSConfig:               tls.Config{},
17		KeepAlive:               30 * time.Second,
18		PingTimeout:             10 * time.Second,
19		ConnectTimeout:          30 * time.Second,
20		MaxReconnectInterval:    10 * time.Minute,
21		AutoReconnect:           true,
22		Store:                   nil,
23		OnConnect:               nil,
24		OnConnectionLost:        DefaultConnectionLostHandler,
25		WriteTimeout:            0, // 0 represents timeout disabled
26		MessageChannelDepth:     100,
27	}
28	return o
29}

默认配置中有许多选项,这里挑出几个:

Servers:填写格式为tcp://host:portws://host:port、 tcps://host:port、 tls://host:port、 wss://host:port等形式的链接

ClientID:通过规则自行生成ClientID可以用于监控客户端上线和离线,不设置的话,会得到一个自动生成的UUID

Username与Pssword:在服务端配置账号密码后使用,默认不启用

CleanSession:当QoS为1或2,设置为false时,启用持久化连接,Broker会为客户端存储所有订阅的Topic以及未接收的消息,当客户端断线重连后,会接收到这些内容,设置为true时,Broker会忽略这些消息

Will:五个选项分别设置是否启用遗嘱,遗嘱的Topic,Payload以及QoS,在客户端连接到Broker时,Broker会存储这些内容,在客户端异常退出时发布,最后的Retain表示否保留遗嘱信息,当有新的订阅者订阅遗嘱时,会立刻接收到消息

TLSConfig:使用TLS加密时,配置加载根证书

KeepAlive:心跳间隔,默认30秒发送一次维持长连接,期间如果有数据上下行则不发送,可以适当调高

onConnect:当客户端连接上Broker时执行的Block,可以在这里发布上线消息,每次建立起长连接后执行一次

心跳保活

 1func keepalive(c *client) {
 2	DEBUG.Println(PNG, "keepalive starting")
 3
 4	receiveInterval := c.options.KeepAlive + (1 * time.Second)
 5	pingTimer := timer{Timer: time.NewTimer(c.options.KeepAlive)}
 6	receiveTimer := timer{Timer: time.NewTimer(receiveInterval)}
 7	pingRespTimer := timer{Timer: time.NewTimer(c.options.PingTimeout)}
 8
 9	pingRespTimer.Stop()
10
11	for {
12		select {
13		case <-c.stop:
14			DEBUG.Println(PNG, "keepalive stopped")
15			c.workers.Done()
16			return
17		case <-pingTimer.C:
18			sendPing(&pingTimer, &pingRespTimer, c)
19		case <-c.keepaliveReset:
20			DEBUG.Println(NET, "resetting ping timer")
21			pingTimer.Reset(c.options.KeepAlive)
22		case <-c.pingResp:
23			DEBUG.Println(NET, "resetting ping timeout timer")
24			pingRespTimer.Stop()
25			pingTimer.Reset(c.options.KeepAlive)
26			receiveTimer.Reset(receiveInterval)
27		case <-c.packetResp:
28			DEBUG.Println(NET, "resetting receive timer")
29			receiveTimer.Reset(receiveInterval)
30		case <-receiveTimer.C:
31			receiveTimer.SetRead(true)
32			receiveTimer.Reset(receiveInterval)
33			sendPing(&pingTimer, &pingRespTimer, c)
34		case <-pingRespTimer.C:
35			pingRespTimer.SetRead(true)
36			CRITICAL.Println(PNG, "pingresp not received, disconnecting")
37			c.workers.Done()
38			c.internalConnLost(errors.New("pingresp not received, disconnecting"))
39			pingTimer.Stop()
40			return
41		}
42	}
43}

可以看到一共设置了三个timer,pingTimer用于定时发送心跳包,receiveTimer作为一个补救措施,在发送PingReq或上行数据都没有接收到响应的情况下,再次发送一个PingReq,pingRespTimer触发时,表示已经与Broker断开连接,客户端清理本地连接,然后重新发起连接。

上下行数据的情况下,pingTimer与receiveTimer会被不断重置,不会发送PingReq消耗额外的流量;

上行数据后如果没有得到响应,触发接收超时,客户端会尝试发出一个PingReq;

每次发送PingReq时,都会重置pingRespTimer,接收到响应时,关闭pingRespTimer,重置其余两个Timer,未接收到响应时,触发pingRespTimer,表示与Broker连接断开,客户端会清理本地内容,然后重新连接Broker。

半开连接

在客户端看来,如果触发了pingRespTimer,那就已经与Broker断开了连接,需要重连,而在Broker看来,到指定的心跳时间到期前,都认为客户端仍旧保持着连接,期间只要接收到数据,就不会关闭连接,假如将心跳时间定的非常长,在网络不稳定的情况下,可能出现较大量的半开连接。

客户端断线重连成功的情况下,Broker会清理无效的旧连接,并建立一个新连接,这些在日志里都可以看到;

客户端断线重连失败,导致超时的情况下,Broker认为客户段已经掉线,会关闭连接并发布相应的遗嘱内容,可以通过监听遗嘱Topic获取掉线的客户端。

Retain选项

MQTT的消息的发布是广播式的,且只有当前订阅Topic的在线用户才能接收到消息,如果需要让订阅者在订阅Topic后立刻接收到消息,可以为Topic启用retain选项,Broker会为Topic保留一份最新的消息副本,每次发布Topic时如果带有retain的标志,则用它覆盖当前保存的副本。

QoS

MQTT的QoS分为三种0,1,2

QoS 0:至多一次,依赖TCP协议来确保信息到达

QoS 1:至少一次,发送方会保存消息,直到发送的消息被接收方确认回复后才清除

QoS 2:有且只有一次,发送和接收双方会使用四次握手,确保接收双方都得知消息已发送且被接收

QoS表示的是订阅者与Broker以及发布者与Broker间的服务质量,对于同一个Topic,发布者与接收者可以采用不同的QoS,越大的QoS意味更多的传输时间和更多的资源消耗。

MQTT Topic

每个Topic都是一个UTF-8的字符串,以斜杠分隔不同的层次,不能使用空格,例如:

China/Shenzen/NanShan/XiLi

我们可以订阅南山区西丽街道的消息,假如需要订阅南山区全部的消息

China/Shenzen/NanShan/#

订阅中国全部名为南山区的消息

China/+/NanShan

加号和井号分别表示单级匹配与多级匹配

另外可以订阅$SYS开头的Topic,获取Broker系统消息