//客户端封装,支持短线重连接,支持客户端检测 package mfw import ( "encoding/binary" "encoding/json" "errors" "fmt" "io" "math" "os" "sync" "time" ) var IsHiddenLog bool //封装,可以链接多个消息总线 type Client struct { conn *TCPClient myid string // oldmyid string businessEventHandler func(*Packet) canHandleEvent []int lastCheckHeart int64 //最后一次心跳检测 messageQueue chan *Packet writeQueue chan RawData resultlock sync.Map //map[string]chan []byte myname string OnConnectSuccess func() } //客户端配置 type ClientConfig struct { ClientName string //名称 EventHandler func(p *Packet) //事件处理函数 MsgServerAddr string //消息总线地址 CanHandleEvents []int //我可以处理的事件列表 OnRequestConnect func() //请求重连的事件处理函数 OnConnectSuccess func() //重连成功的时间处理函数 ReadBufferSize int //读缓冲区大小 WriteBufferSize int //写缓冲区大小 } // func printlog(p ...interface{}) { if !IsHiddenLog { fmt.Print(p...) } } // func (client *Client) WriteObj(to, msgid string, event, sendtotype int, obj interface{}) { client.writeQueue <- RawData(Enpacket(client.myid, to, msgid, event, sendtotype, obj)) } //同步调用方法 func (client *Client) Call(to, msgid string, event, sendtotype int, obj interface{}, calltimeout int64) ([]byte, error) { client.resultlock.Store(msgid, make(chan []byte)) client.writeQueue <- RawData(Enpacket(client.myid, to, msgid, event, sendtotype, obj)) tmp, _ := client.resultlock.Load(msgid) lock := tmp.(chan []byte) select { case <-time.After(time.Duration(calltimeout) * time.Second): client.resultlock.Delete(msgid) return nil, errors.New("timeout") case ret := <-lock: client.resultlock.Delete(msgid) return ret, nil } } func (client *Client) GetMyclient() string { return client.myid } //底层事件处理 func (client *Client) baseEventHandle(msg *Packet) { printlog(".") event := int(msg.Event) switch event { case EVENT_RETURN_MACHINE_ID: //服务端分配id,发布我能处理的时间 printlog("*", string(msg.GetBusinessData())) client.myid = string(msg.GetBusinessData()) client.conn.MyId = client.myid if client.oldmyid != "" { client.WriteObj("", "", EVENT_REMOVE_CLIENT, SENDTO_TYPE_P2P, []byte(client.oldmyid)) client.oldmyid = "" } bs := make([]byte, 0) for _, v := range client.canHandleEvent { bs = append(bs, Int2Byte(int32(v))...) } client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, bs) client.WriteObj("", "", EVENT_UPDATE_MYNAME, SENDTO_TYPE_P2P, []byte(client.myname)) if client.OnConnectSuccess != nil { //调用成功回调 go client.OnConnectSuccess() } case EVENT_REQUEST_HEARTBEAT: //请求的心跳,回应心跳请求 client.WriteObj("", "", EVENT_RETURN_HEARTBEAT, SENDTO_TYPE_P2P, msg.GetBusinessData()) client.lastCheckHeart = time.Now().Unix() case EVENT_SYSTEM_COMMAND: //系统控制指令 go processSysCommand(client, msg.GetBusinessData()) case EVENT_RECIVE_CALLBACK: //回调处理 if v, ok := client.resultlock.Load(msg.Msgid); ok { tmp := v.(chan []byte) tmp <- msg.GetBusinessData() return } fallthrough default: //业务处理 go client.businessEventHandler(msg) } } //从套接字读取数据 func (client *Client) read4socket() (*Packet, error) { defer Catch() buffer := make([]byte, 4) if _, err := io.ReadFull(client.conn, buffer); err != nil { printlog("?_readFullerr", err) return nil, errors.New("read packet error") } size := int32(binary.BigEndian.Uint32(buffer)) if size < 32 || size > 16384000 { printlog("?_sizeerror[", size, "]") return nil, errors.New("read packet error") } var buf []byte = make([]byte, size) readlen, err := io.ReadFull(client.conn, buf) if err != nil || int32(readlen) != size { printlog("?_readpacketerror", err) return nil, errors.New("read packet error") } // raw := append(Int2Byte(size), buf...) //解析包 packet := &Packet{Length: size, From: string(buf[:8]), To: string(buf[8:16]), Msgid: string(buf[16:24]), Event: Byte2Int(buf[24:28]), SentToType: Byte2Int(buf[28:32]), Raw: raw, } return packet, nil } //读数据到缓冲区 func (client *Client) readDump(queue chan<- *Packet) { defer Catch() for { if p, err := client.read4socket(); err == nil && p != nil { client.lastCheckHeart = time.Now().Unix() queue <- p } else { time.Sleep(4 * time.Second) } } } //写数据到套接字 func (client *Client) write2socket(msg RawData) (ret bool) { defer Catch() _, err := client.conn.Write([]byte(msg)) if err == nil { client.lastCheckHeart = time.Now().Unix() return true } return false } //写数据到缓冲区 func (client *Client) writeDump(p <-chan RawData) { defer Catch() for { select { case msg := <-p: //写数据,直到写入成功 client.write2socket(msg) } } } //启动客户端 //@Deprecated 此方法已过期 func StartClient(fn func(*Packet), addr string, myname string, handleevent []int) (*Client, error) { // client := &Client{} client.resultlock = sync.Map{} //make(map[string]chan []byte) client.businessEventHandler = fn client.canHandleEvent = handleevent client.myname = myname client.messageQueue = make(chan *Packet, 500) //接受消息 client.writeQueue = make(chan RawData, 500) //写入并发 go func(q <-chan *Packet, parseEvent func(*Packet)) { for { select { case msg := <-q: go parseEvent(msg) } } }(client.messageQueue, client.baseEventHandle) go client.writeDump(client.writeQueue) //创建链接 var err error for client.conn, err = Dial("tcp4", addr); err != nil; client.conn, err = Dial("tcp4", addr) { printlog("?") time.Sleep(5 * time.Second) } client.conn.SetMaxRetries(math.MaxInt32) //重试次数 client.conn.SetRetryInterval(2 * time.Second) //重试间隔30秒 client.conn.OnRequestConnect = func() { client.oldmyid = client.myid client.myid = "" } go client.readDump(client.messageQueue) return client, nil } //解析处理控制指令 func processSysCommand(client *Client, data []byte) { tmp := map[string]interface{}{} err := json.Unmarshal(data, &tmp) if err != nil { return } cmd := tmp["act"].(string) switch cmd { case "pass": //暂停服务 t := int(tmp["passtime"].(float64)) client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, []byte{}) time.Sleep(time.Duration(t) * time.Second) bs := make([]byte, 0) for _, v := range client.canHandleEvent { bs = append(bs, Int2Byte(int32(v))...) } client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, bs) case "reconnect": //重新连接 case "quit": //退出 client.WriteObj("", "", EVENT_BYE, SENDTO_TYPE_P2P, []byte{}) os.Exit(0) } } //启动客户端 func NewClient(conf *ClientConfig) (*Client, error) { if conf.MsgServerAddr == "" { return nil, errors.New("配置项缺失(MsgServerAddr)") } else if conf.EventHandler == nil { return nil, errors.New("配置项缺失(EventHandler)") } if conf.ReadBufferSize == 0 { conf.ReadBufferSize = 200 } if conf.WriteBufferSize == 0 { conf.WriteBufferSize = 150 } client := &Client{} client.resultlock = sync.Map{} // make(map[string]chan []byte) client.businessEventHandler = conf.EventHandler client.canHandleEvent = conf.CanHandleEvents client.myname = conf.ClientName client.messageQueue = make(chan *Packet, conf.ReadBufferSize) //接受消息 client.writeQueue = make(chan RawData, conf.WriteBufferSize) //写入并发 go func(q <-chan *Packet, parseEvent func(*Packet)) { for { select { case msg := <-q: go parseEvent(msg) } } }(client.messageQueue, client.baseEventHandle) go client.writeDump(client.writeQueue) //创建链接 var err error for client.conn, err = Dial("tcp4", conf.MsgServerAddr); err != nil; client.conn, err = Dial("tcp4", conf.MsgServerAddr) { printlog("?") time.Sleep(5 * time.Second) } client.conn.SetMaxRetries(math.MaxInt32) //重试次数 client.conn.SetRetryInterval(2 * time.Second) //重试间隔30秒 //事件绑定 client.conn.OnRequestConnect = func() { client.oldmyid = client.myid client.myid = "" if conf.OnRequestConnect != nil { conf.OnRequestConnect() } } client.conn.OnConnectSuccess = func() { if conf.OnConnectSuccess != nil { conf.OnConnectSuccess() } } go client.readDump(client.messageQueue) return client, nil }