client.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. //客户端封装,支持短线重连接,支持客户端检测
  2. package mfw
  3. import (
  4. "encoding/binary"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "math"
  10. "os"
  11. "sync"
  12. "time"
  13. )
  14. var IsHiddenLog bool
  15. //封装,可以链接多个消息总线
  16. type Client struct {
  17. conn *TCPClient
  18. myid string //
  19. oldmyid string
  20. businessEventHandler func(*Packet)
  21. canHandleEvent []int
  22. lastCheckHeart int64 //最后一次心跳检测
  23. messageQueue chan *Packet
  24. writeQueue chan RawData
  25. resultlock sync.Map //map[string]chan []byte
  26. myname string
  27. OnConnectSuccess func()
  28. }
  29. //客户端配置
  30. type ClientConfig struct {
  31. ClientName string //名称
  32. EventHandler func(p *Packet) //事件处理函数
  33. MsgServerAddr string //消息总线地址
  34. CanHandleEvents []int //我可以处理的事件列表
  35. OnRequestConnect func() //请求重连的事件处理函数
  36. OnConnectSuccess func() //重连成功的时间处理函数
  37. ReadBufferSize int //读缓冲区大小
  38. WriteBufferSize int //写缓冲区大小
  39. }
  40. //
  41. func printlog(p ...interface{}) {
  42. if !IsHiddenLog {
  43. fmt.Print(p...)
  44. }
  45. }
  46. //
  47. func (client *Client) WriteObj(to, msgid string, event, sendtotype int, obj interface{}) {
  48. client.writeQueue <- RawData(Enpacket(client.myid, to, msgid, event, sendtotype, obj))
  49. }
  50. //同步调用方法
  51. func (client *Client) Call(to, msgid string, event, sendtotype int, obj interface{}, calltimeout int64) ([]byte, error) {
  52. client.resultlock.Store(msgid, make(chan []byte))
  53. client.writeQueue <- RawData(Enpacket(client.myid, to, msgid, event, sendtotype, obj))
  54. tmp, _ := client.resultlock.Load(msgid)
  55. lock := tmp.(chan []byte)
  56. select {
  57. case <-time.After(time.Duration(calltimeout) * time.Second):
  58. client.resultlock.Delete(msgid)
  59. return nil, errors.New("timeout")
  60. case ret := <-lock:
  61. client.resultlock.Delete(msgid)
  62. return ret, nil
  63. }
  64. }
  65. func (client *Client) GetMyclient() string {
  66. return client.myid
  67. }
  68. //底层事件处理
  69. func (client *Client) baseEventHandle(msg *Packet) {
  70. printlog(".")
  71. event := int(msg.Event)
  72. switch event {
  73. case EVENT_RETURN_MACHINE_ID: //服务端分配id,发布我能处理的时间
  74. printlog("*", string(msg.GetBusinessData()))
  75. client.myid = string(msg.GetBusinessData())
  76. client.conn.MyId = client.myid
  77. if client.oldmyid != "" {
  78. client.WriteObj("", "", EVENT_REMOVE_CLIENT, SENDTO_TYPE_P2P, []byte(client.oldmyid))
  79. client.oldmyid = ""
  80. }
  81. bs := make([]byte, 0)
  82. for _, v := range client.canHandleEvent {
  83. bs = append(bs, Int2Byte(int32(v))...)
  84. }
  85. client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, bs)
  86. client.WriteObj("", "", EVENT_UPDATE_MYNAME, SENDTO_TYPE_P2P, []byte(client.myname))
  87. if client.OnConnectSuccess != nil { //调用成功回调
  88. go client.OnConnectSuccess()
  89. }
  90. case EVENT_REQUEST_HEARTBEAT: //请求的心跳,回应心跳请求
  91. client.WriteObj("", "", EVENT_RETURN_HEARTBEAT, SENDTO_TYPE_P2P, msg.GetBusinessData())
  92. client.lastCheckHeart = time.Now().Unix()
  93. case EVENT_SYSTEM_COMMAND: //系统控制指令
  94. go processSysCommand(client, msg.GetBusinessData())
  95. case EVENT_RECIVE_CALLBACK: //回调处理
  96. if v, ok := client.resultlock.Load(msg.Msgid); ok {
  97. tmp := v.(chan []byte)
  98. tmp <- msg.GetBusinessData()
  99. return
  100. }
  101. fallthrough
  102. default: //业务处理
  103. go client.businessEventHandler(msg)
  104. }
  105. }
  106. //从套接字读取数据
  107. func (client *Client) read4socket() (*Packet, error) {
  108. defer Catch()
  109. buffer := make([]byte, 4)
  110. if _, err := io.ReadFull(client.conn, buffer); err != nil {
  111. printlog("?_readFullerr", err)
  112. return nil, errors.New("read packet error")
  113. }
  114. size := int32(binary.BigEndian.Uint32(buffer))
  115. if size < 32 || size > 16384000 {
  116. printlog("?_sizeerror[", size, "]")
  117. return nil, errors.New("read packet error")
  118. }
  119. var buf []byte = make([]byte, size)
  120. readlen, err := io.ReadFull(client.conn, buf)
  121. if err != nil || int32(readlen) != size {
  122. printlog("?_readpacketerror", err)
  123. return nil, errors.New("read packet error")
  124. }
  125. //
  126. raw := append(Int2Byte(size), buf...)
  127. //解析包
  128. packet := &Packet{Length: size,
  129. From: string(buf[:8]),
  130. To: string(buf[8:16]),
  131. Msgid: string(buf[16:24]),
  132. Event: Byte2Int(buf[24:28]),
  133. SentToType: Byte2Int(buf[28:32]),
  134. Raw: raw,
  135. }
  136. return packet, nil
  137. }
  138. //读数据到缓冲区
  139. func (client *Client) readDump(queue chan<- *Packet) {
  140. defer Catch()
  141. for {
  142. if p, err := client.read4socket(); err == nil && p != nil {
  143. client.lastCheckHeart = time.Now().Unix()
  144. queue <- p
  145. } else {
  146. time.Sleep(4 * time.Second)
  147. }
  148. }
  149. }
  150. //写数据到套接字
  151. func (client *Client) write2socket(msg RawData) (ret bool) {
  152. defer Catch()
  153. _, err := client.conn.Write([]byte(msg))
  154. if err == nil {
  155. client.lastCheckHeart = time.Now().Unix()
  156. return true
  157. }
  158. return false
  159. }
  160. //写数据到缓冲区
  161. func (client *Client) writeDump(p <-chan RawData) {
  162. defer Catch()
  163. for {
  164. select {
  165. case msg := <-p:
  166. //写数据,直到写入成功
  167. client.write2socket(msg)
  168. }
  169. }
  170. }
  171. //启动客户端
  172. //@Deprecated 此方法已过期
  173. func StartClient(fn func(*Packet),
  174. addr string, myname string,
  175. handleevent []int) (*Client, error) {
  176. //
  177. client := &Client{}
  178. client.resultlock = sync.Map{} //make(map[string]chan []byte)
  179. client.businessEventHandler = fn
  180. client.canHandleEvent = handleevent
  181. client.myname = myname
  182. client.messageQueue = make(chan *Packet, 500) //接受消息
  183. client.writeQueue = make(chan RawData, 500) //写入并发
  184. go func(q <-chan *Packet, parseEvent func(*Packet)) {
  185. for {
  186. select {
  187. case msg := <-q:
  188. go parseEvent(msg)
  189. }
  190. }
  191. }(client.messageQueue, client.baseEventHandle)
  192. go client.writeDump(client.writeQueue)
  193. //创建链接
  194. var err error
  195. for client.conn, err = Dial("tcp4", addr); err != nil; client.conn, err = Dial("tcp4", addr) {
  196. printlog("?")
  197. time.Sleep(5 * time.Second)
  198. }
  199. client.conn.SetMaxRetries(math.MaxInt32) //重试次数
  200. client.conn.SetRetryInterval(2 * time.Second) //重试间隔30秒
  201. client.conn.OnRequestConnect = func() {
  202. client.oldmyid = client.myid
  203. client.myid = ""
  204. }
  205. go client.readDump(client.messageQueue)
  206. return client, nil
  207. }
  208. //解析处理控制指令
  209. func processSysCommand(client *Client, data []byte) {
  210. tmp := map[string]interface{}{}
  211. err := json.Unmarshal(data, &tmp)
  212. if err != nil {
  213. return
  214. }
  215. cmd := tmp["act"].(string)
  216. switch cmd {
  217. case "pass": //暂停服务
  218. t := int(tmp["passtime"].(float64))
  219. client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, []byte{})
  220. time.Sleep(time.Duration(t) * time.Second)
  221. bs := make([]byte, 0)
  222. for _, v := range client.canHandleEvent {
  223. bs = append(bs, Int2Byte(int32(v))...)
  224. }
  225. client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, bs)
  226. case "reconnect": //重新连接
  227. case "quit": //退出
  228. client.WriteObj("", "", EVENT_BYE, SENDTO_TYPE_P2P, []byte{})
  229. os.Exit(0)
  230. }
  231. }
  232. //启动客户端
  233. func NewClient(conf *ClientConfig) (*Client, error) {
  234. if conf.MsgServerAddr == "" {
  235. return nil, errors.New("配置项缺失(MsgServerAddr)")
  236. } else if conf.EventHandler == nil {
  237. return nil, errors.New("配置项缺失(EventHandler)")
  238. }
  239. if conf.ReadBufferSize == 0 {
  240. conf.ReadBufferSize = 200
  241. }
  242. if conf.WriteBufferSize == 0 {
  243. conf.WriteBufferSize = 150
  244. }
  245. client := &Client{}
  246. client.resultlock = sync.Map{} // make(map[string]chan []byte)
  247. client.businessEventHandler = conf.EventHandler
  248. client.canHandleEvent = conf.CanHandleEvents
  249. client.myname = conf.ClientName
  250. client.messageQueue = make(chan *Packet, conf.ReadBufferSize) //接受消息
  251. client.writeQueue = make(chan RawData, conf.WriteBufferSize) //写入并发
  252. go func(q <-chan *Packet, parseEvent func(*Packet)) {
  253. for {
  254. select {
  255. case msg := <-q:
  256. go parseEvent(msg)
  257. }
  258. }
  259. }(client.messageQueue, client.baseEventHandle)
  260. go client.writeDump(client.writeQueue)
  261. //创建链接
  262. var err error
  263. for client.conn, err = Dial("tcp4", conf.MsgServerAddr); err != nil; client.conn, err = Dial("tcp4", conf.MsgServerAddr) {
  264. printlog("?")
  265. time.Sleep(5 * time.Second)
  266. }
  267. client.conn.SetMaxRetries(math.MaxInt32) //重试次数
  268. client.conn.SetRetryInterval(2 * time.Second) //重试间隔30秒
  269. //事件绑定
  270. client.conn.OnRequestConnect = func() {
  271. client.oldmyid = client.myid
  272. client.myid = ""
  273. if conf.OnRequestConnect != nil {
  274. conf.OnRequestConnect()
  275. }
  276. }
  277. client.conn.OnConnectSuccess = func() {
  278. if conf.OnConnectSuccess != nil {
  279. conf.OnConnectSuccess()
  280. }
  281. }
  282. go client.readDump(client.messageQueue)
  283. return client, nil
  284. }