// Package util implements the low-level, length-prefixed framing protocol
// used for network communication.
//
// Wire layout of one packet (all integers are big-endian):
//
//	4 bytes  length of everything that follows (header fields + payload)
//	8 bytes  From
//	8 bytes  To
//	8 bytes  Msgid
//	4 bytes  Event
//	4 bytes  SentToType
//	N bytes  payload
package util

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"net"
)

const (
	// EMPTY_ADDR is substituted for any From/To/Msgid value that is not
	// exactly 8 bytes long when a packet is built by Enpacket.
	EMPTY_ADDR = "00000000"
)

const (
	// frameLenPrefixSize is the size of the big-endian length prefix.
	frameLenPrefixSize = 4
	// frameHeaderSize is the size of the fixed header fields that follow the
	// length prefix (From + To + Msgid + Event + SentToType).
	frameHeaderSize = 32
	// frameMaxSize is the largest frame length accepted from the wire.
	frameMaxSize = 16384000
)

// Reader reads protocol frames from a network connection.
type Reader struct {
	conn   net.Conn
	reader *bufio.Reader
	buffer [4]byte // scratch space for the length prefix
}

// Packet is one decoded frame; its fields mirror the protocol definition.
type Packet struct {
	// Header start.
	Length     int32  // 4 bytes: number of bytes following the length prefix
	From       string // 8 bytes
	To         string // 8 bytes
	Msgid      string // 8 bytes
	Event      int32  // 4 bytes
	SentToType int32  // 4 bytes
	// Header end.

	Raw []byte // the complete packet: length prefix + header + payload
}

// GetBusinessData returns the payload portion of Raw, i.e. everything after
// the length prefix and the fixed header. It returns nil when Raw is too
// short to contain a full header instead of panicking.
func (p *Packet) GetBusinessData() []byte {
	const payloadOffset = frameLenPrefixSize + frameHeaderSize
	if len(p.Raw) < payloadOffset {
		return nil
	}
	return p.Raw[payloadOffset:]
}

// RawData is a chunk of raw stream data.
type RawData []byte

// NewReader returns a Reader that performs buffered reads from c.
func NewReader(c net.Conn) *Reader {
	return &Reader{
		conn:   c,
		reader: bufio.NewReader(c),
	}
}

// checkFrameSize reports an error when size cannot be a valid frame length:
// a frame must hold at least the fixed header and must not exceed
// frameMaxSize.
func checkFrameSize(size int32) error {
	if size < frameHeaderSize || size > frameMaxSize {
		return fmt.Errorf("Incorrect frame size (%d)", size)
	}
	return nil
}

// readHeader reads the 4-byte big-endian length prefix and returns the
// validated frame size.
func (p *Reader) readHeader() (int32, error) {
	buf := p.buffer[:frameLenPrefixSize]
	if _, err := io.ReadFull(p.reader, buf); err != nil {
		return 0, err
	}
	size := int32(binary.BigEndian.Uint32(buf))
	if err := checkFrameSize(size); err != nil {
		return 0, err
	}
	return size, nil
}

// readFrame reads exactly size bytes (one complete frame, so that packets
// arriving back-to-back on the stream are not merged) and decodes them into a
// Packet. size is the value returned by readHeader; an out-of-range size is
// rejected with an error rather than causing a panic.
func (p *Reader) readFrame(size int32) (*Packet, error) {
	if err := checkFrameSize(size); err != nil {
		return nil, err
	}

	// One allocation holds the whole packet; the frame body is read straight
	// into the region after the re-encoded length prefix.
	raw := make([]byte, frameLenPrefixSize+int(size))
	binary.BigEndian.PutUint32(raw[:frameLenPrefixSize], uint32(size))
	body := raw[frameLenPrefixSize:]
	if _, err := io.ReadFull(p.reader, body); err != nil {
		return nil, err
	}

	return &Packet{
		Length:     size,
		From:       string(body[:8]),
		To:         string(body[8:16]),
		Msgid:      string(body[16:24]),
		Event:      Byte2Int(body[24:28]),
		SentToType: Byte2Int(body[28:32]),
		Raw:        raw,
	}, nil
}

// forwardMessage reads packets from c and pushes them onto queue until a read
// or framing error occurs, then closes c.
func forwardMessage(c net.Conn, queue chan<- *Packet) {
	defer c.Close()

	r := NewReader(c)
	for {
		size, err := r.readHeader()
		if err != nil {
			return
		}
		packet, err := r.readFrame(size)
		if err != nil {
			return
		}
		queue <- packet
	}
}

// processMsg receives packets from q and hands each one to parseEvent in its
// own goroutine. It returns once q is closed and drained.
func processMsg(q <-chan *Packet, parseEvent func(*Packet)) {
	for msg := range q {
		go parseEvent(msg)
	}
}

// fixedField returns s when it is exactly 8 bytes long and EMPTY_ADDR
// otherwise, so the header fields always have their fixed width.
func fixedField(s string) string {
	if len(s) != len(EMPTY_ADDR) {
		return EMPTY_ADDR
	}
	return s
}

// Enpacket builds a complete packet that can be written directly to the
// stream.
//
// from, to and msgid must each be exactly 8 bytes long; any other length is
// replaced with EMPTY_ADDR. obj is the payload: a []byte or string is used
// as-is, anything else is JSON-encoded.
func Enpacket(from, to, msgid string, event, sendtotype int, obj interface{}) []byte {
	var payload []byte
	switch v := obj.(type) {
	case []byte:
		payload = v
	case string:
		payload = []byte(v)
	default:
		encoded, err := json.Marshal(obj)
		if err != nil {
			// The signature has no way to report the failure; fall back to
			// an empty payload so the packet is still well-formed.
			encoded = nil
		}
		payload = encoded
	}

	var buf bytes.Buffer
	buf.Grow(frameLenPrefixSize + frameHeaderSize + len(payload))
	buf.Write(Int2Byte(int32(frameHeaderSize + len(payload))))
	buf.WriteString(fixedField(from))
	buf.WriteString(fixedField(to))
	buf.WriteString(fixedField(msgid))
	buf.Write(Int2Byte(int32(event)))
	buf.Write(Int2Byte(int32(sendtotype)))
	buf.Write(payload)
	return buf.Bytes()
}

// Byte2Int decodes the first 4 bytes of src as a big-endian int32. It returns
// 0 when src holds fewer than 4 bytes.
func Byte2Int(src []byte) int32 {
	if len(src) < 4 {
		return 0
	}
	return int32(binary.BigEndian.Uint32(src))
}

// Int2Byte encodes src as 4 big-endian bytes.
func Int2Byte(src int32) []byte {
	out := make([]byte, 4)
	binary.BigEndian.PutUint32(out, uint32(src))
	return out
}