fengweiqiang 5 vuotta sitten
vanhempi
commit
3024fecb0d

+ 9 - 15
udp_ocr_conter/main.go

@@ -22,7 +22,10 @@ import (
 	"time"
 )
 
-var DeleteNode = time.NewTimer(time.Minute * 50)
+func init() {
+	cluster.DescribeInstances()
+	log.Println("初始化CID:",cluster.CID)
+}
 
 func main() {
 	config.Udpclient.Listen(processUdpMsg)
@@ -37,7 +40,7 @@ func main() {
 			}))
 		}
 	}
-	for _,v:= range config.CID{
+	for _,v:= range cluster.CID{
 		log.Println("通知udp来取数据2:",v,config.Sysconfig["broadcast_port"],config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
 			IP:   net.ParseIP(v),
 			Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
@@ -256,23 +259,17 @@ func reload(ip string) string {
 	if tmp == nil || len(*tmp) <= 0 {
 		return "ip不存在"
 	}
-	config.Sys.Lock()
-	if config.CID[qu.ObjToString((*tmp)["InstanceId"])] != "" {
-		delete(config.CID, qu.ObjToString((*tmp)["InstanceId"]))
-	}
-	config.Sys.Unlock()
-	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例
 	now := time.Now()
 	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
 	cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours))) //创建新实例
-	time.Sleep(time.Second * 30)
+	time.Sleep(time.Second * 20)
 	cluster.DescribeInstances() //查询多台实例的详细信息
 	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1)
 	if escObject != nil || len(*escObject) > 0 {
 		var tmpip string
 		if tmpip = qu.ObjToString((*escObject)[0]["ip_nw"]); tmpip == "" {
 			log.Println("没用获取到ip,实例ip异常", tmpip)
-			time.Sleep(time.Minute)
+			time.Sleep(time.Second * 20)
 			cluster.DescribeInstances() //查询多台实例的详细信息
 			escObject = mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1)
 			tmpip = qu.ObjToString((*escObject)[0]["ip_nw"])
@@ -286,11 +283,6 @@ func reload(ip string) string {
 			if isok2 && isok3 {
 				(*escObject)[0]["OcrTaskStatus"] = "successful"
 				mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[0]["_id"]}, (*escObject)[0], true, false)
-				config.Sys.Lock()
-				if config.CID[qu.ObjToString((*escObject)[0]["InstanceId"])] == "" && qu.ObjToString((*escObject)[0]["InstanceId"]) !=""{
-					config.CID[qu.ObjToString((*escObject)[0]["InstanceId"])] = tmpip
-				}
-				config.Sys.Unlock()
 				log.Println((*escObject)[0]["_id"], tmpip, "部署成功")
 				return tmpip + "部署成功," + tmpstr
 			} else {
@@ -300,5 +292,7 @@ func reload(ip string) string {
 			return tmpip + "部署失败"
 		}
 	}
+	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例
+	cluster.DescribeInstances() //查询多台实例的详细信息
 	return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例"
 }

+ 6 - 4
udp_ocr_conter/src/cluster/aliecs.go

@@ -21,7 +21,6 @@ import (
 	"qfw/util/mongodb"
 	"sort"
 	"strings"
-	"sync"
 	"time"
 )
 
@@ -100,7 +99,7 @@ func runInstances(kv map[string]interface{}, taskName, widthOut, computer string
 	}
 	//log.Println(res)
 }
-var sys sync.RWMutex
+var CID = map[string]string{}
 
 //查询多台实例的详细信息
 func DescribeInstances() {
@@ -109,6 +108,7 @@ func DescribeInstances() {
 		[]string{"InstanceChargeType", "PostPaid"},
 		[]string{"PageSize", "100"},
 	})
+	CID = make(map[string]string)
 	for _, ins := range res["Instances"].(map[string]interface{}) {
 		for _, val := range ins.([]interface{}) {
 			if tmp, ok := val.(map[string]interface{}); ok {
@@ -124,11 +124,13 @@ func DescribeInstances() {
 					}
 				}
 				if strings.Contains(qu.ObjToString(tmp["InstanceName"]), "ocr_task") {
-					log.Println("更新申请实例",tmp["InstanceId"],"内网ip",tmp["ip_nw"])
+					log.Println("更新申请实例", tmp["InstanceId"], "内网ip", tmp["ip_nw"])
+					config.Sys.Lock()
+					CID[qu.ObjToString(tmp["InstanceId"])] = qu.ObjToString(tmp["ip_nw"])
+					config.Sys.Unlock()
 					//更新实例信息
 					mongodb.Update("ocr_ecs", `{"InstanceId":"`+qu.ObjToString(tmp["InstanceId"])+`"}`, map[string]interface{}{"$set": tmp}, true, false)
 					mongodb.Update("ocr_ecs_bak", `{"InstanceId":"`+qu.ObjToString(tmp["InstanceId"])+`"}`, map[string]interface{}{"$set": tmp}, true, false)
-
 				}
 			}
 		}

