Golang笔记–02–套接字

Overview

前言

在我们的生产环境中,在用的不同厂家设备接近10种,而对接过的就不计其数了。每次对接都需要做的事情都需要做协议解析、数据存储、逻辑判断,最后接入后台系统,Golang就很适合用来做这些处理。对接的协议从HTTP、MQTT到TCP、UDP都有方便的实现,数据存储可以根据情况直接使用内存、或者Redis、MySQL等,逻辑判断则需要筛选数据,并做好监控。最后的部署,可以直接打包成一个可执行程序,通过环境变量配置监听的端口、Redis或数据库地址、处理结果的推送地址,直接部署或者使用Docker。对接时会比较头疼的还是TCP与UDP,前者需要做长连接的管理,后者需要做数据捎带下发,其次就是需要摸清设备的坑,特别是一些设备对自定义协议实现不完善的情况。

下面回到这次的正文,套接字服务器

TCP Socket Server

谈到TCP就需要谈长连接,包括长连接的建立、替换、关闭和保活,确保服务端能够完成有效的主动数据下发。

 1var conns = make(map[string]net.Conn, 1024)
 2
 3func serveSocket() {
 4	var tcpAddr *net.TCPAddr
 5	tcpAddr, _ = net.ResolveTCPAddr(connectionType, connectionHost+":"+connectionPort)
 6	l, err := net.ListenTCP(connectionType, tcpAddr)
 7	if err != nil {
 8		log.Fatal("Error listening: ", err.Error())
 9	}
10	defer l.Close()
11	log.Info("Listening on " + connectionType + " " + connectionHost + ":" + connectionPort)
12	for {
13		conn, err := l.AcceptTCP()
14		if err != nil {
15			log.Info("Error Accepting: ", err.Error())
16			continue
17		}
18		go handleRequest(conn)
19	}
20}

上面是一个基础的监听和接受新连接的例子,用一个map来存储已有的长连接,下面处理连接。

 1func handleRequest(conn net.Conn) {
 2	ipAddr := conn.RemoteAddr().String()
 3	msg := "Accepting New Connection: " + ipAddr
 4	connID := ""
 5	// 设置超时时间
 6	timeoutDuration := 30 * time.Minute
 7	conn.SetDeadline(time.Now().Add(timeoutDuration))
 8	bufReader := bufio.NewReader(conn)
 9	for {
10		bytes := make([]byte, 256)
11		reqLen, err := bufReader.Read(bytes)
12		if err != nil {
13			log.Info("buffer Read Error: ", err.Error())
14			break
15		}
16		if reqLen == 0 {
17			continue
18		}
19		data := bytes[0 : reqLen-1]
20		//以16进制打印原始数据
21		var byteStr = ""
22		for _, v := range data {
23			byteStr += fmt.Sprintf("0x%02x ", v)
24		}
25
26		// 按照协议检查数据格式
27		if !checkData(data) { 
28                    break 
29                } 
30                // 对于符合调节的连接,延长寿命 
31                conn.SetDeadline(time.Now().Add(timeoutDuration)) 
32                // 实际处理数据的函数,获取响应和长连接ID 
33                resp, cID, err := handleRequestData(data) 
34                v, ok := conns[cID] 
35                connID = cID 
36                // 检查连接是否已经保存 
37                if ok { 
38                // 替换旧连接 
39                     if v.RemoteAddr().String() != conn.RemoteAddr().String() {
40                         v.SetDeadline(time.Now().Add(1 * time.Second)) 
41                         conns[cID] = conn 
42                     } 
43                 } else { 
44                       // 新增连接 
45                       conns[cID] = conn 
46                 } 
47                if err != nil { 
48                    log.Info(err.Error()) 
49                } else { 
50                // 返回响应 
51                    if len(resp) > 0 {
52                         conn.Write(resp)
53		    }
54	        }
55	}
56	// 连接断开的处理,包括主动break,连接超时
57	msgDis := "Disconnecting Connection: " + ipAddr
58	// 在客户端主动创建新连接时,map中保存的是新的长连接
59	if v, ok := conns[connID]; ok {
60		if v.RemoteAddr().String() == ipAddr {
61			delete(conns, connID)
62		}
63	}
64	conn.Close()
65}

当然上面也可以用coonID来判断是否已经存储了长连接,需要注意弱网络环境下,客户端可能不断创建新连接,这些连接的处理分散在不同的goroutine中。

UDP Socket Server

由于UDP是无连接的,只能每次拿到数据时解析后写回,和LoraWAN Class A设备有点像,NB-IoT的物联网节点也常用UDP来与服务器通信。

 1// StartUDPServer serve a udp socket
 2func StartUDPServer() {
 3	var udpAddr *net.UDPAddr
 4	udpAddr, _ = net.ResolveUDPAddr(listenType, listenURL)
 5	conn, err := net.ListenUDP(listenType, udpAddr)
 6	if err != nil {
 7		log.Fatal("Error listening: ", err.Error())
 8	}
 9	defer conn.Close()
10	log.Info("Listening on " + listenType + " " + listenURL)
11	for {
12		buf := make([]byte, 256)
13		n, addr, err := conn.ReadFromUDP(buf)
14		if err != nil {
15			log.Info("Error: ", err)
16			continue
17		}
18		go hanlde(conn, addr, buf[0:n])
19	}
20}

下面是处理请求的函数

 1func hanlde(conn *net.UDPConn, addr *net.UDPAddr, data []byte) {
 2	// 这里模仿LoraWAN服务端的方式,200ms后如果校验正确则回复
 3	// 用channel做同步,200ms后触发,如果是耗时的操作,需要做异步处理
 4	done := make(chan bool)
 5	go func() {
 6		time.Sleep(200 * time.Millisecond)
 7		done <- true
 8	}()
 9	// 以16进制格式打印数据
10	var byteStr = ""
11	for _, v := range data {
12		byteStr += fmt.Sprintf("0x%02x ", v)
13	}
14	log.Info("Received ", byteStr, " from ", addr)
15	var resp []byte
16	// 从原始数据获取对象
17	p := model.NewPacket(data[:])
18	if p != nil {
19		jsonStr := p.JSONStr()
20		resp = p.BytesResp()
21		log.Info("JSONStr:",jsonStr)
22		log.Info("Resp:",resp)
23	}
24	//触发
25	<-done 
26        if len(resp) > 0 {
27		_, err := conn.WriteToUDP(resp, addr)
28		if err != nil {
29			log.Info("failed to write data: ", err)
30		}
31	}
32	//关闭channel
33	close(done)
34}

使用Redis和MySQL的部分就不贴出,根据具体的协议处理

comments powered by Disqus