写在最前面

最初在思考如何监控网关的连接状态,loraWAN的网关可以承载很多个节点,少则数十,多则上千,如果网关掉线了怎么办?节点就全挂了。

关于设备离线监控,原本想使用定时上报的策略来实现,但觉得既然MQTT定义了连接保活策略,那么是否可以通过监控设备连接来更新网关网关状态?可以的,只是实现的方式有点奇怪。

首先为每个网关设定一个clientID,直接按照loraWAN的标准,取网卡MAC地址,拓展到48位,避免系统使用UUID自动分配,然后使用ElasticSearch收集容器日志,监控Broker状态,每分钟轮询,当监控到匹配的字符串时,获取对应网关ID,更新连接状态,推送消息。

在更深入一点MQTT协议后,又发现其实可以利用MQTT的遗嘱机制,Client启用遗嘱,在连接到Broker时,携带指定的topic与payload,当Client异常断开时(没有使用Disconnect断开连接),Broker会发布遗嘱topic与payload,所以更好一点的机制是:上线时主动发布上线topic,掉线时利用遗嘱机制发布异常下线topic。

实际测试中,前者根据日志查询,实时性比后者高,任何导致重连的因素,如网络不稳定,重连后销毁旧连接,创建新连接等都会记录下来,产生较多的日志和推送,而后者只有在直接拔掉网关的网线或者掉电关机的情况,服务端检测到KeepAlive超时,发布一个异常下线的topic。如果需要考察网络连接的稳定性,可以直接监控Broker的日志,其他的情况使用遗嘱机制足以应对。

在了解MQTT的连接策略过程中看了很多相关资料,学习流程还是和之前一样,看一遍协议文档,然后搭环境开发测试,完了再看协议文档,再开发测试,协议文档更多时候是用来发掘那些实习协议的第三方库的功能的。

主要参考文档资料如下:

MQTT Version 3.1.1 OASIS Standard(官方协议文档)

MQTT Essentials(来自Hive MQ官网的文章,写得很详细,是本文内容摘取的主要对象)

Mosquitto Documentation(我们需要一个Broker来接收与分发消息)

还有一份中文文档,刚刚找到的

MQTT协议中文版

如果想要了解MQTT的负载能力,这里有一份Benchmark of MQTT Servers文档可以参考,mosquitto的单线程的服务,双核4G,保证带宽的配置,QoS为0时,最大可以负载6万个Publisher

本次文章中,Broker使用mosquitto,使用了这一篇文章中构建的容器:Mosquitto TLS Configure,Client使用paho.mqtt.golang库,其他语言的客户端可以在Eclipse Paho项目中找到,这个项目提供了各种版本的MQTT客户端的开源方案。

然后是正文

传输协议

使用TCP或Websockets协议传输,分别使用tcp://host:portws://host:port形式的访问,明文传输;

可以通过配置证书,使用TLS加密数据,分别使用tls://host:portwss://host:port

Websocket相对原始协议做了一层封装,使用第三方库如Eclipse Paho JavaScript Client,让浏览器可以连接Broker,在线收取和下发消息,可以大大减少适配不同客户端的开发量。

配置mosquitto时,可以开发多个端口设置不同的协议来支持不同类型客户端的接入,建议配置TLS加密。

连接到Broker

在连接到数据库或redis时,都需要配置一些参数,这里以paho.mqtt.golang的ClientOptions为例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func NewClientOptions() *ClientOptions {
    o := &ClientOptions{
        Servers:                 nil,
        ClientID:                "",
        Username:                "",
        Password:                "",
        CleanSession:            true,
        Order:                   true,
        WillEnabled:             false,
        WillTopic:               "",
        WillPayload:             nil,
        WillQos:                 0,
        WillRetained:            false,
        ProtocolVersion:         0,
        protocolVersionExplicit: false,
        TLSConfig:               tls.Config{},
        KeepAlive:               30 * time.Second,
        PingTimeout:             10 * time.Second,
        ConnectTimeout:          30 * time.Second,
        MaxReconnectInterval:    10 * time.Minute,
        AutoReconnect:           true,
        Store:                   nil,
        OnConnect:               nil,
        OnConnectionLost:        DefaultConnectionLostHandler,
        WriteTimeout:            0, // 0 represents timeout disabled
        MessageChannelDepth:     100,
    }
    return o
}

默认配置中有许多选项,这里挑出几个:

Servers:填写格式为tcp://host:portws://host:port、 tcps://host:port、 tls://host:port、 wss://host:port等形式的链接

ClientID:通过规则自行生成ClientID可以用于监控客户端上线和离线,不设置的话,会得到一个自动生成的UUID

Username与Pssword:在服务端配置账号密码后使用,默认不启用

CleanSession:当QoS为1或2,设置为false时,启用持久化连接,Broker会为客户端存储所有订阅的Topic以及未接收的消息,当客户端断线重连后,会接收到这些内容,设置为true时,Broker会忽略这些消息

