123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
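- // Package mfw provides a message-bus client wrapper that supports automatic reconnection after a dropped connection and client detection.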
- package mfw
- import (
- "encoding/binary"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "math"
- "os"
- "sync"
- "time"
- )
var (
	// IsHiddenLog, when true, silences the diagnostic output that this
	// package writes through printlog. The zero value (false) leaves
	// logging enabled.
	IsHiddenLog bool
)
- //封装,可以链接多个消息总线
- type Client struct {
- conn *TCPClient
- myid string //
- oldmyid string
- businessEventHandler func(*Packet)
- canHandleEvent []int
- lastCheckHeart int64 //最后一次心跳检测
- messageQueue chan *Packet
- writeQueue chan RawData
- resultlock sync.Map //map[string]chan []byte
- myname string
- OnConnectSuccess func()
- }
- //客户端配置
- type ClientConfig struct {
- ClientName string //名称
- EventHandler func(p *Packet) //事件处理函数
- MsgServerAddr string //消息总线地址
- CanHandleEvents []int //我可以处理的事件列表
- OnRequestConnect func() //请求重连的事件处理函数
- OnConnectSuccess func() //重连成功的时间处理函数
- ReadBufferSize int //读缓冲区大小
- WriteBufferSize int //写缓冲区大小
- }
- //
- func printlog(p ...interface{}) {
- if !IsHiddenLog {
- fmt.Print(p...)
- }
- }
- //
- func (client *Client) WriteObj(to, msgid string, event, sendtotype int, obj interface{}) {
- client.writeQueue <- RawData(Enpacket(client.myid, to, msgid, event, sendtotype, obj))
- }
- //同步调用方法
- func (client *Client) Call(to, msgid string, event, sendtotype int, obj interface{}, calltimeout int64) ([]byte, error) {
- client.resultlock.Store(msgid, make(chan []byte))
- client.writeQueue <- RawData(Enpacket(client.myid, to, msgid, event, sendtotype, obj))
- tmp, _ := client.resultlock.Load(msgid)
- lock := tmp.(chan []byte)
- select {
- case <-time.After(time.Duration(calltimeout) * time.Second):
- client.resultlock.Delete(msgid)
- return nil, errors.New("timeout")
- case ret := <-lock:
- client.resultlock.Delete(msgid)
- return ret, nil
- }
- }
- func (client *Client) GetMyclient() string {
- return client.myid
- }
- //底层事件处理
- func (client *Client) baseEventHandle(msg *Packet) {
- printlog(".")
- event := int(msg.Event)
- switch event {
- case EVENT_RETURN_MACHINE_ID: //服务端分配id,发布我能处理的时间
- printlog("*", string(msg.GetBusinessData()))
- client.myid = string(msg.GetBusinessData())
- client.conn.MyId = client.myid
- if client.oldmyid != "" {
- client.WriteObj("", "", EVENT_REMOVE_CLIENT, SENDTO_TYPE_P2P, []byte(client.oldmyid))
- client.oldmyid = ""
- }
- bs := make([]byte, 0)
- for _, v := range client.canHandleEvent {
- bs = append(bs, Int2Byte(int32(v))...)
- }
- client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, bs)
- client.WriteObj("", "", EVENT_UPDATE_MYNAME, SENDTO_TYPE_P2P, []byte(client.myname))
- if client.OnConnectSuccess != nil { //调用成功回调
- go client.OnConnectSuccess()
- }
- case EVENT_REQUEST_HEARTBEAT: //请求的心跳,回应心跳请求
- client.WriteObj("", "", EVENT_RETURN_HEARTBEAT, SENDTO_TYPE_P2P, msg.GetBusinessData())
- client.lastCheckHeart = time.Now().Unix()
- case EVENT_SYSTEM_COMMAND: //系统控制指令
- go processSysCommand(client, msg.GetBusinessData())
- case EVENT_RECIVE_CALLBACK: //回调处理
- if v, ok := client.resultlock.Load(msg.Msgid); ok {
- tmp := v.(chan []byte)
- tmp <- msg.GetBusinessData()
- return
- }
- fallthrough
- default: //业务处理
- go client.businessEventHandler(msg)
- }
- }
- //从套接字读取数据
- func (client *Client) read4socket() (*Packet, error) {
- defer Catch()
- buffer := make([]byte, 4)
- if _, err := io.ReadFull(client.conn, buffer); err != nil {
- printlog("?_readFullerr", err)
- return nil, errors.New("read packet error")
- }
- size := int32(binary.BigEndian.Uint32(buffer))
- if size < 32 || size > 16384000 {
- printlog("?_sizeerror[", size, "]")
- return nil, errors.New("read packet error")
- }
- var buf []byte = make([]byte, size)
- readlen, err := io.ReadFull(client.conn, buf)
- if err != nil || int32(readlen) != size {
- printlog("?_readpacketerror", err)
- return nil, errors.New("read packet error")
- }
- //
- raw := append(Int2Byte(size), buf...)
- //解析包
- packet := &Packet{Length: size,
- From: string(buf[:8]),
- To: string(buf[8:16]),
- Msgid: string(buf[16:24]),
- Event: Byte2Int(buf[24:28]),
- SentToType: Byte2Int(buf[28:32]),
- Raw: raw,
- }
- return packet, nil
- }
- //读数据到缓冲区
- func (client *Client) readDump(queue chan<- *Packet) {
- defer Catch()
- for {
- if p, err := client.read4socket(); err == nil && p != nil {
- client.lastCheckHeart = time.Now().Unix()
- queue <- p
- } else {
- time.Sleep(4 * time.Second)
- }
- }
- }
- //写数据到套接字
- func (client *Client) write2socket(msg RawData) (ret bool) {
- defer Catch()
- _, err := client.conn.Write([]byte(msg))
- if err == nil {
- client.lastCheckHeart = time.Now().Unix()
- return true
- }
- return false
- }
- //写数据到缓冲区
- func (client *Client) writeDump(p <-chan RawData) {
- defer Catch()
- for {
- select {
- case msg := <-p:
- //写数据,直到写入成功
- client.write2socket(msg)
- }
- }
- }
- //启动客户端
- //@Deprecated 此方法已过期
- func StartClient(fn func(*Packet),
- addr string, myname string,
- handleevent []int) (*Client, error) {
- //
- client := &Client{}
- client.resultlock = sync.Map{} //make(map[string]chan []byte)
- client.businessEventHandler = fn
- client.canHandleEvent = handleevent
- client.myname = myname
- client.messageQueue = make(chan *Packet, 500) //接受消息
- client.writeQueue = make(chan RawData, 500) //写入并发
- go func(q <-chan *Packet, parseEvent func(*Packet)) {
- for {
- select {
- case msg := <-q:
- go parseEvent(msg)
- }
- }
- }(client.messageQueue, client.baseEventHandle)
- go client.writeDump(client.writeQueue)
- //创建链接
- var err error
- for client.conn, err = Dial("tcp4", addr); err != nil; client.conn, err = Dial("tcp4", addr) {
- printlog("?")
- time.Sleep(5 * time.Second)
- }
- client.conn.SetMaxRetries(math.MaxInt32) //重试次数
- client.conn.SetRetryInterval(2 * time.Second) //重试间隔30秒
- client.conn.OnRequestConnect = func() {
- client.oldmyid = client.myid
- client.myid = ""
- }
- go client.readDump(client.messageQueue)
- return client, nil
- }
- //解析处理控制指令
- func processSysCommand(client *Client, data []byte) {
- tmp := map[string]interface{}{}
- err := json.Unmarshal(data, &tmp)
- if err != nil {
- return
- }
- cmd := tmp["act"].(string)
- switch cmd {
- case "pass": //暂停服务
- t := int(tmp["passtime"].(float64))
- client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, []byte{})
- time.Sleep(time.Duration(t) * time.Second)
- bs := make([]byte, 0)
- for _, v := range client.canHandleEvent {
- bs = append(bs, Int2Byte(int32(v))...)
- }
- client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, bs)
- case "reconnect": //重新连接
- case "quit": //退出
- client.WriteObj("", "", EVENT_BYE, SENDTO_TYPE_P2P, []byte{})
- os.Exit(0)
- }
- }
- //启动客户端
- func NewClient(conf *ClientConfig) (*Client, error) {
- if conf.MsgServerAddr == "" {
- return nil, errors.New("配置项缺失(MsgServerAddr)")
- } else if conf.EventHandler == nil {
- return nil, errors.New("配置项缺失(EventHandler)")
- }
- if conf.ReadBufferSize == 0 {
- conf.ReadBufferSize = 200
- }
- if conf.WriteBufferSize == 0 {
- conf.WriteBufferSize = 150
- }
- client := &Client{}
- client.resultlock = sync.Map{} // make(map[string]chan []byte)
- client.businessEventHandler = conf.EventHandler
- client.canHandleEvent = conf.CanHandleEvents
- client.myname = conf.ClientName
- client.messageQueue = make(chan *Packet, conf.ReadBufferSize) //接受消息
- client.writeQueue = make(chan RawData, conf.WriteBufferSize) //写入并发
- go func(q <-chan *Packet, parseEvent func(*Packet)) {
- for {
- select {
- case msg := <-q:
- go parseEvent(msg)
- }
- }
- }(client.messageQueue, client.baseEventHandle)
- go client.writeDump(client.writeQueue)
- //创建链接
- var err error
- for client.conn, err = Dial("tcp4", conf.MsgServerAddr); err != nil; client.conn, err = Dial("tcp4", conf.MsgServerAddr) {
- printlog("?")
- time.Sleep(5 * time.Second)
- }
- client.conn.SetMaxRetries(math.MaxInt32) //重试次数
- client.conn.SetRetryInterval(2 * time.Second) //重试间隔30秒
- //事件绑定
- client.conn.OnRequestConnect = func() {
- client.oldmyid = client.myid
- client.myid = ""
- if conf.OnRequestConnect != nil {
- conf.OnRequestConnect()
- }
- }
- client.conn.OnConnectSuccess = func() {
- if conf.OnConnectSuccess != nil {
- conf.OnConnectSuccess()
- }
- }
- go client.readDump(client.messageQueue)
- return client, nil
- }
|