Bladeren bron

优化实例部署

fengweiqiang 5 jaren geleden
bovenliggende
commit
bb8f260b02
3 gewijzigde bestanden met toevoegingen van 142 en 76 verwijderingen
  1. 125 62
      udp_ocr_conter/main.go
  2. 13 11
      udp_ocr_conter/src/cluster/aliecs.go
  3. 4 3
      udp_ocr_conter/src/config/config.go

+ 125 - 62
udp_ocr_conter/main.go

@@ -169,75 +169,119 @@ func main() {
 	c.AddFunc(spec, func() {
 		d := time.Now()
 		nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location())
+		cluster.DescribeInstances() //查询多台实例的详细信息
+		ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
+		if ocrescs != nil || len(*ocrescs) > 0 {
+			for _, v := range (*ocrescs) {
+				if qu.ObjToString(v["AutoReleaseTime"]) != "" {
+					utc, err := time.ParseInLocation("2006-01-02T15:04Z", qu.ObjToString(v["AutoReleaseTime"]), time.Local)
+					if err != nil {
+						log.Println("解析时间异常", err)
+						continue
+					}
+					if utc.Before(nowday) {
+						log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
+					}
+				}
+			}
+		}
 		taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
 		taskNum := len(*taskArr)
 		log.Println("当前任务数量:", taskNum)
 		//if taskNum <= 0 {
 		//计算释放,发送udp
 		ccnum := compute()
-		log.Println("ccnum:",ccnum,", len(config.CID):", len(config.CID),config.CID)
+		log.Println("ccnum:", ccnum, ", len(config.CID):", len(config.CID), config.CID)
 		if ccnum <= 0 {
 			if len(config.CID) == 0 {
 				log.Println("当前实例为空,无需释放", config.CID, )
 				return
 			}
-			if ccnum == 0 && len(config.CID) >0{
-				for i,_ := range config.CID{
-					tmpIid := config.CID[i]
+			if ccnum == 0 && len(config.CID) > 0 {
+				log.Println("释放所有实例", config.CID)
+				for tmpIid, _ := range config.CID {
+					ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
+					mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+					log.Println("5分钟后释放实例", tmpIid)
+					udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+						IP:   net.ParseIP(qu.ObjToString((*ttt)["ip_nw"])),
+						Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
+					})
 					go func(tmpIid string) {
-						config.CID = config.CID[i+1:]
-						mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
-						log.Println("5分钟后释放实例", tmpIid)
-						udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
-							IP:   net.ParseIP(tmpIid),
-							Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
-						})
 						time.Sleep(time.Minute * 5)
 						cluster.DeleteInstance(tmpIid)
 						log.Println("5分钟后释放实例完成", tmpIid)
 					}(tmpIid)
 				}
-			}else {
-				for i:=0; i<ccnum*(-1); ccnum++ {
-					tmpIid := config.CID[i]
+				sys.Lock()
+				config.CID = make(map[string]bool)
+				sys.Unlock()
+			} else {
+				var tmpnum int
+				for k, _ := range config.CID {
+					if ccnum*(-1) >= tmpnum {
+						return
+					}
+					tmpIid := k
+					ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
+					mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+					log.Println("5分钟后释放实例", tmpIid)
+					udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+						IP:   net.ParseIP(qu.ObjToString((*ttt)["ip_nw"])),
+						Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
+					})
 					go func(tmpIid string) {
-						config.CID = config.CID[i+1:]
-						mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
-						log.Println("5分钟后释放实例", tmpIid)
-						udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
-							IP:   net.ParseIP(tmpIid),
-							Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
-						})
 						time.Sleep(time.Minute * 5)
 						cluster.DeleteInstance(tmpIid)
 						log.Println("5分钟后释放实例完成", tmpIid)
 					}(tmpIid)
+					sys.Lock()
+					delete(config.CID, k)
+					sys.Unlock()
 				}