Will:五个选项分别设置是否启用遗嘱,遗嘱的Topic,Payload以及QoS,在客户端连接到Broker时,Broker会存储这些内容,在客户端异常退出时发布,最后的Retain表示否保留遗嘱信息,当有新的订阅者订阅遗嘱时,会立刻接收到消息

TLSConfig:使用TLS加密时,配置加载根证书

KeepAlive:心跳间隔,默认30秒发送一次维持长连接,期间如果有数据上下行则不发送,可以适当调高

onConnect:当客户端连接上Broker时执行的Block,可以在这里发布上线消息,每次建立起长连接后执行一次

心跳保活

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func keepalive(c *client) {
    DEBUG.Println(PNG, "keepalive starting")

    receiveInterval := c.options.KeepAlive + (1 * time.Second)
    pingTimer := timer{Timer: time.NewTimer(c.options.KeepAlive)}
    receiveTimer := timer{Timer: time.NewTimer(receiveInterval)}
    pingRespTimer := timer{Timer: time.NewTimer(c.options.PingTimeout)}

    pingRespTimer.Stop()

    for {
        select {
        case <-c.stop:
            DEBUG.Println(PNG, "keepalive stopped")
            c.workers.Done()
            return
        case <-pingTimer.C:
            sendPing(&pingTimer, &pingRespTimer, c)
        case <-c.keepaliveReset:
            DEBUG.Println(NET, "resetting ping timer")
            pingTimer.Reset(c.options.KeepAlive)
        case <-c.pingResp:
            DEBUG.Println(NET, "resetting ping timeout timer")
            pingRespTimer.Stop()
            pingTimer.Reset(c.options.KeepAlive)
            receiveTimer.Reset(receiveInterval)
        case <-c.packetResp:
            DEBUG.Println(NET, "resetting receive timer")
            receiveTimer.Reset(receiveInterval)
        case <-receiveTimer.C:
            receiveTimer.SetRead(true)
            receiveTimer.Reset(receiveInterval)
            sendPing(&pingTimer, &pingRespTimer, c)
        case <-pingRespTimer.C:
            pingRespTimer.SetRead(true)
            CRITICAL.Println(PNG, "pingresp not received, disconnecting")
            c.workers.Done()
            c.internalConnLost(errors.New("pingresp not received, disconnecting"))
            pingTimer.Stop()
            return
        }
    }
}

可以看到一共设置了三个timer,pingTimer用于定时发送心跳包,receiveTimer作为一个补救措施,在发送PingReq或上行数据都没有接收到响应的情况下,再次发送一个PingReq,pingRespTimer触发时,表示已经与Broker断开连接,客户端清理本地连接,然后重新发起连接。

上下行数据的情况下,pingTimer与receiveTimer会被不断重置,不会发送PingReq消耗额外的流量;

上行数据后如果没有得到响应,触发接收超时,客户端会尝试发出一个PingReq;

每次发送PingReq时,都会重置pingRespTimer,接收到响应时,关闭pingRespTimer,重置其余两个Timer,未接收到响应时,触发pingRespTimer,表示与Broker连接断开,客户端会清理本地内容,然后重新连接Broker。

半开连接

在客户端看来,如果触发了pingRespTimer,那就已经与Broker断开了连接,需要重连,而在Broker看来,到指定的心跳时间到期前,都认为客户端仍旧保持着连接,期间只要接收到数据,就不会关闭连接,假如将心跳时间定的非常长,在网络不稳定的情况下,可能出现较大量的半开连接。

客户端断线重连成功的情况下,Broker会清理无效的旧连接,并建立一个新连接,这些在日志里都可以看到;

客户端断线重连失败,导致超时的情况下,Broker认为客户段已经掉线,会关闭连接并发布相应的遗嘱内容,可以通过监听遗嘱Topic获取掉线的客户端。

Retain选项

MQTT的消息的发布是广播式的,且只有当前订阅Topic的在线用户才能接收到消息,如果需要让订阅者在订阅Topic后立刻接收到消息,可以为Topic启用retain选项,Broker会为Topic保留一份最新的消息副本,每次发布Topic时如果带有retain的标志,则用它覆盖当前保存的副本。

QoS

MQTT的QoS分为三种0,1,2

QoS 0:至多一次,依赖TCP协议来确保信息到达

QoS 1:至少一次,发送方会保存消息,直到发送的消息被接收方确认回复后才清除

QoS 2:有且只有一次,发送和接收双方会使用四次握手,确保接收双方都得知消息已发送且被接收

QoS表示的是订阅者与Broker以及发布者与Broker间的服务质量,对于同一个Topic,发布者与接收者可以采用不同的QoS,越大的QoS意味更多的传输时间和更多的资源消耗。

MQTT Topic

每个Topic都是一个UTF-8的字符串,以斜杠分隔不同的层次,不能使用空格,例如:

China/Shenzen/NanShan/XiLi

我们可以订阅南山区西丽街道的消息,假如需要订阅南山区全部的消息

China/Shenzen/NanShan/#

订阅中国全部名为南山区的消息

China/+/NanShan

加号和井号分别表示单级匹配与多级匹配

另外可以订阅$SYS开头的Topic,获取Broker系统消息