前言
在我们的生产环境中,在用的不同厂家设备接近10种,而对接过的就不计其数了。每次对接都需要做的事情都需要做协议解析、数据存储、逻辑判断,最后接入后台系统,Golang就很适合用来做这些处理。对接的协议从HTTP、MQTT到TCP、UDP都有方便的实现,数据存储可以根据情况直接使用内存、或者Redis、MySQL等,逻辑判断则需要筛选数据,并做好监控。最后的部署,可以直接打包成一个可执行程序,通过环境变量配置监听的端口、Redis或数据库地址、处理结果的推送地址,直接部署或者使用Docker。对接时会比较头疼的还是TCP与UDP,前者需要做长连接的管理,后者需要做数据捎带下发,其次就是需要摸清设备的坑,特别是一些设备对自定义协议实现不完善的情况。
下面回到这次的正文,套接字服务器。
TCP Socket Server
谈到TCP就需要谈长连接,包括长连接的建立、替换、关闭和保活,确保服务端能够完成有效的主动数据下发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
var conns = make(map[string]net.Conn, 1024)
func serveSocket() {
var tcpAddr *net.TCPAddr
tcpAddr, _ = net.ResolveTCPAddr(connectionType, connectionHost+":"+connectionPort)
l, err := net.ListenTCP(connectionType, tcpAddr)
if err != nil {
log.Fatal("Error listening: ", err.Error())
}
defer l.Close()
log.Info("Listening on " + connectionType + " " + connectionHost + ":" + connectionPort)
for {
conn, err := l.AcceptTCP()
if err != nil {
log.Info("Error Accepting: ", err.Error())
continue
}
go handleRequest(conn)
}
}
|
上面是一个基础的监听和接受新连接的例子,用一个map来存储已有的长连接,下面处理连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
func handleRequest(conn net.Conn) {
ipAddr := conn.RemoteAddr().String()
msg := "Accepting New Connection: " + ipAddr
connID := ""
// 设置超时时间
timeoutDuration := 30 * time.Minute
conn.SetDeadline(time.Now().Add(timeoutDuration))
bufReader := bufio.NewReader(conn)
for {
bytes := make([]byte, 256)
reqLen, err := bufReader.Read(bytes)
if err != nil {
log.Info("buffer Read Error: ", err.Error())
break
}
if reqLen == 0 {
continue
}
data := bytes[0 : reqLen-1]
//以16进制打印原始数据
var byteStr = ""
for _, v := range data {
byteStr += fmt.Sprintf("0x%02x ", v)
}
// 按照协议检查数据格式
if !checkData(data) {
break
}
// 对于符合调节的连接,延长寿命
conn.SetDeadline(time.Now().Add(timeoutDuration))
// 实际处理数据的函数,获取响应和长连接ID
resp, cID, err := handleRequestData(data)
v, ok := conns[cID]
connID = cID
// 检查连接是否已经保存
if ok {
// 替换旧连接
if v.RemoteAddr().String() != conn.RemoteAddr().String() {
v.SetDeadline(time.Now().Add(1 * time.Second))
conns[cID] = conn
}
} else {
// 新增连接
conns[cID] = conn
}
if err != nil {
log.Info(err.Error())
} else {
// 返回响应
if len(resp) > 0 {
conn.Write(resp)
}
}
}
// 连接断开的处理,包括主动break,连接超时
msgDis := "Disconnecting Connection: " + ipAddr
// 在客户端主动创建新连接时,map中保存的是新的长连接
if v, ok := conns[connID]; ok {
if v.RemoteAddr().String() == ipAddr {
delete(conns, connID)
}
}
conn.Close()
}
|
当然上面也可以用coonID来判断是否已经存储了长连接,需要注意弱网络环境下,客户端可能不断创建新连接,这些连接的处理分散在不同的goroutine中。
UDP Socket Server
由于UDP是无连接的,只能每次拿到数据时解析后写回,和LoraWAN Class A设备有点像,NB-IoT的物联网节点也常用UDP来与服务器通信。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
// StartUDPServer serve a udp socket
func StartUDPServer() {
var udpAddr *net.UDPAddr
udpAddr, _ = net.ResolveUDPAddr(listenType, listenURL)
conn, err := net.ListenUDP(listenType, udpAddr)
if err != nil {
log.Fatal("Error listening: ", err.Error())
}
defer conn.Close()
log.Info("Listening on " + listenType + " " + listenURL)
for {
buf := make([]byte, 256)
n, addr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Info("Error: ", err)
continue
}
go hanlde(conn, addr, buf[0:n])
}
}
|
下面是处理请求的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
func hanlde(conn *net.UDPConn, addr *net.UDPAddr, data []byte) {
// 这里模仿LoraWAN服务端的方式,200ms后如果校验正确则回复
// 用channel做同步,200ms后触发,如果是耗时的操作,需要做异步处理
done := make(chan bool)
go func() {
time.Sleep(200 * time.Millisecond)
done <- true
}()
// 以16进制格式打印数据
var byteStr = ""
for _, v := range data {
byteStr += fmt.Sprintf("0x%02x ", v)
}
log.Info("Received ", byteStr, " from ", addr)
var resp []byte
// 从原始数据获取对象
p := model.NewPacket(data[:])
if p != nil {
jsonStr := p.JSONStr()
resp = p.BytesResp()
log.Info("JSONStr:",jsonStr)
log.Info("Resp:",resp)
}
//触发
<-done
if len(resp) > 0 {
_, err := conn.WriteToUDP(resp, addr)
if err != nil {
log.Info("failed to write data: ", err)
}
}
//关闭channel
close(done)
}
|
使用Redis和MySQL的部分就不贴出,根据具体的协议处理