Browse Source

算法修改

fengweiqiang 5 years ago
parent
commit
0e941054b7
3 changed files with 73 additions and 39 deletions
  1. 2 0
      udp_ocr_conter/config.json
  2. 68 36
      udp_ocr_conter/main.go
  3. 3 3
      udp_ocr_conter/src/config/config.go

+ 2 - 0
udp_ocr_conter/config.json

@@ -17,6 +17,8 @@
   "cornstr": "0 0/5 * * * ?",
   "corntime_consuming": 300,
   "pernum": 5,
+  "time_consuming_limit": 60,
+  "accumulated_task_lowlimit": 1000,
   "esconfig": {
     "available": true,
     "AccessID": "LTAIkuomMLAjIlGH",

+ 68 - 36
udp_ocr_conter/main.go

@@ -174,29 +174,51 @@ func main() {
 		log.Println("当前任务数量:", taskNum)
 		//if taskNum <= 0 {
 		//计算释放,发送udp
-		if !compute() {
-			if len(config.Cnum) <= 0 {
-				log.Println("当前实例为空,无需释放",config.Cnum,)
+		ccnum := compute()
+		log.Println("ccnum:",ccnum,", len(config.CID):", len(config.CID),config.CID)
+		if ccnum <= 0 {
+			if len(config.CID) == 0 {
+				log.Println("当前实例为空,无需释放", config.CID, )
 				return
 			}
-			tmpIid := config.Cnum[0]
-			go func(tmpIid string) {
-				DeleteNode.Reset(time.Minute * 50)
-				<-DeleteNode.C
-				config.Cnum = config.Cnum[1:]
-				mongodb.Del("ocr_ecs", bson.M{"ip_nw": tmpIid})
-				udpclient.WriteUdp([]byte("一小时后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
-					IP:   net.ParseIP(tmpIid),
-					Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
-				})
-			}(tmpIid)
-			cluster.ModifyInstanceAutoReleaseTime(tmpIid, 1)
-			log.Println("一小时后释放实例", tmpIid)
+			if ccnum == 0 && len(config.CID) >0{
+				for i,_ := range config.CID{
+					tmpIid := config.CID[i]
+					go func(tmpIid string) {
+						config.CID = config.CID[i+1:]
+						mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+						log.Println("5分钟后释放实例", tmpIid)
+						udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+							IP:   net.ParseIP(tmpIid),
+							Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
+						})
+						time.Sleep(time.Minute * 5)
+						cluster.DeleteInstance(tmpIid)
+						log.Println("5分钟后释放实例完成", tmpIid)
+					}(tmpIid)
+				}
+			}else {
+				for i:=0; i<ccnum*(-1); ccnum++ {
+					tmpIid := config.CID[i]
+					go func(tmpIid string) {
+						config.CID = config.CID[i+1:]
+						mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+						log.Println("5分钟后释放实例", tmpIid)
+						udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+							IP:   net.ParseIP(tmpIid),
+							Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
+						})
+						time.Sleep(time.Minute * 5)
+						cluster.DeleteInstance(tmpIid)
+						log.Println("5分钟后释放实例完成", tmpIid)
+					}(tmpIid)
+				}
+			}
 			return
 		}
 		//}
-		if len(config.Cnum) >= qu.IntAll(config.Sysconfig["pernum"]) {
-			log.Println("实例申请上限,当前实例:", config.Cnum)
+		if len(config.CID) >= qu.IntAll(config.Sysconfig["pernum"]) {
+			log.Println("实例申请上限,当前实例:", config.CID)
 			return
 		}
 		//if taskNum > 1 {
@@ -219,7 +241,7 @@ func main() {
 		log.Println("申请实例")
 		now := time.Now()
 		hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
-		cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours)))
+		cluster.RunInstances("ocr_task_arr", "8", "false", ccnum, int(math.Round(hours)))
 		log.Println("实例申请成功")
 		DynamicTask() //动态任务
 		log.Println("申请实例结束")
@@ -293,6 +315,8 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			})
 			log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
 			mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
+			tmp["end_time"] = time.Now().Unix()
+			mongodb.Update("ocr_task_bak", bson.M{"_id": ObjectId.(bson.ObjectId)},map[string]interface{}{"$set": tmp}, true, false)
 			sys.Unlock()
 			return
 		}
