ソースを参照

mfw消息服务新增接收下载消息;优化消息分配逻辑

maxiaoshan 1 年間 前
コミット
1d14ac1994
2 ファイル変更38 行追加4 行削除
  1. 2 1
      src/mfw/util/event.go
  2. 36 3
      src/服务端/main.go

+ 2 - 1
src/mfw/util/event.go

@@ -38,5 +38,6 @@ const (
 	SENDTO_TYPE_ALL_RECIVER  = 2 //发送给所有指定服务接收者
 	SENDTO_TYPE_P2P          = 3 //发送给指定客户端
 
-	SENDTO_TYPE_P2P_BYNAME = 4 //发送给指定客户,通过指定客户的名称
+	SENDTO_TYPE_P2P_BYNAME  = 4 //发送给指定客户,通过指定客户的名称
+	SENDTO_TYPE_IDLE_SERVER = 5 //发送给消息中心,记录空闲的服务端(数据下载)
 )

+ 36 - 3
src/服务端/main.go

@@ -31,6 +31,12 @@ var allclient map[string]*Client = make(map[string]*Client)
 //服务与提供者对应表
 var allservice map[int][]string = make(map[int][]string)
 
+//记录空闲下载器
+var (
+	allIdleServiceLock = &sync.Mutex{}
+	allIdleServiceMap  = make(map[string]int)
+)
+
 //心跳检测规则重新定义::
 //每次读写都会刷新心跳时间,
 //长时间未发生读写时发送心跳查询,
@@ -168,7 +174,7 @@ func processmsg(msg *util.Packet) {
 	//直接更新发送人心跳
 	updateheartbeat(from)
 	switch event {
-	//TODO 只写需要特殊处理的时间,其他都走default
+	//TODO 只写需要特殊处理的事件,其他都走default
 	case util.EVENT_RETURN_HEARTBEAT: //心跳回应包处理
 		fmt.Print(".")
 	case util.EVENT_PUBLISH_MYSERVICES: //客户端发布了自己的服务
@@ -201,6 +207,13 @@ func processmsg(msg *util.Packet) {
 				v1.conn.Write(util.Enpacket("", msg.From, msg.Msgid, util.EVENT_RECIVE_CALLBACK, util.SENDTO_TYPE_P2P, v))
 			}
 		}
+	case util.SENDTO_TYPE_IDLE_SERVER:
+		param := map[string]int{}
+		if json.Unmarshal(msg.GetBusinessData(), &param) == nil {
+			allIdleServiceLock.Lock()
+			allIdleServiceMap[msg.From] = param[msg.From]
+			allIdleServiceLock.Unlock()
+		}
 	default: //处理业务事件
 		//识别发送类型
 		sttype := int(msg.SentToType)
@@ -236,7 +249,24 @@ func processmsg(msg *util.Packet) {
 					if len(v) < 1 {
 						break
 					}
-					service_machine_id := v[rand.Intn(len(v))]
+					var service_machine_id string
+					//先用空闲下载器
+					allIdleServiceLock.Lock()
+					for service_machine_id_tmp, num := range allIdleServiceMap {
+						service_machine_id = service_machine_id_tmp
+						if tmpNum := num - 1; tmpNum <= 0 {
+							delete(allIdleServiceMap, service_machine_id_tmp)
+						} else {
+							allIdleServiceMap[service_machine_id_tmp] = tmpNum
+						}
+						break
+					}
+					allIdleServiceLock.Unlock()
+					//随机取某个下载器
+					if service_machine_id == "" {
+						service_machine_id = v[rand.Intn(len(v))]
+						fmt.Println("default vps---", service_machine_id)
+					}
 					lock.Lock()
 					client, ok := allclient[service_machine_id]
 					lock.Unlock()
@@ -246,6 +276,9 @@ func processmsg(msg *util.Packet) {
 							updateheartbeat(service_machine_id)
 							break
 						} else {
+							allIdleServiceLock.Lock()
+							delete(allIdleServiceMap, service_machine_id) //删除不能通信的下载器
+							allIdleServiceLock.Unlock()
 							removeClient(service_machine_id)
 						}
 					}
@@ -277,7 +310,7 @@ var gcinterval int64
 
 //
 func main() {
-	flag.StringVar(&port, "p", "7070", "开放端口")
+	flag.StringVar(&port, "p", "801", "开放端口")
 	flag.Int64Var(&gcinterval, "g", 10, "GC间隔时间")
 	flag.Parse()
 	//心跳检测