|
@@ -1,52 +1,122 @@
|
|
-//客户端封装,支持短线重连接,支持客户端检测
|
|
|
|
|
|
+//客户端封装,支持断线重连接,支持客户端检测
|
|
package util
|
|
package util
|
|
|
|
|
|
import (
|
|
import (
|
|
|
|
+ "crypto/tls"
|
|
"encoding/binary"
|
|
"encoding/binary"
|
|
- "encoding/json"
|
|
|
|
"errors"
|
|
"errors"
|
|
- "fmt"
|
|
|
|
"io"
|
|
"io"
|
|
"math"
|
|
"math"
|
|
- "os"
|
|
|
|
"sync"
|
|
"sync"
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
-var IsHiddenLog bool
|
|
|
|
|
|
+//
|
|
|
|
+type CustomConn interface {
|
|
|
|
+ io.Reader
|
|
|
|
+ io.Writer
|
|
|
|
+ io.Closer
|
|
|
|
+ Dial() error
|
|
|
|
+}
|
|
|
|
|
|
//封装,可以链接多个消息总线
|
|
//封装,可以链接多个消息总线
|
|
type Client struct {
|
|
type Client struct {
|
|
- conn *TCPClient
|
|
|
|
|
|
+ conn CustomConn
|
|
myid string //
|
|
myid string //
|
|
- oldmyid string
|
|
|
|
businessEventHandler func(*Packet)
|
|
businessEventHandler func(*Packet)
|
|
canHandleEvent []int
|
|
canHandleEvent []int
|
|
lastCheckHeart int64 //最后一次心跳检测
|
|
lastCheckHeart int64 //最后一次心跳检测
|
|
messageQueue chan *Packet
|
|
messageQueue chan *Packet
|
|
writeQueue chan RawData
|
|
writeQueue chan RawData
|
|
resultlock sync.Map //map[string]chan []byte
|
|
resultlock sync.Map //map[string]chan []byte
|
|
- myname string
|
|
|
|
- OnConnectSuccess func()
|
|
|
|
}
|
|
}
|
|
|
|
|
|
//客户端配置
|
|
//客户端配置
|
|
type ClientConfig struct {
|
|
type ClientConfig struct {
|
|
- ClientName string //名称
|
|
|
|
- EventHandler func(p *Packet) //事件处理函数
|
|
|
|
- MsgServerAddr string //消息总线地址
|
|
|
|
- CanHandleEvents []int //我可以处理的事件列表
|
|
|
|
- OnRequestConnect func() //请求重连的事件处理函数
|
|
|
|
- OnConnectSuccess func() //重连成功的时间处理函数
|
|
|
|
- ReadBufferSize int //读缓冲区大小
|
|
|
|
- WriteBufferSize int //写缓冲区大小
|
|
|
|
|
|
+ ClientName string //名称
|
|
|
|
+ EventHandler func(p *Packet) //事件处理函数
|
|
|
|
+ MsgServerAddr string //消息总线地址
|
|
|
|
+ CanHandleEvents []int //我可以处理的事件列表
|
|
|
|
+ ReadBufferSize int //读缓冲区大小
|
|
|
|
+ WriteBufferSize int //写缓冲区大小
|
|
|
|
+ UseTls bool //是否使用tls
|
|
|
|
+ TlsCertFile string
|
|
|
|
+ TlsKeyFile string
|
|
}
|
|
}
|
|
|
|
|
|
-//
|
|
|
|
-func printlog(p ...interface{}) {
|
|
|
|
- if !IsHiddenLog {
|
|
|
|
- fmt.Print(p...)
|
|
|
|
|
|
+//启动客户端
|
|
|
|
+func NewClient(conf *ClientConfig) (*Client, error) {
|
|
|
|
+ if conf.MsgServerAddr == "" {
|
|
|
|
+ return nil, errors.New("配置项缺失(MsgServerAddr)")
|
|
|
|
+ } else if conf.EventHandler == nil {
|
|
|
|
+ return nil, errors.New("配置项缺失(EventHandler)")
|
|
|
|
+ }
|
|
|
|
+ if conf.ReadBufferSize == 0 {
|
|
|
|
+ conf.ReadBufferSize = 200
|
|
|
|
+ }
|
|
|
|
+ if conf.WriteBufferSize == 0 {
|
|
|
|
+ conf.WriteBufferSize = 150
|
|
|
|
+ }
|
|
|
|
+ client := &Client{}
|
|
|
|
+ client.resultlock = sync.Map{}
|
|
|
|
+ client.businessEventHandler = conf.EventHandler
|
|
|
|
+ client.canHandleEvent = conf.CanHandleEvents
|
|
|
|
+ client.messageQueue = make(chan *Packet, conf.ReadBufferSize) //接受消息
|
|
|
|
+ client.writeQueue = make(chan RawData, conf.WriteBufferSize) //写入并发
|
|
|
|
+ client.myid = UUID(8)
|
|
|
|
+ //消息读取泵
|
|
|
|
+ go func(q <-chan *Packet, parseEvent func(*Packet)) {
|
|
|
|
+ for {
|
|
|
|
+ select {
|
|
|
|
+ case msg := <-q:
|
|
|
|
+ go parseEvent(msg)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }(client.messageQueue, client.baseEventHandle)
|
|
|
|
+ //消息写入泵
|
|
|
|
+ go client.writeDump(client.writeQueue)
|
|
|
|
+ //创建链接
|
|
|
|
+ var connectedcallbackfn = func() {
|
|
|
|
+ client.WriteObj("", "", EVENT_REQUEST_JOIN, SENDTO_TYPE_P2P, map[string]interface{}{
|
|
|
|
+ "handler": conf.CanHandleEvents,
|
|
|
|
+ "name": conf.ClientName,
|
|
|
|
+ "myid": client.myid,
|
|
|
|
+ })
|
|
|
|
+ }
|
|
|
|
+ //使用tls
|
|
|
|
+ if conf.UseTls {
|
|
|
|
+ cert, err := tls.LoadX509KeyPair(conf.TlsCertFile, conf.TlsKeyFile)
|
|
|
|
+ if err != nil {
|
|
|
|
+ Log.Fail.Fatalln(err)
|
|
|
|
+ }
|
|
|
|
+ config := &tls.Config{
|
|
|
|
+ InsecureSkipVerify: true,
|
|
|
|
+ Certificates: []tls.Certificate{cert},
|
|
|
|
+ }
|
|
|
|
+ client.conn = NewTlsClient("tcp4", conf.MsgServerAddr,
|
|
|
|
+ math.MaxInt32, 2*time.Second, config, connectedcallbackfn)
|
|
|
|
+
|
|
|
|
+ } else {
|
|
|
|
+ client.conn = NewTCPClient("tcp4", conf.MsgServerAddr,
|
|
|
|
+ math.MaxInt32, 2*time.Second, connectedcallbackfn)
|
|
|
|
+ }
|
|
|
|
+ for {
|
|
|
|
+ if err := client.conn.Dial(); err != nil {
|
|
|
|
+ Log.Debug.Println("尝试等待连接到消息总线")
|
|
|
|
+ time.Sleep(5 * time.Second)
|
|
|
|
+ } else {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ go client.readDump(client.messageQueue)
|
|
|
|
+ //发起加入
|
|
|
|
+ client.WriteObj("", "", EVENT_REQUEST_JOIN, SENDTO_TYPE_P2P, map[string]interface{}{
|
|
|
|
+ "handler": conf.CanHandleEvents,
|
|
|
|
+ "name": conf.ClientName,
|
|
|
|
+ "myid": client.myid,
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ return client, nil
|
|
}
|
|
}
|
|
|
|
|
|
//
|
|
//
|
|
@@ -70,43 +140,17 @@ func (client *Client) Call(to, msgid string, event, sendtotype int, obj interfac
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (client *Client) GetMyclient() string {
|
|
|
|
- return client.myid
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
//底层事件处理
|
|
//底层事件处理
|
|
func (client *Client) baseEventHandle(msg *Packet) {
|
|
func (client *Client) baseEventHandle(msg *Packet) {
|
|
- printlog(".")
|
|
|
|
event := int(msg.Event)
|
|
event := int(msg.Event)
|
|
switch event {
|
|
switch event {
|
|
- case EVENT_RETURN_MACHINE_ID: //服务端分配id,发布我能处理的时间
|
|
|
|
- printlog("*", string(msg.GetBusinessData()))
|
|
|
|
- client.myid = string(msg.GetBusinessData())
|
|
|
|
- client.conn.MyId = client.myid
|
|
|
|
-
|
|
|
|
- if client.oldmyid != "" {
|
|
|
|
- client.WriteObj("", "", EVENT_REMOVE_CLIENT, SENDTO_TYPE_P2P, []byte(client.oldmyid))
|
|
|
|
- client.oldmyid = ""
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- bs := make([]byte, 0)
|
|
|
|
- for _, v := range client.canHandleEvent {
|
|
|
|
- bs = append(bs, Int2Byte(int32(v))...)
|
|
|
|
- }
|
|
|
|
- client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, bs)
|
|
|
|
- client.WriteObj("", "", EVENT_UPDATE_MYNAME, SENDTO_TYPE_P2P, []byte(client.myname))
|
|
|
|
- if client.OnConnectSuccess != nil { //调用成功回调
|
|
|
|
- go client.OnConnectSuccess()
|
|
|
|
- }
|
|
|
|
case EVENT_REQUEST_HEARTBEAT: //请求的心跳,回应心跳请求
|
|
case EVENT_REQUEST_HEARTBEAT: //请求的心跳,回应心跳请求
|
|
- client.WriteObj("", "", EVENT_RETURN_HEARTBEAT, SENDTO_TYPE_P2P, msg.GetBusinessData())
|
|
|
|
|
|
+ client.WriteObj("", "", EVENT_RETURN_HEARTBEAT, SENDTO_TYPE_P2P, msg.Raw)
|
|
client.lastCheckHeart = time.Now().Unix()
|
|
client.lastCheckHeart = time.Now().Unix()
|
|
- case EVENT_SYSTEM_COMMAND: //系统控制指令
|
|
|
|
- go processSysCommand(client, msg.GetBusinessData())
|
|
|
|
- case EVENT_RECIVE_CALLBACK: //回调处理
|
|
|
|
|
|
+ case EVENT_RETURN: //回调处理,一般用于同步操作
|
|
if v, ok := client.resultlock.Load(msg.Msgid); ok {
|
|
if v, ok := client.resultlock.Load(msg.Msgid); ok {
|
|
tmp := v.(chan []byte)
|
|
tmp := v.(chan []byte)
|
|
- tmp <- msg.GetBusinessData()
|
|
|
|
|
|
+ tmp <- msg.Raw
|
|
return
|
|
return
|
|
}
|
|
}
|
|
fallthrough
|
|
fallthrough
|
|
@@ -121,18 +165,18 @@ func (client *Client) read4socket() (*Packet, error) {
|
|
defer Catch()
|
|
defer Catch()
|
|
buffer := make([]byte, 4)
|
|
buffer := make([]byte, 4)
|
|
if _, err := io.ReadFull(client.conn, buffer); err != nil {
|
|
if _, err := io.ReadFull(client.conn, buffer); err != nil {
|
|
- printlog("?_readFullerr", err)
|
|
|
|
|
|
+ Log.Debug.Println("读取包头失败", err)
|
|
return nil, errors.New("read packet error")
|
|
return nil, errors.New("read packet error")
|
|
}
|
|
}
|
|
size := int32(binary.BigEndian.Uint32(buffer))
|
|
size := int32(binary.BigEndian.Uint32(buffer))
|
|
- if size < 32 || size > 16384000 {
|
|
|
|
- printlog("?_sizeerror[", size, "]")
|
|
|
|
|
|
+ if size < PACKET_MIN_LEN || size > PACKET_MAX_LEN {
|
|
|
|
+ Log.Debug.Println("数据包长度越界[", size, "]")
|
|
return nil, errors.New("read packet error")
|
|
return nil, errors.New("read packet error")
|
|
}
|
|
}
|
|
var buf []byte = make([]byte, size)
|
|
var buf []byte = make([]byte, size)
|
|
readlen, err := io.ReadFull(client.conn, buf)
|
|
readlen, err := io.ReadFull(client.conn, buf)
|
|
if err != nil || int32(readlen) != size {
|
|
if err != nil || int32(readlen) != size {
|
|
- printlog("?_readpacketerror", err)
|
|
|
|
|
|
+ Log.Debug.Println("读取包主体数据失败", err)
|
|
return nil, errors.New("read packet error")
|
|
return nil, errors.New("read packet error")
|
|
}
|
|
}
|
|
//
|
|
//
|
|
@@ -184,122 +228,3 @@ func (client *Client) writeDump(p <-chan RawData) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
-//启动客户端
|
|
|
|
-//@Deprecated 此方法已过期
|
|
|
|
-func StartClient(fn func(*Packet),
|
|
|
|
- addr string, myname string,
|
|
|
|
- handleevent []int) (*Client, error) {
|
|
|
|
- //
|
|
|
|
- client := &Client{}
|
|
|
|
- client.resultlock = sync.Map{} //make(map[string]chan []byte)
|
|
|
|
- client.businessEventHandler = fn
|
|
|
|
- client.canHandleEvent = handleevent
|
|
|
|
- client.myname = myname
|
|
|
|
- client.messageQueue = make(chan *Packet, 500) //接受消息
|
|
|
|
- client.writeQueue = make(chan RawData, 500) //写入并发
|
|
|
|
-
|
|
|
|
- go func(q <-chan *Packet, parseEvent func(*Packet)) {
|
|
|
|
- for {
|
|
|
|
- select {
|
|
|
|
- case msg := <-q:
|
|
|
|
- go parseEvent(msg)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }(client.messageQueue, client.baseEventHandle)
|
|
|
|
- go client.writeDump(client.writeQueue)
|
|
|
|
- //创建链接
|
|
|
|
- var err error
|
|
|
|
- for client.conn, err = Dial("tcp4", addr); err != nil; client.conn, err = Dial("tcp4", addr) {
|
|
|
|
- printlog("?")
|
|
|
|
- time.Sleep(5 * time.Second)
|
|
|
|
- }
|
|
|
|
- client.conn.SetMaxRetries(math.MaxInt32) //重试次数
|
|
|
|
- client.conn.SetRetryInterval(2 * time.Second) //重试间隔30秒
|
|
|
|
- client.conn.OnRequestConnect = func() {
|
|
|
|
- client.oldmyid = client.myid
|
|
|
|
- client.myid = ""
|
|
|
|
- }
|
|
|
|
- go client.readDump(client.messageQueue)
|
|
|
|
- return client, nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-//解析处理控制指令
|
|
|
|
-func processSysCommand(client *Client, data []byte) {
|
|
|
|
- tmp := map[string]interface{}{}
|
|
|
|
- err := json.Unmarshal(data, &tmp)
|
|
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- cmd := tmp["act"].(string)
|
|
|
|
- switch cmd {
|
|
|
|
- case "pass": //暂停服务
|
|
|
|
- t := int(tmp["passtime"].(float64))
|
|
|
|
- client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, []byte{})
|
|
|
|
- time.Sleep(time.Duration(t) * time.Second)
|
|
|
|
- bs := make([]byte, 0)
|
|
|
|
- for _, v := range client.canHandleEvent {
|
|
|
|
- bs = append(bs, Int2Byte(int32(v))...)
|
|
|
|
- }
|
|
|
|
- client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, bs)
|
|
|
|
- case "reconnect": //重新连接
|
|
|
|
-
|
|
|
|
- case "quit": //退出
|
|
|
|
- client.WriteObj("", "", EVENT_BYE, SENDTO_TYPE_P2P, []byte{})
|
|
|
|
- os.Exit(0)
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-//启动客户端
|
|
|
|
-func NewClient(conf *ClientConfig) (*Client, error) {
|
|
|
|
- if conf.MsgServerAddr == "" {
|
|
|
|
- return nil, errors.New("配置项缺失(MsgServerAddr)")
|
|
|
|
- } else if conf.EventHandler == nil {
|
|
|
|
- return nil, errors.New("配置项缺失(EventHandler)")
|
|
|
|
- }
|
|
|
|
- if conf.ReadBufferSize == 0 {
|
|
|
|
- conf.ReadBufferSize = 200
|
|
|
|
- }
|
|
|
|
- if conf.WriteBufferSize == 0 {
|
|
|
|
- conf.WriteBufferSize = 150
|
|
|
|
- }
|
|
|
|
- client := &Client{}
|
|
|
|
- client.resultlock = sync.Map{} // make(map[string]chan []byte)
|
|
|
|
- client.businessEventHandler = conf.EventHandler
|
|
|
|
- client.canHandleEvent = conf.CanHandleEvents
|
|
|
|
- client.myname = conf.ClientName
|
|
|
|
- client.messageQueue = make(chan *Packet, conf.ReadBufferSize) //接受消息
|
|
|
|
- client.writeQueue = make(chan RawData, conf.WriteBufferSize) //写入并发
|
|
|
|
- go func(q <-chan *Packet, parseEvent func(*Packet)) {
|
|
|
|
- for {
|
|
|
|
- select {
|
|
|
|
- case msg := <-q:
|
|
|
|
- go parseEvent(msg)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }(client.messageQueue, client.baseEventHandle)
|
|
|
|
- go client.writeDump(client.writeQueue)
|
|
|
|
- //创建链接
|
|
|
|
- var err error
|
|
|
|
- for client.conn, err = Dial("tcp4", conf.MsgServerAddr); err != nil; client.conn, err = Dial("tcp4", conf.MsgServerAddr) {
|
|
|
|
- printlog("?")
|
|
|
|
- time.Sleep(5 * time.Second)
|
|
|
|
- }
|
|
|
|
- client.conn.SetMaxRetries(math.MaxInt32) //重试次数
|
|
|
|
- client.conn.SetRetryInterval(2 * time.Second) //重试间隔30秒
|
|
|
|
- //事件绑定
|
|
|
|
- client.conn.OnRequestConnect = func() {
|
|
|
|
- client.oldmyid = client.myid
|
|
|
|
- client.myid = ""
|
|
|
|
- if conf.OnRequestConnect != nil {
|
|
|
|
- conf.OnRequestConnect()
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- client.conn.OnConnectSuccess = func() {
|
|
|
|
- if conf.OnConnectSuccess != nil {
|
|
|
|
- conf.OnConnectSuccess()
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- go client.readDump(client.messageQueue)
|
|
|
|
- return client, nil
|
|
|
|
-}
|
|
|