123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- package udp
// Low-level network communication protocol.
- import (
- "bufio"
- "bytes"
- "encoding/binary"
- "encoding/json"
- "fmt"
- "io"
- "net"
- )
// EMPTY_ADDR is the 8-character placeholder that Enpacket substitutes for any
// from/to/msgid argument whose length is not exactly 8.
const EMPTY_ADDR = "00000000"
// Reader reads length-prefixed frames from a network connection.
type Reader struct {
	// conn is the connection that was handed to NewReader.
	conn net.Conn
	// reader is the buffered wrapper around conn; all reads go through it.
	reader *bufio.Reader
	// buffer is scratch space for the 4-byte length prefix.
	buffer [4]byte
}
// Packet is one decoded frame; its fields mirror the wire format.
//
// On the wire a frame is a 4-byte big-endian length prefix followed by a
// 32-byte header (From, To, Msgid, Event, SentToType) and then the payload.
type Packet struct {
	// Header fields.

	// Length is the value of the 4-byte length prefix.
	Length int32
	// From, To and Msgid each occupy 8 bytes of the header.
	From, To, Msgid string
	// Event and SentToType each occupy 4 bytes of the header.
	Event, SentToType int32

	// Raw holds the complete frame: length prefix + header + payload.
	Raw []byte
}
- //返回
- func (p *Packet) GetBusinessData() []byte {
- return p.Raw[36:]
- }
// RawData is a chunk of raw stream bytes.
type RawData []byte
- //
- func NewReader(c net.Conn) *Reader {
- return &Reader{
- conn: c,
- reader: bufio.NewReader(c),
- }
- }
- //读取头部
- func (p *Reader) readHeader() (int32, error) {
- buf := p.buffer[:4]
- if _, err := io.ReadFull(p.reader, buf); err != nil {
- return 0, err
- }
- size := int32(binary.BigEndian.Uint32(buf))
- if size < 32 || size > 16384000 {
- return 0, fmt.Errorf("Incorrect frame size (%d)", size)
- }
- return size, nil
- }
- //读取完整一帧数据,防止粘包
- func (p *Reader) readFrame(size int32) (*Packet, error) {
- var buf []byte
- if int(size) <= len(p.buffer) {
- buf = p.buffer[0:size]
- } else {
- buf = make([]byte, size)
- }
- _, err := io.ReadFull(p.reader, buf)
- raw := append(Int2Byte(size), buf...)
- if err == nil {
- //解析包
- packet := &Packet{
- Length: size,
- From: string(buf[:8]),
- To: string(buf[8:16]),
- Msgid: string(buf[16:24]),
- Event: Byte2Int(buf[24:28]),
- SentToType: Byte2Int(buf[28:32]),
- //Data: buf[32:],
- Raw: raw,
- }
- return packet, nil
- }
- return nil, err
- }
- //读取数据写入队列
- func forwardMessage(c net.Conn, queue chan<- *Packet) {
- defer c.Close()
- logReader := NewReader(c)
- for {
- size, err := logReader.readHeader()
- if err != nil {
- break
- }
- packet, err := logReader.readFrame(size)
- if err != nil {
- break
- }
- queue <- packet
- }
- }
- //从队列中读取数据,定期处理
- func processMsg(q <-chan *Packet, parseEvent func(*Packet)) {
- for {
- select {
- case msg := <-q:
- go parseEvent(msg)
- }
- }
- }
- //封包,输出直接可以写到流中的数据包
- func Enpacket(from, to, msgid string, event, sendtotype int, obj interface{}) []byte {
- if len(from) != 8 {
- from = EMPTY_ADDR
- }
- if len(to) != 8 {
- to = EMPTY_ADDR
- }
- if len(msgid) != 8 {
- msgid = EMPTY_ADDR
- }
- var ret []byte
- var bs []byte
- if v, ok := obj.([]byte); ok {
- bs = v
- } else if v, ok := obj.(string); ok {
- bs = []byte(v)
- } else {
- bs, _ = json.Marshal(obj)
- }
- ret = append(Int2Byte(int32(32 + len(bs))))
- ret = append(ret, []byte(from+to+msgid)...)
- ret = append(ret, Int2Byte(int32(event))...)
- ret = append(ret, Int2Byte(int32(sendtotype))...)
- ret = append(ret, bs...)
- return ret
- }
// Byte2Int decodes the first four bytes of src as a big-endian int32.
//
// It returns 0 when src holds fewer than four bytes; bytes beyond the fourth
// are ignored.
func Byte2Int(src []byte) int32 {
	if len(src) < 4 {
		return 0
	}
	// Decode directly instead of going through the reflection-based
	// binary.Read and a temporary reader.
	return int32(binary.BigEndian.Uint32(src))
}
// Int2Byte encodes src as four big-endian bytes and returns them in a newly
// allocated slice.
func Int2Byte(src int32) []byte {
	var out bytes.Buffer
	// The write error is ignored, as the destination is an in-memory buffer.
	_ = binary.Write(&out, binary.BigEndian, src)
	return out.Bytes()
}
|