protocol.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package util
// Low-level network communication protocol.
  3. import (
  4. "bufio"
  5. "bytes"
  6. "encoding/binary"
  7. "encoding/json"
  8. "fmt"
  9. "io"
  10. "net"
  11. )
  12. const (
  13. EMPTY_ADDR = "00000000"
  14. )
  15. //读取
  16. type Reader struct {
  17. conn net.Conn
  18. reader *bufio.Reader
  19. buffer [4]byte
  20. }
  21. //数据包,实际上就是协议定义
  22. type Packet struct {
  23. //头开始
  24. Length int32 //4bit 包长度
  25. From string //8bit
  26. To string //8bit
  27. Msgid string //8bit
  28. Event int32 //4bit
  29. SentToType int32 //4bit
  30. //头结束
  31. Raw []byte //数据包,头+数据
  32. }
  33. //返回
  34. func (p *Packet) GetBusinessData() []byte {
  35. return p.Raw[36:]
  36. }
  37. //
  38. type RawData []byte //流数据
  39. //
  40. func NewReader(c net.Conn) *Reader {
  41. return &Reader{
  42. conn: c,
  43. reader: bufio.NewReader(c),
  44. }
  45. }
  46. //读取头部
  47. func (p *Reader) readHeader() (int32, error) {
  48. buf := p.buffer[:4]
  49. if _, err := io.ReadFull(p.reader, buf); err != nil {
  50. return 0, err
  51. }
  52. size := int32(binary.BigEndian.Uint32(buf))
  53. if size < 32 || size > 16384000 {
  54. return 0, fmt.Errorf("Incorrect frame size (%d)", size)
  55. }
  56. return size, nil
  57. }
  58. //读取完整一帧数据,防止粘包
  59. func (p *Reader) readFrame(size int32) (*Packet, error) {
  60. var buf []byte
  61. if int(size) <= len(p.buffer) {
  62. buf = p.buffer[0:size]
  63. } else {
  64. buf = make([]byte, size)
  65. }
  66. _, err := io.ReadFull(p.reader, buf)
  67. raw := append(Int2Byte(size), buf...)
  68. if err == nil {
  69. //解析包
  70. packet := &Packet{
  71. Length: size,
  72. From: string(buf[:8]),
  73. To: string(buf[8:16]),
  74. Msgid: string(buf[16:24]),
  75. Event: Byte2Int(buf[24:28]),
  76. SentToType: Byte2Int(buf[28:32]),
  77. //Data: buf[32:],
  78. Raw: raw,
  79. }
  80. return packet, nil
  81. }
  82. return nil, err
  83. }
  84. //读取数据写入队列
  85. func forwardMessage(c net.Conn, queue chan<- *Packet) {
  86. defer c.Close()
  87. logReader := NewReader(c)
  88. for {
  89. size, err := logReader.readHeader()
  90. if err != nil {
  91. break
  92. }
  93. packet, err := logReader.readFrame(size)
  94. if err != nil {
  95. break
  96. }
  97. queue <- packet
  98. }
  99. }
  100. //从队列中读取数据,定期处理
  101. func processMsg(q <-chan *Packet, parseEvent func(*Packet)) {
  102. for {
  103. select {
  104. case msg := <-q:
  105. go parseEvent(msg)
  106. }
  107. }
  108. }
  109. //封包,输出直接可以写到流中的数据包
  110. func Enpacket(from, to, msgid string, event, sendtotype int, obj interface{}) []byte {
  111. if len(from) != 8 {
  112. from = EMPTY_ADDR
  113. }
  114. if len(to) != 8 {
  115. to = EMPTY_ADDR
  116. }
  117. if len(msgid) != 8 {
  118. msgid = EMPTY_ADDR
  119. }
  120. var ret []byte
  121. var bs []byte
  122. if v, ok := obj.([]byte); ok {
  123. bs = v
  124. } else if v, ok := obj.(string); ok {
  125. bs = []byte(v)
  126. } else {
  127. bs, _ = json.Marshal(obj)
  128. }
  129. ret = append(Int2Byte(int32(32 + len(bs))))
  130. ret = append(ret, []byte(from+to+msgid)...)
  131. ret = append(ret, Int2Byte(int32(event))...)
  132. ret = append(ret, Int2Byte(int32(sendtotype))...)
  133. ret = append(ret, bs...)
  134. return ret
  135. }
  136. //
  137. func Byte2Int(src []byte) int32 {
  138. var ret int32
  139. binary.Read(bytes.NewReader(src), binary.BigEndian, &ret)
  140. return ret
  141. }
  142. //
  143. func Int2Byte(src int32) []byte {
  144. buf := bytes.NewBuffer([]byte{})
  145. binary.Write(buf, binary.BigEndian, src)
  146. return buf.Bytes()
  147. }