+ 0 - 23
udp_ocr_conter/src/config/config.go

@@ -1,20 +1,17 @@
 package config
 
 import (
-	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"qfw/util"
 	"qfw/util/mongodb"
 	"strings"
 	"sync"
-	"time"
 )
 
 var Udpclient mu.UdpClient //udp对象
 var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
-var CID = map[string]string{}
 var Sys sync.RWMutex
 
 
@@ -33,25 +30,5 @@ func init() {
 	mongodb.InitMongodbPool(util.IntAllDef(Sysconfig["dbsize"], 5), MgoIP, MgoDB)
 
 	Udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
-	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1, "InstanceId": 1,"ip_nw":1}, false, -1, -1)
-	if ocrescs != nil || len(*ocrescs) > 0 {
-		for _, v := range (*ocrescs) {
-			if util.ObjToString(v["AutoReleaseTime"]) != "" {
-				utc, err := time.ParseInLocation("2006-01-02T15:04Z", util.ObjToString(v["AutoReleaseTime"]), time.Local)
-				if err != nil {
-					log.Println("解析时间异常", err)
-					continue
-				}
-				if utc.After(time.Now()) {
-					if util.ObjToString(v["InstanceId"]) == ""{
-						continue
-					}
-					CID[ util.ObjToString(v["InstanceId"])] = util.ObjToString(v["ip_nw"])
-					//log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
-				}
-			}
-		}
-	}
-	log.Println(CID)
 
 }

+ 19 - 30
udp_ocr_conter/src/corntask/task_corn.go

