|
@@ -1,155 +0,0 @@
|
|
|
-package main
|
|
|
-
|
|
|
-import (
|
|
|
- "encoding/json"
|
|
|
- "flag"
|
|
|
- "log"
|
|
|
- "math/rand"
|
|
|
- "net"
|
|
|
- "sync"
|
|
|
- "time"
|
|
|
- "util"
|
|
|
-)
|
|
|
-
|
|
|
-type Client struct {
|
|
|
- conn net.Conn
|
|
|
- timestamp int64
|
|
|
-}
|
|
|
-
|
|
|
-var lock sync.Mutex
|
|
|
-
|
|
|
-//所有的请求
|
|
|
-var allclient map[string]*Client = make(map[string]*Client)
|
|
|
-
|
|
|
-//服务与提供者对应表
|
|
|
-var allservice map[int][]string = make(map[int][]string)
|
|
|
-
|
|
|
-//
|
|
|
-
|
|
|
-//心跳检测,每隔20秒检测一次
|
|
|
-func GC() {
|
|
|
- now := time.Now().Unix()
|
|
|
- for k, v := range allclient {
|
|
|
- if now-v.timestamp > gcinterval*3 {
|
|
|
- //3次GC未回应心跳
|
|
|
- v.conn.Close()
|
|
|
- removeClient(k)
|
|
|
- continue
|
|
|
- }
|
|
|
- _, err := v.conn.Write(util.EnpacketObj(map[string]interface{}{
|
|
|
- "event": util.EVENT_REQUEST_HEARTBEAT,
|
|
|
- }))
|
|
|
- if err != nil { //发心跳包出错
|
|
|
- v.conn.Close()
|
|
|
- removeClient(k)
|
|
|
- }
|
|
|
- }
|
|
|
- time.AfterFunc(time.Duration(gcinterval)*time.Second, GC)
|
|
|
-}
|
|
|
-
|
|
|
-//删除服务节点
|
|
|
-func removeClient(myid string) {
|
|
|
- lock.Lock()
|
|
|
- delete(allclient, myid)
|
|
|
- for k, v := range allservice {
|
|
|
- for j, smid := range v {
|
|
|
- if smid == myid {
|
|
|
- allservice[k] = append(v[:j], v[j+1:]...)
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- lock.Unlock()
|
|
|
- log.Println("删除节点", myid, allservice)
|
|
|
-}
|
|
|
-
|
|
|
-//处理客户端发过来的消息
|
|
|
-func processmsg(msg []byte) {
|
|
|
- tmp := map[string]interface{}{}
|
|
|
- json.Unmarshal(msg, &tmp)
|
|
|
- my_id := tmp["myid"].(string)
|
|
|
- if v, ok := tmp["event"]; ok {
|
|
|
- event := int(v.(float64))
|
|
|
- switch event {
|
|
|
- //TODO 只写需要特殊处理的时间,其他都走default
|
|
|
- case util.EVENT_RETURN_HEARTBEAT: //心跳回应包处理
|
|
|
- allclient[my_id].timestamp = int64(tmp["data"].(float64))
|
|
|
- log.Println("更新", my_id, "的心跳时间")
|
|
|
- case util.EVENT_PUBLISH_MYSERVICES: //客户端发布了自己的服务
|
|
|
- services := tmp["data"].([]interface{}) //一个客户端提供多个可处理的服务
|
|
|
- for _, v := range services {
|
|
|
- service := int(v.(float64))
|
|
|
- allservice[service] = append(allservice[service], my_id)
|
|
|
- }
|
|
|
- log.Println("所有服务", allservice)
|
|
|
- default: //处理业务事件
|
|
|
- //识别发送类型
|
|
|
- var sttype int
|
|
|
- if sendtotype, ok := tmp["sendtotype"]; ok {
|
|
|
- sttype = int(sendtotype.(float64))
|
|
|
- } else {
|
|
|
- sttype = util.SENDTO_TYPE_RAND_RECIVER
|
|
|
- }
|
|
|
- bs := util.Enpacket(msg) //待发送数据
|
|
|
- if sttype == util.SENDTO_TYPE_ALL { //发送给所有节点
|
|
|
- for service_machine_id, v := range allclient { //发所有,不支持的不处理
|
|
|
- if service_machine_id == my_id { //广播不用发给自己
|
|
|
- continue
|
|
|
- }
|
|
|
- v.conn.Write(bs)
|
|
|
- }
|
|
|
- }
|
|
|
- if v, ok := allservice[event]; ok {
|
|
|
- switch sttype {
|
|
|
- case util.SENDTO_TYPE_RAND_RECIVER: //随机选择一个节点提供服务,允许出错尝试3次
|
|
|
- for i := 0; i < 3; i++ {
|
|
|
- service_machine_id := v[rand.Intn(len(v))]
|
|
|
- _, err := allclient[service_machine_id].conn.Write(bs)
|
|
|
- if err == nil {
|
|
|
- break
|
|
|
- } else {
|
|
|
- removeClient(service_machine_id)
|
|
|
- }
|
|
|
- }
|
|
|
- case util.SENDTO_TYPE_ALL_RECIVER:
|
|
|
- if v, ok := allservice[event]; ok {
|
|
|
- for _, service_machine_id := range v {
|
|
|
- if service_machine_id == my_id { //广播不用发给自己
|
|
|
- continue
|
|
|
- }
|
|
|
- allclient[service_machine_id].conn.Write(bs)
|
|
|
- }
|
|
|
- }
|
|
|
- case util.SENDTO_TYPE_P2P:
|
|
|
- to := tmp["to"].(string)
|
|
|
- if v2, ok2 := allclient[to]; ok2 {
|
|
|
- v2.conn.Write(bs)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-var port string
|
|
|
-var gcinterval int64
|
|
|
-
|
|
|
-//
|
|
|
-func main() {
|
|
|
- flag.StringVar(&port, "p", "6060", "开放端口")
|
|
|
- flag.Int64Var(&gcinterval, "g", 20, "GC间隔时间")
|
|
|
- flag.Parse()
|
|
|
- //心跳检测
|
|
|
- go GC()
|
|
|
- //启动服务
|
|
|
- util.StartServer(func(data []byte) {
|
|
|
- //接受消息处理
|
|
|
- processmsg(data)
|
|
|
- }, func(c net.Conn) { //连接后返回UUID
|
|
|
- uuid := util.UUID(32)
|
|
|
- c.Write(util.EnpacketObj(map[string]interface{}{"event": util.EVENT_RETURN_MACHINE_ID, "data": uuid}))
|
|
|
- allclient[uuid] = &Client{conn: c, timestamp: time.Now().Unix()}
|
|
|
- }, ":"+port)
|
|
|
-
|
|
|
-}
|