+
 			}
 			return
 		}
 		//}
 		if len(config.CID) >= qu.IntAll(config.Sysconfig["pernum"]) {
 			log.Println("实例申请上限,当前实例:", config.CID)
-			return
-		}
-		//if taskNum > 1 {
-		cluster.DescribeInstances() //查询多台实例的详细信息
-		ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
-		if ocrescs != nil || len(*ocrescs) > 0 {
-			for _, v := range (*ocrescs) {
-				if qu.ObjToString(v["AutoReleaseTime"]) != "" {
-					utc, err := time.ParseInLocation("2006-01-02T15:04Z", qu.ObjToString(v["AutoReleaseTime"]), time.Local)
-					if err != nil {
-						log.Println("解析时间异常", err)
-						continue
-					}
-					if utc.Before(nowday) {
-						log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
+			escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
+			log.Println("实例未部署数量", len(*escObject))
+			if escObject != nil || len(*escObject) > 0 {
+				for i, v := range (*escObject) {
+					if tmpip := qu.ObjToString(v["ip_nw"]); tmpip == "" {
+						log.Println("没用获取到ip,实例ip异常", tmpip)
+					} else {
+						go func(tmpip, InstanceId string, i int) {
+							if DoCMD(tmpip) {
+								var tmpstr string
+								isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
+								tmpstr += udpstr + ";&nbsp;&nbsp;"
+								isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
+								tmpstr += fil2textstr
+								if isok2 && isok3 {
+									(*escObject)[i]["OcrTaskStatus"] = "successful"
+									mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
+									sys.Lock()
+									if config.CID["InstanceId"] == false {
+										config.CID["InstanceId"] = true
+									}
+									sys.Unlock()
+									log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
+								} else {
+									log.Println(tmpip, "部署异常,"+tmpstr)
+								}
+							} else {
+								log.Println(tmpip, "部署失败")
+							}
+						}(tmpip, qu.ObjToString(v["InstanceId"]), i)
 					}
 				}
 			}
+			return
 		}
+		//if taskNum > 1 {
 		log.Println("申请实例")
 		now := time.Now()
 		hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
@@ -316,7 +360,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
 			mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
 			tmp["end_time"] = time.Now().Unix()
-			mongodb.Update("ocr_task_bak", bson.M{"_id": ObjectId.(bson.ObjectId)},map[string]interface{}{"$set": tmp}, true, false)
+			mongodb.Update("ocr_task_bak", bson.M{"_id": ObjectId.(bson.ObjectId)}, map[string]interface{}{"$set": tmp}, true, false)
 			sys.Unlock()
 			return
 		}
@@ -375,24 +419,31 @@ func DynamicTask() {
 			if tmpip := qu.ObjToString(v["ip_nw"]); tmpip == "" {
 				log.Println("没用获取到ip,实例ip异常", tmpip)
 				DynamicTask()
+				return
 			} else {
-				if DoCMD(tmpip) {
-					var tmpstr string
-					isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
-					tmpstr += udpstr + ";&nbsp;&nbsp;"
-					isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
-					tmpstr += fil2textstr
-					if isok2 && isok3 {
-						(*escObject)[i]["OcrTaskStatus"] = "successful"
-						mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
-						config.CID = append(config.CID, qu.ObjToString(v["InstanceId"]))
-						log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
+				go func(tmpip,InstanceId string,i int) {
+					if DoCMD(tmpip) {
+						var tmpstr string
+						isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
+						tmpstr += udpstr + ";&nbsp;&nbsp;"
+						isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
+						tmpstr += fil2textstr
+						if isok2 && isok3 {
+							(*escObject)[i]["OcrTaskStatus"] = "successful"
+							mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
+							sys.Lock()
+							if config.CID[InstanceId] == false {
+								config.CID[InstanceId] = true
+							}
+							sys.Unlock()
+							log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
+						} else {
+							log.Println(tmpip, "部署异常,"+tmpstr)
+						}
 					} else {
-						log.Println(tmpip, "部署异常,"+tmpstr)
+						log.Println(tmpip, "部署失败")
 					}
-				} else {
-					log.Println(tmpip, "部署失败")
-				}
+				}(tmpip,qu.ObjToString(v["InstanceId"]),i)
 			}
 		}
 	} else {
@@ -413,12 +464,11 @@ func reload(ip string) string {
 	if tmp == nil || len(*tmp) <= 0 {
 		return "ip不存在"
 	}
-	for i, v := range config.CID {
-		if v == qu.ObjToString((*tmp)["InstanceId"]) {
-			config.CID = append(config.CID[:i], config.CID[i+1:]...)
-			break
-		}
+	sys.Lock()
+	if config.CID[qu.ObjToString((*tmp)["InstanceId"])] == true{
+		delete(config.CID,qu.ObjToString((*tmp)["InstanceId"]))
 	}
+	sys.Unlock()
 	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例
 	now := time.Now()
 	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
@@ -444,7 +494,11 @@ func reload(ip string) string {
 			if isok2 && isok3 {
 				(*escObject)[0]["OcrTaskStatus"] = "successful"
 				mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[0]["_id"]}, (*escObject)[0], true, false)
-				config.CID = append(config.CID, qu.ObjToString((*escObject)[0]["InstanceId"]))
+				sys.Lock()
+				if config.CID[qu.ObjToString((*escObject)[0]["InstanceId"])] == false {
+					config.CID[qu.ObjToString((*escObject)[0]["InstanceId"])] = true
+				}
+				sys.Unlock()
 				log.Println((*escObject)[0]["_id"], tmpip, "部署成功")
 				return tmpip + "部署成功," + tmpstr
 			} else {
@@ -461,6 +515,7 @@ func compute() int {
 	nowtime := time.Now().Unix()
 	taskArrase := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
 	if taskArrase == nil || len(*taskArrase) == 0 {
+		log.Println(464, "nil ro len(*taskArrase) == 0")
 		return 0
 	}
 	stmp := (*taskArrase)[0]
@@ -468,6 +523,7 @@ func compute() int {
 	if stmp != nil && etmp != nil {
 		stime := qu.Int64All(stmp["import_time"])
 		if nowtime-stime <= qu.Int64All(config.Sysconfig["time_consuming_limit"]) {
+			log.Println(472, nowtime-stime, "<=", qu.Int64All(config.Sysconfig["time_consuming_limit"]))
 			return 0
 		}
 
@@ -478,6 +534,7 @@ func compute() int {
 			"$lt":  eid,
 		}})
 		if sum <= qu.IntAll(config.Sysconfig["accumulated_task_limit"]) {
+			log.Println(483, sum, sid, eid)
 			return 0
 		}
 		gteid := bson.ObjectIdHex(qu.ObjToString(stmp[qu.ObjToString(config.SidField)]))
@@ -486,18 +543,24 @@ func compute() int {
 			"$lt":  gteid,
 		}})
 		if overNum == 0 {
+			log.Println(492, overNum, sid, gteid)
 			return 0
 		}
 		if sum <= qu.IntAll(config.Sysconfig["accumulated_task_lowlimit"]) {
+			log.Println(496, sum, "<=", qu.IntAll(config.Sysconfig["accumulated_task_lowlimit"]))
+			return 0
+		}
+		mtmm := float64(overNum) / float64(nowtime-stime) / float64(len(config.CID)+3) //每台每秒
+		if mtmm <= 0 {
+			log.Println(501, overNum, int(nowtime-stime), (len(config.CID) + 3))
 			return 0
 		}
-		mtmm := overNum / int(nowtime-stime) / (len(config.CID) + 3) //每台每秒
-		cc := sum/qu.IntAll(config.Sysconfig["corntime_consuming"])/mtmm - len(config.CID) - 3
+		cc := float64(sum)/float64(qu.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(len(config.CID)) - 3
 		log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",mtms:", mtmm, ",sum:", sum, cc)
-		if cc > qu.IntAll(config.Sysconfig["pernum"]) {
+		if cc > qu.Float64All(config.Sysconfig["pernum"]) {
 			return qu.IntAll(config.Sysconfig["pernum"])
 		} else {
-			return cc
+			return int(cc)
 		}
 	}
 	return 0

+ 13 - 11
udp_ocr_conter/src/cluster/aliecs.go

@@ -21,6 +21,7 @@ import (
 	"qfw/util/mongodb"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -99,6 +100,7 @@ func runInstances(kv map[string]interface{}, taskName, widthOut, computer string
 	}
 	//log.Println(res)
 }
+var sys sync.RWMutex
 
 //查询多台实例的详细信息
 func DescribeInstances() {
@@ -110,27 +112,27 @@ func DescribeInstances() {
 	for _, ins := range res["Instances"].(map[string]interface{}) {
 		for _, val := range ins.([]interface{}) {
 			if tmp, ok := val.(map[string]interface{}); ok {
-				if qu.ObjToString(tmp["InstanceName"]) == UseFor {
-					if t, ok := tmp["VpcAttributes"].(map[string]interface{}); ok {
-						if tt, ok := t["PrivateIpAddress"].(map[string]interface{}); ok {
-							ttt := tt["IpAddress"].([]interface{})
-							tmp["ip_nw"] = ttt[0]
-						}
+				if t, ok := tmp["VpcAttributes"].(map[string]interface{}); ok {
+					if tt, ok := t["PrivateIpAddress"].(map[string]interface{}); ok {
+						ttt := tt["IpAddress"].([]interface{})
+						tmp["ip_nw"] = ttt[0]
 					}
-					if t, ok := tmp["PublicIpAddress"].(map[string]interface{}); ok {
-						if tt, ok := t["IpAddress"].([]interface{}); ok && len(tt) > 0 {
-							tmp["ip_ww"] = tt[0]
-						}
+				}
+				if t, ok := tmp["PublicIpAddress"].(map[string]interface{}); ok {
+					if tt, ok := t["IpAddress"].([]interface{}); ok && len(tt) > 0 {
+						tmp["ip_ww"] = tt[0]
 					}
+				}
+				if strings.Contains(qu.ObjToString(tmp["InstanceName"]), "ocr_task") {
 					log.Println("更新申请实例",tmp["InstanceId"],"内网ip",tmp["ip_nw"])
 					//更新实例信息
 					mongodb.Update("ocr_ecs", `{"InstanceId":"`+qu.ObjToString(tmp["InstanceId"])+`"}`, map[string]interface{}{"$set": tmp}, true, false)
 					mongodb.Update("ocr_ecs_bak", `{"InstanceId":"`+qu.ObjToString(tmp["InstanceId"])+`"}`, map[string]interface{}{"$set": tmp}, true, false)
+
 				}
 			}
 		}
 	}
-	//log.Println(res)
 }
 
 //停止实例

+ 4 - 3
udp_ocr_conter/src/config/config.go

@@ -11,7 +11,8 @@ import (
 
 var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
-var CID []string
+var CID = map[string]bool{}
+
 func init() {
 	util.ReadConfig(&Sysconfig)
 	MgoIP = util.ObjToString(Sysconfig["mongodb_ip"])
@@ -26,7 +27,7 @@ func init() {
 	}
 	mongodb.InitMongodbPool(util.IntAllDef(Sysconfig["dbsize"], 5), MgoIP, MgoDB)
 
-	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1,"InstanceId":1}, false, -1, -1)
+	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1, "InstanceId": 1}, false, -1, -1)
 	if ocrescs != nil || len(*ocrescs) > 0 {
 		for _, v := range (*ocrescs) {
 			if util.ObjToString(v["AutoReleaseTime"]) != "" {
@@ -36,7 +37,7 @@ func init() {
 					continue
 				}
 				if utc.After(time.Now()) {
-					CID = append(CID,util.ObjToString(v["InstanceId"]))
+					CID[ util.ObjToString(v["InstanceId"])] = true
 					//log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
 				}
 			}