@@ -38,17 +38,16 @@ var Auto = func() {
 	//if taskNum <= 0 {
 	//计算释放,发送udp
 	ccnum := compute()
-	log.Println("ccnum:", ccnum, ", len(config.CID):", len(config.CID), config.CID)
+	log.Println("ccnum:", ccnum, ", len(config.CID):", len(cluster.CID), cluster.CID)
 	if ccnum <= 0 {
-		if len(config.CID) == 0 {
-			log.Println("当前实例为空,无需释放", config.CID, )
+		if len(cluster.CID) == 0 {
+			log.Println("当前实例为空,无需释放", cluster.CID, )
 			return
 		}
-		if ccnum == 0 && len(config.CID) > 0 {
-			log.Println("释放所有实例", config.CID)
-			for tmpIid, _ := range config.CID {
+		if ccnum == 0 && len(cluster.CID) > 0 {
+			log.Println("释放所有实例", cluster.CID)
+			for tmpIid, _ := range cluster.CID {
 				ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
-				mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
 				log.Println("5分钟后释放实例", tmpIid)
 				config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
 					IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
@@ -56,22 +55,19 @@ var Auto = func() {
 				})
 				go func(tmpIid string) {
 					time.Sleep(time.Minute * 5)
+					mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
 					cluster.DeleteInstance(tmpIid)
 					log.Println("5分钟后释放实例完成", tmpIid)
 				}(tmpIid)
 			}
-			config.Sys.Lock()
-			config.CID = make(map[string]string)
-			config.Sys.Unlock()
 		} else {
 			var tmpnum int
-			for k, _ := range config.CID {
+			for k, _ := range cluster.CID {
 				if ccnum >= tmpnum {
 					return
 				}
 				tmpIid := k
 				ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
-				mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
 				log.Println("5分钟后释放实例", tmpIid)
 				config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
 					IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
@@ -79,21 +75,19 @@ var Auto = func() {
 				})
 				go func(tmpIid string) {
 					time.Sleep(time.Minute * 5)
+					mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
 					cluster.DeleteInstance(tmpIid)
 					log.Println("5分钟后释放实例完成", tmpIid)
 				}(tmpIid)
 				tmpnum--
-				config.Sys.Lock()
-				delete(config.CID, k)
-				config.Sys.Unlock()
 			}
-
 		}
+		cluster.DescribeInstances()
 		return
 	}
 	//}
-	if len(config.CID) >= util.IntAll(config.Sysconfig["pernum"]) {
-		log.Println("实例申请上限,当前实例:", config.CID)
+	if len(cluster.CID) >= util.IntAll(config.Sysconfig["pernum"]) {
+		log.Println("实例申请上限,当前实例:", cluster.CID)
 		escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
 		log.Println("实例未部署数量", len(*escObject))
 		if escObject != nil || len(*escObject) > 0 {
@@ -111,11 +105,6 @@ var Auto = func() {
 							if isok2 && isok3 {
 								(*escObject)[i]["OcrTaskStatus"] = "successful"
 								mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
-								config.Sys.Lock()
-								if config.CID["InstanceId"] == "" {
-									config.CID["InstanceId"] = tmpip
-								}
-								config.Sys.Unlock()
 								log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
 							} else {
 								log.Println(tmpip, "部署异常,"+tmpstr)
@@ -133,7 +122,7 @@ var Auto = func() {
 	log.Println("申请实例")
 	now := time.Now()
 	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
-	cluster.RunInstances("ocr_task_arr", "8", "false", ccnum, int(math.Round(hours)))
+	cluster.RunInstances("ocr_task_arr", "8", "false", util.IntAll(config.Sysconfig["pernum"]) - len(cluster.CID), int(math.Round(hours)))
 	log.Println("实例申请成功")
 	DynamicTask() //动态任务
 	log.Println("申请实例结束")
@@ -148,7 +137,7 @@ func DoCMD(ip string) bool {
 	return cluster.RunSsh(ip)
 }
 func DynamicTask() {
-	time.Sleep(time.Second * 30)
+	time.Sleep(time.Second * 20)
 	cluster.DescribeInstances() //查询多台实例的详细信息
 	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
 	log.Println("实例未部署数量", len(*escObject))
@@ -170,8 +159,8 @@ func DynamicTask() {
 							(*escObject)[i]["OcrTaskStatus"] = "successful"
 							mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
 							config.Sys.Lock()
-							if config.CID[InstanceId] == "" && InstanceId != "" {
-								config.CID[InstanceId] = tmpip
+							if cluster.CID[InstanceId] == "" && InstanceId != "" {
+								cluster.CID[InstanceId] = tmpip
 							}
 							config.Sys.Unlock()
 							log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
@@ -227,12 +216,12 @@ func compute() int {
 			log.Println(496, sum, "<=", util.IntAll(config.Sysconfig["accumulated_task_lowlimit"]))
 			return 0
 		}
-		mtmm := float64(overNum) / float64(nowtime-stime) / float64(len(config.CID)+3) //每台每秒
+		mtmm := float64(overNum) / float64(nowtime-stime) / float64(len(cluster.CID)+3) //每台每秒
 		if mtmm <= 0 {
-			log.Println(501, overNum, int(nowtime-stime), (len(config.CID) + 3))
+			log.Println(501, overNum, int(nowtime-stime), (len(cluster.CID) + 3))
 			return 0
 		}
-		cc := float64(sum)/float64(util.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(len(config.CID)) - 3
+		cc := float64(sum)/float64(util.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(len(cluster.CID)) - 3
 		log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",mtms:", mtmm, ",sum:", sum, cc)
 		if cc > util.Float64All(config.Sysconfig["pernum"]) {
 			return util.IntAll(config.Sysconfig["pernum"])

+ 5 - 3
udp_ocr_conter/src/info/taskinfo.go

@@ -1,6 +1,7 @@
 package info
 
 import (
+	"cluster"
 	"config"
 	"fmt"
 	"gopkg.in/mgo.v2/bson"
@@ -56,12 +57,13 @@ func QueryInfo() map[string]interface{} {
 	since := time.Since(stime)
 	data["executionTime"] = since.String()
 	//log.Println(now.Sub(stime).Seconds())
-	mtmm := float64(overNum) / now.Sub(stime).Seconds() / float64(3+len(config.CID))
+	cluster.DescribeInstances()
+	mtmm := float64(overNum) / now.Sub(stime).Seconds() / float64(3+len(cluster.CID))
 	//log.Println(overNum ,now.Sub(stime).Seconds(),3+len(config.CID),mtmm)
-	tmpf := float64(nowSumNum) / mtmm / float64(3+len(config.CID))
+	tmpf := float64(nowSumNum) / mtmm / float64(3+len(cluster.CID))
 	//log.Println(nowSumNum,mtmm,tmpf)
 	data["estimatedFinishTime"] = now.Add(time.Second*time.Duration(tmpf)).String()
-	data["esc"] = fmt.Sprint(3 ,"+", len(config.CID))
+	data["esc"] = fmt.Sprint(3 ,"+", len(cluster.CID))
 	data["resultInfos"] = resultInfos
 	//log.Println(resultInfos)
 	return data