前言

在我们的生产环境中,在用的不同厂家设备接近10种,而对接过的就不计其数了。每次对接都需要做的事情都需要做协议解析、数据存储、逻辑判断,最后接入后台系统,Golang就很适合用来做这些处理。对接的协议从HTTP、MQTT到TCP、UDP都有方便的实现,数据存储可以根据情况直接使用内存、或者Redis、MySQL等,逻辑判断则需要筛选数据,并做好监控。最后的部署,可以直接打包成一个可执行程序,通过环境变量配置监听的端口、Redis或数据库地址、处理结果的推送地址,直接部署或者使用Docker。对接时会比较头疼的还是TCP与UDP,前者需要做长连接的管理,后者需要做数据捎带下发,其次就是需要摸清设备的坑,特别是一些设备对自定义协议实现不完善的情况。

下面回到这次的正文,套接字服务器

TCP Socket Server

谈到TCP就需要谈长连接,包括长连接的建立、替换、关闭和保活,确保服务端能够完成有效的主动数据下发。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
var conns = make(map[string]net.Conn, 1024)

func serveSocket() {
    var tcpAddr *net.TCPAddr
    tcpAddr, _ = net.ResolveTCPAddr(connectionType, connectionHost+":"+connectionPort)
    l, err := net.ListenTCP(connectionType, tcpAddr)
    if err != nil {
        log.Fatal("Error listening: ", err.Error())
    }
    defer l.Close()
    log.Info("Listening on " + connectionType + " " + connectionHost + ":" + connectionPort)
    for {
        conn, err := l.AcceptTCP()
        if err != nil {
            log.Info("Error Accepting: ", err.Error())
            continue
        }
        go handleRequest(conn)
    }
}

上面是一个基础的监听和接受新连接的例子,用一个map来存储已有的长连接,下面处理连接。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func handleRequest(conn net.Conn) {
    ipAddr := conn.RemoteAddr().String()
    msg := "Accepting New Connection: " + ipAddr
    connID := ""
    // 设置超时时间
    timeoutDuration := 30 * time.Minute
    conn.SetDeadline(time.Now().Add(timeoutDuration))
    bufReader := bufio.NewReader(conn)
    for {
        bytes := make([]byte, 256)
        reqLen, err := bufReader.Read(bytes)
        if err != nil {
            log.Info("buffer Read Error: ", err.Error())
            break
        }
        if reqLen == 0 {
            continue
        }
        data := bytes[0 : reqLen-1]
        //以16进制打印原始数据
        var byteStr = ""
        for _, v := range data {
            byteStr += fmt.Sprintf("0x%02x ", v)
        }

        // 按照协议检查数据格式
        if !checkData(data) { 
                    break 
                } 
                // 对于符合调节的连接,延长寿命 
                conn.SetDeadline(time.Now().Add(timeoutDuration)) 
                // 实际处理数据的函数,获取响应和长连接ID 
                resp, cID, err := handleRequestData(data) 
                v, ok := conns[cID] 
                connID = cID 
                // 检查连接是否已经保存 
                if ok { 
                // 替换旧连接 
                     if v.RemoteAddr().String() != conn.RemoteAddr().String() {
                         v.SetDeadline(time.Now().Add(1 * time.Second)) 
                         conns[cID] = conn 
                     } 
                 } else { 
                       // 新增连接 
                       conns[cID] = conn 
                 } 
                if err != nil { 
                    log.Info(err.Error()) 
                } else { 
                // 返回响应 
                    if len(resp) > 0 {
                         conn.Write(resp)
            }
            }
    }
    // 连接断开的处理,包括主动break,连接超时
    msgDis := "Disconnecting Connection: " + ipAddr
    // 在客户端主动创建新连接时,map中保存的是新的长连接
    if v, ok := conns[connID]; ok {
        if v.RemoteAddr().String() == ipAddr {
            delete(conns, connID)
        }
    }
    conn.Close()
}

当然上面也可以用coonID来判断是否已经存储了长连接,需要注意弱网络环境下,客户端可能不断创建新连接,这些连接的处理分散在不同的goroutine中。

UDP Socket Server

由于UDP是无连接的,只能每次拿到数据时解析后写回,和LoraWAN Class A设备有点像,NB-IoT的物联网节点也常用UDP来与服务器通信。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// StartUDPServer serve a udp socket
func StartUDPServer() {
    var udpAddr *net.UDPAddr
    udpAddr, _ = net.ResolveUDPAddr(listenType, listenURL)
    conn, err := net.ListenUDP(listenType, udpAddr)
    if err != nil {
        log.Fatal("Error listening: ", err.Error())
    }
    defer conn.Close()
    log.Info("Listening on " + listenType + " " + listenURL)
    for {
        buf := make([]byte, 256)
        n, addr, err := conn.ReadFromUDP(buf)
        if err != nil {
            log.Info("Error: ", err)
            continue
        }
        go hanlde(conn, addr, buf[0:n])
    }
}

下面是处理请求的函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func hanlde(conn *net.UDPConn, addr *net.UDPAddr, data []byte) {
    // 这里模仿LoraWAN服务端的方式,200ms后如果校验正确则回复
    // 用channel做同步,200ms后触发,如果是耗时的操作,需要做异步处理
    done := make(chan bool)
    go func() {
        time.Sleep(200 * time.Millisecond)
        done <- true
    }()
    // 以16进制格式打印数据
    var byteStr = ""
    for _, v := range data {
        byteStr += fmt.Sprintf("0x%02x ", v)
    }
    log.Info("Received ", byteStr, " from ", addr)
    var resp []byte
    // 从原始数据获取对象
    p := model.NewPacket(data[:])
    if p != nil {
        jsonStr := p.JSONStr()
        resp = p.BytesResp()
        log.Info("JSONStr:",jsonStr)
        log.Info("Resp:",resp)
    }
    //触发
    <-done 
        if len(resp) > 0 {
        _, err := conn.WriteToUDP(resp, addr)
        if err != nil {
            log.Info("failed to write data: ", err)
        }
    }
    //关闭channel
    close(done)
}

使用Redis和MySQL的部分就不贴出,根据具体的协议处理