|
@@ -2,6 +2,7 @@ package main
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
+ "flag"
|
|
|
"log"
|
|
|
"math/rand"
|
|
|
"net"
|
|
@@ -29,8 +30,8 @@ var allservice map[int][]string = make(map[int][]string)
|
|
|
func GC() {
|
|
|
now := time.Now().Unix()
|
|
|
for k, v := range allclient {
|
|
|
- if now-v.timestamp > 12 {
|
|
|
- //超过40秒未回应心跳
|
|
|
+ if now-v.timestamp > gcinterval*3 {
|
|
|
+ //3次GC未回应心跳
|
|
|
v.conn.Close()
|
|
|
removeClient(k)
|
|
|
continue
|
|
@@ -43,7 +44,7 @@ func GC() {
|
|
|
removeClient(k)
|
|
|
}
|
|
|
}
|
|
|
- time.AfterFunc(4*time.Second, GC)
|
|
|
+ time.AfterFunc(time.Duration(gcinterval)*time.Second, GC)
|
|
|
}
|
|
|
|
|
|
//删除服务节点
|
|
@@ -81,44 +82,64 @@ func processmsg(msg []byte) {
|
|
|
allservice[service] = append(allservice[service], my_id)
|
|
|
}
|
|
|
log.Println("所有服务", allservice)
|
|
|
- case util.EVENT_BROADCAST_REQUEST_SPIDER_STATE: //要个爬虫状态
|
|
|
- data := util.Enpacket(msg)
|
|
|
- for service_machine_id, v := range allclient { //发所有,不支持的不处理
|
|
|
- if service_machine_id == my_id { //广播不用发给自己
|
|
|
- continue
|
|
|
- }
|
|
|
- v.conn.Write(data)
|
|
|
+ default: //处理业务事件
|
|
|
+ //识别发送类型
|
|
|
+ var sttype int
|
|
|
+ if sendtotype, ok := tmp["sendtotype"]; ok {
|
|
|
+ sttype = int(sendtotype.(float64))
|
|
|
+ } else {
|
|
|
+ sttype = util.SENDTO_TYPE_RAND_RECIVER
|
|
|
}
|
|
|
- case util.SERVICE_RECIVE_SPIDER_STATE: //爬虫的回应,发给所有监控端
|
|
|
- bs := util.Enpacket(msg)
|
|
|
- if v, ok := allservice[event]; ok {
|
|
|
- for _, service_machine_id := range v {
|
|
|
+ bs := util.Enpacket(msg) //待发送数据
|
|
|
+ if sttype == util.SENDTO_TYPE_ALL { //发送给所有节点
|
|
|
+ for service_machine_id, v := range allclient { //发所有,不支持的不处理
|
|
|
if service_machine_id == my_id { //广播不用发给自己
|
|
|
continue
|
|
|
}
|
|
|
- allclient[service_machine_id].conn.Write(bs)
|
|
|
+ v.conn.Write(bs)
|
|
|
}
|
|
|
}
|
|
|
- default: //处理业务事件
|
|
|
if v, ok := allservice[event]; ok {
|
|
|
- //随机选择一个节点提供服务,允许出错尝试3次
|
|
|
- bs := util.Enpacket(msg)
|
|
|
- for i := 0; i < 3; i++ {
|
|
|
- service_machine_id := v[rand.Intn(len(v))]
|
|
|
- _, err := allclient[service_machine_id].conn.Write(bs)
|
|
|
- if err == nil {
|
|
|
- break
|
|
|
- } else {
|
|
|
- removeClient(service_machine_id)
|
|
|
+ switch sttype {
|
|
|
+ case util.SENDTO_TYPE_RAND_RECIVER: //随机选择一个节点提供服务,允许出错尝试3次
|
|
|
+ for i := 0; i < 3; i++ {
|
|
|
+ service_machine_id := v[rand.Intn(len(v))]
|
|
|
+ _, err := allclient[service_machine_id].conn.Write(bs)
|
|
|
+ if err == nil {
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ removeClient(service_machine_id)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ case util.SENDTO_TYPE_ALL_RECIVER:
|
|
|
+ if v, ok := allservice[event]; ok {
|
|
|
+ for _, service_machine_id := range v {
|
|
|
+ if service_machine_id == my_id { //广播不用发给自己
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ allclient[service_machine_id].conn.Write(bs)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ case util.SENDTO_TYPE_P2P:
|
|
|
+ to := tmp["to"].(string)
|
|
|
+ if v2, ok2 := allclient[to]; ok2 {
|
|
|
+ v2.conn.Write(bs)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+var port string
|
|
|
+var gcinterval int64
|
|
|
+
|
|
|
//
|
|
|
func main() {
|
|
|
+ flag.StringVar(&port, "p", "6060", "开放端口")
|
|
|
+ flag.Int64Var(&gcinterval, "g", 20, "GC间隔时间")
|
|
|
+ flag.Parse()
|
|
|
//心跳检测
|
|
|
go GC()
|
|
|
//启动服务
|
|
@@ -129,6 +150,6 @@ func main() {
|
|
|
uuid := util.UUID(32)
|
|
|
c.Write(util.EnpacketObj(map[string]interface{}{"event": util.EVENT_RETURN_MACHINE_ID, "data": uuid}))
|
|
|
allclient[uuid] = &Client{conn: c, timestamp: time.Now().Unix()}
|
|
|
- }, ":6060")
|
|
|
+ }, ":"+port)
|
|
|
|
|
|
}
|