@@ -361,7 +385,7 @@ func DynamicTask() {
 					if isok2 && isok3 {
 						(*escObject)[i]["OcrTaskStatus"] = "successful"
 						mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
-						config.Cnum = append(config.Cnum, tmpip)
+						config.CID = append(config.CID, qu.ObjToString(v["InstanceId"]))
 						log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
 					} else {
 						log.Println(tmpip, "部署异常,"+tmpstr)
@@ -389,9 +413,9 @@ func reload(ip string) string {
 	if tmp == nil || len(*tmp) <= 0 {
 		return "ip不存在"
 	}
-	for i, v := range config.Cnum {
-		if v == ip {
-			config.Cnum = append(config.Cnum[:i], config.Cnum[i+1:]...)
+	for i, v := range config.CID {
+		if v == qu.ObjToString((*tmp)["InstanceId"]) {
+			config.CID = append(config.CID[:i], config.CID[i+1:]...)
 			break
 		}
 	}
@@ -420,7 +444,7 @@ func reload(ip string) string {
 			if isok2 && isok3 {
 				(*escObject)[0]["OcrTaskStatus"] = "successful"
 				mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[0]["_id"]}, (*escObject)[0], true, false)
-				config.Cnum = append(config.Cnum, tmpip)
+				config.CID = append(config.CID, qu.ObjToString((*escObject)[0]["InstanceId"]))
 				log.Println((*escObject)[0]["_id"], tmpip, "部署成功")
 				return tmpip + "部署成功," + tmpstr
 			} else {
@@ -433,18 +457,18 @@ func reload(ip string) string {
 	return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例"
 }
 
-func compute() bool {
+func compute() int {
 	nowtime := time.Now().Unix()
 	taskArrase := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
-	if taskArrase == nil || len(*taskArrase) == 0{
-		return false
+	if taskArrase == nil || len(*taskArrase) == 0 {
+		return 0
 	}
 	stmp := (*taskArrase)[0]
 	etmp := (*taskArrase)[len(*taskArrase)-1]
 	if stmp != nil && etmp != nil {
 		stime := qu.Int64All(stmp["import_time"])
-		if nowtime-stime <= 0 {
-			return false
+		if nowtime-stime <= qu.Int64All(config.Sysconfig["time_consuming_limit"]) {
+			return 0
 		}
 
 		sid := bson.ObjectIdHex(qu.ObjToString(stmp["start"]))
@@ -453,20 +477,28 @@ func compute() bool {
 			"$gte": sid,
 			"$lt":  eid,
 		}})
-
+		if sum <= qu.IntAll(config.Sysconfig["accumulated_task_limit"]) {
+			return 0
+		}
 		gteid := bson.ObjectIdHex(qu.ObjToString(stmp[qu.ObjToString(config.SidField)]))
 		overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
 			"$gte": sid,
 			"$lt":  gteid,
 		}})
-		log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",sum:", sum)
 		if overNum == 0 {
-			return false
+			return 0
 		}
-		if overNum/int(nowtime-stime)*qu.IntAll(config.Sysconfig["corntime_consuming"]) >= sum {
-			return false
+		if sum <= qu.IntAll(config.Sysconfig["accumulated_task_lowlimit"]) {
+			return 0
+		}
+		mtmm := overNum / int(nowtime-stime) / (len(config.CID) + 3) //每台每秒
+		cc := sum/qu.IntAll(config.Sysconfig["corntime_consuming"])/mtmm - len(config.CID) - 3
+		log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",mtms:", mtmm, ",sum:", sum, cc)
+		if cc > qu.IntAll(config.Sysconfig["pernum"]) {
+			return qu.IntAll(config.Sysconfig["pernum"])
+		} else {
+			return cc
 		}
-		return true
 	}
-	return false
+	return 0
 }

+ 3 - 3
udp_ocr_conter/src/config/config.go

@@ -11,7 +11,7 @@ import (
 
 var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
-var Cnum []string
+var CID []string
 func init() {
 	util.ReadConfig(&Sysconfig)
 	MgoIP = util.ObjToString(Sysconfig["mongodb_ip"])
@@ -26,7 +26,7 @@ func init() {
 	}
 	mongodb.InitMongodbPool(util.IntAllDef(Sysconfig["dbsize"], 5), MgoIP, MgoDB)
 
-	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1,"ip_nw":1}, false, -1, -1)
+	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1,"InstanceId":1}, false, -1, -1)
 	if ocrescs != nil || len(*ocrescs) > 0 {
 		for _, v := range (*ocrescs) {
 			if util.ObjToString(v["AutoReleaseTime"]) != "" {
@@ -36,7 +36,7 @@ func init() {
 					continue
 				}
 				if utc.After(time.Now()) {
-					Cnum = append(Cnum, util.ObjToString(v["ip_nw"]))
+					CID = append(CID,util.ObjToString(v["InstanceId"]))
 					//log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
 				}
 			}