Golang笔记–02–套接字
文章目录
前言
在我们的生产环境中,在用的不同厂家设备接近10种,而对接过的就不计其数了。每次对接都需要做的事情都需要做协议解析、数据存储、逻辑判断,最后接入后台系统,Golang就很适合用来做这些处理。对接的协议从HTTP、MQTT到TCP、UDP都有方便的实现,数据存储可以根据情况直接使用内存、或者Redis、MySQL等,逻辑判断则需要筛选数据,并做好监控。最后的部署,可以直接打包成一个可执行程序,通过环境变量配置监听的端口、Redis或数据库地址、处理结果的推送地址,直接部署或者使用Docker。对接时会比较头疼的还是TCP与UDP,前者需要做长连接的管理,后者需要做数据捎带下发,其次就是需要摸清设备的坑,特别是一些设备对自定义协议实现不完善的情况。
下面回到这次的正文,套接字服务器。
TCP Socket Server
谈到TCP就需要谈长连接,包括长连接的建立、替换、关闭和保活,确保服务端能够完成有效的主动数据下发。
1var conns = make(map[string]net.Conn, 1024)
2
3func serveSocket() {
4 var tcpAddr *net.TCPAddr
5 tcpAddr, _ = net.ResolveTCPAddr(connectionType, connectionHost+":"+connectionPort)
6 l, err := net.ListenTCP(connectionType, tcpAddr)
7 if err != nil {
8 log.Fatal("Error listening: ", err.Error())
9 }
10 defer l.Close()
11 log.Info("Listening on " + connectionType + " " + connectionHost + ":" + connectionPort)
12 for {
13 conn, err := l.AcceptTCP()
14 if err != nil {
15 log.Info("Error Accepting: ", err.Error())
16 continue
17 }
18 go handleRequest(conn)
19 }
20}
上面是一个基础的监听和接受新连接的例子,用一个map来存储已有的长连接,下面处理连接。
1func handleRequest(conn net.Conn) {
2 ipAddr := conn.RemoteAddr().String()
3 msg := "Accepting New Connection: " + ipAddr
4 connID := ""
5 // 设置超时时间
6 timeoutDuration := 30 * time.Minute
7 conn.SetDeadline(time.Now().Add(timeoutDuration))
8 bufReader := bufio.NewReader(conn)
9 for {
10 bytes := make([]byte, 256)
11 reqLen, err := bufReader.Read(bytes)
12 if err != nil {
13 log.Info("buffer Read Error: ", err.Error())
14 break
15 }
16 if reqLen == 0 {
17 continue
18 }
19 data := bytes[0 : reqLen-1]
20 //以16进制打印原始数据
21 var byteStr = ""
22 for _, v := range data {
23 byteStr += fmt.Sprintf("0x%02x ", v)
24 }
25
26 // 按照协议检查数据格式
27 if !checkData(data) {
28 break
29 }
30 // 对于符合调节的连接,延长寿命
31 conn.SetDeadline(time.Now().Add(timeoutDuration))
32 // 实际处理数据的函数,获取响应和长连接ID
33 resp, cID, err := handleRequestData(data)
34 v, ok := conns[cID]
35 connID = cID
36 // 检查连接是否已经保存
37 if ok {
38 // 替换旧连接
39 if v.RemoteAddr().String() != conn.RemoteAddr().String() {
40 v.SetDeadline(time.Now().Add(1 * time.Second))
41 conns[cID] = conn
42 }
43 } else {
44 // 新增连接
45 conns[cID] = conn
46 }
47 if err != nil {
48 log.Info(err.Error())
49 } else {
50 // 返回响应
51 if len(resp) > 0 {
52 conn.Write(resp)
53 }
54 }
55 }
56 // 连接断开的处理,包括主动break,连接超时
57 msgDis := "Disconnecting Connection: " + ipAddr
58 // 在客户端主动创建新连接时,map中保存的是新的长连接
59 if v, ok := conns[connID]; ok {
60 if v.RemoteAddr().String() == ipAddr {
61 delete(conns, connID)
62 }
63 }
64 conn.Close()
65}
当然上面也可以用coonID来判断是否已经存储了长连接,需要注意弱网络环境下,客户端可能不断创建新连接,这些连接的处理分散在不同的goroutine中。
UDP Socket Server
由于UDP是无连接的,只能每次拿到数据时解析后写回,和LoraWAN Class A设备有点像,NB-IoT的物联网节点也常用UDP来与服务器通信。
1// StartUDPServer serve a udp socket
2func StartUDPServer() {
3 var udpAddr *net.UDPAddr
4 udpAddr, _ = net.ResolveUDPAddr(listenType, listenURL)
5 conn, err := net.ListenUDP(listenType, udpAddr)
6 if err != nil {
7 log.Fatal("Error listening: ", err.Error())
8 }
9 defer conn.Close()
10 log.Info("Listening on " + listenType + " " + listenURL)
11 for {
12 buf := make([]byte, 256)
13 n, addr, err := conn.ReadFromUDP(buf)
14 if err != nil {
15 log.Info("Error: ", err)
16 continue
17 }
18 go hanlde(conn, addr, buf[0:n])
19 }
20}
下面是处理请求的函数
1func hanlde(conn *net.UDPConn, addr *net.UDPAddr, data []byte) {
2 // 这里模仿LoraWAN服务端的方式,200ms后如果校验正确则回复
3 // 用channel做同步,200ms后触发,如果是耗时的操作,需要做异步处理
4 done := make(chan bool)
5 go func() {
6 time.Sleep(200 * time.Millisecond)
7 done <- true
8 }()
9 // 以16进制格式打印数据
10 var byteStr = ""
11 for _, v := range data {
12 byteStr += fmt.Sprintf("0x%02x ", v)
13 }
14 log.Info("Received ", byteStr, " from ", addr)
15 var resp []byte
16 // 从原始数据获取对象
17 p := model.NewPacket(data[:])
18 if p != nil {
19 jsonStr := p.JSONStr()
20 resp = p.BytesResp()
21 log.Info("JSONStr:",jsonStr)
22 log.Info("Resp:",resp)
23 }
24 //触发
25 <-done
26 if len(resp) > 0 {
27 _, err := conn.WriteToUDP(resp, addr)
28 if err != nil {
29 log.Info("failed to write data: ", err)
30 }
31 }
32 //关闭channel
33 close(done)
34}
使用Redis和MySQL的部分就不贴出,根据具体的协议处理