Răsfoiți Sursa

并发写入异常

fengweiqiang 5 ani în urmă
părinte
comite
350de5fad2

+ 11 - 4
udp_ocr_conter/main.go

@@ -41,12 +41,19 @@ func main() {
 			}))
 		}
 	}
-	for _, v := range cluster.CID {
-		log.Println("通知udp来取数据2:", v, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
-			IP:   net.ParseIP(v),
+	//for _, v := range cluster.CID {
+	//	log.Println("通知udp来取数据2:", v, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
+	//		IP:   net.ParseIP(v),
+	//		Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
+	//	}))
+	//}
+	cluster.CID.Range(func(key, value interface{}) bool {
+		log.Println("通知udp来取数据2:", value, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
+			IP:   net.ParseIP(qu.ObjToString(value)),
 			Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
 		}))
-	}
+		return true
+	})
 	mux := http.NewServeMux()
 	mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
 		http.Redirect(writer, request, "/login", http.StatusFound)

+ 8 - 5
udp_ocr_conter/src/cluster/aliecs.go

@@ -21,6 +21,7 @@ import (
 	"qfw/util/mongodb"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -99,7 +100,8 @@ func runInstances(kv map[string]interface{}, taskName, widthOut, computer string
 	}
 	//log.Println(res)
 }
-var CID = map[string]string{}
+//var CID = map[string]string{}
+var CID sync.Map
 
 //查询多台实例的详细信息
 func DescribeInstances() {
@@ -108,7 +110,7 @@ func DescribeInstances() {
 		[]string{"InstanceChargeType", "PostPaid"},
 		[]string{"PageSize", "100"},
 	})
-	CID = make(map[string]string)
+	//CID = make(map[string]string)
 	for _, ins := range res["Instances"].(map[string]interface{}) {
 		for _, val := range ins.([]interface{}) {
 			if tmp, ok := val.(map[string]interface{}); ok {
@@ -125,9 +127,10 @@ func DescribeInstances() {
 				}
 				if strings.Contains(qu.ObjToString(tmp["InstanceName"]), "ocr_task") {
 					log.Println("更新申请实例", tmp["InstanceId"], "内网ip", tmp["ip_nw"])
-					config.Sys.Lock()
-					CID[qu.ObjToString(tmp["InstanceId"])] = qu.ObjToString(tmp["ip_nw"])
-					config.Sys.Unlock()
+					//config.Sys.Lock()
+					//CID[qu.ObjToString(tmp["InstanceId"])] = qu.ObjToString(tmp["ip_nw"])
+					//config.Sys.Unlock()
+					CID.Store(qu.ObjToString(tmp["InstanceId"]),qu.ObjToString(tmp["ip_nw"]))
 					//更新实例信息
 					mongodb.Update("ocr_ecs", `{"InstanceId":"`+qu.ObjToString(tmp["InstanceId"])+`"}`, map[string]interface{}{"$set": tmp}, true, false)
 					mongodb.Update("ocr_ecs_bak", `{"InstanceId":"`+qu.ObjToString(tmp["InstanceId"])+`"}`, map[string]interface{}{"$set": tmp}, true, false)

+ 1 - 1
udp_ocr_conter/src/config/send_email.go

@@ -41,7 +41,7 @@ func check() bool {
 	var sumNum int = 0
 	for _, v := range *taskArr {
 		sumNum += mongodb.Count("bidding", bson.M{"_id": bson.M{
-			"$gte": bson.ObjectIdHex(util.ObjToString(v["start"])),
+			"$gte": bson.ObjectIdHex(util.ObjToString(v["gtid"])),
 			"$lte": bson.ObjectIdHex(util.ObjToString(v[util.ObjToString(EidField)])),
 		}})
 	}

+ 98 - 20
udp_ocr_conter/src/corntask/task_corn.go

@@ -21,6 +21,12 @@ var Auto = func() {
 	}
 	nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location())
 	cluster.DescribeInstances() //查询多台实例的详细信息
+	tnum:=0
+	cluster.CID.Range(func(key, value interface{}) bool {
+		tnum +=1
+		return true
+	})
+	log.Println("实例数量:",tnum)
 	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
 	if ocrescs != nil || len(*ocrescs) > 0 {
 		for _, v := range (*ocrescs) {
@@ -44,9 +50,12 @@ var Auto = func() {
 	//if taskNum <= 0 {
 	//计算释放,发送udp
 	ccnum := compute()
-	log.Println("ccnum:", ccnum, ", len(config.CID):", len(cluster.CID), cluster.CID)
+	log.Println("ccnum:", ccnum, ", len(config.CID):", tnum, cluster.CID)
+	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
+	log.Println("实例未部署数量", len(*escObject))
+	//释放实例
 	if ccnum <= 0 {
-		if len(cluster.CID) == 0 {
+		if tnum == 0 {
 			log.Println("当前实例为空,无需释放", cluster.CID, )
 			return
 		}
@@ -56,9 +65,9 @@ var Auto = func() {
 		xwa := time.Date(d.Year(), d.Month(), d.Day(), util.IntAll(config.Sysconfig["xwa"]), 0, 0, 0, d.Location())
 		xwb := time.Date(d.Year(), d.Month(), d.Day(), util.IntAll(config.Sysconfig["xwb"]), 0, 0, 0, d.Location())
 		if !((zsa.Before(d) && zsb.After(d)) || (xwa.Before(d) && xwb.After(d))){
-			if ccnum == 0 && len(cluster.CID) > 0 {
+			if ccnum == 0 && tnum > 0 {
 				log.Println("释放所有实例", cluster.CID)
-				for tmpIid, _ := range cluster.CID {
+				cluster.CID.Range(func(tmpIid, value interface{}) bool {
 					ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
 					log.Println("5分钟后释放实例", tmpIid)
 					go func(tmpIid string,ttt *map[string]interface{}) {
@@ -70,15 +79,49 @@ var Auto = func() {
 						mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
 						cluster.DeleteInstance(tmpIid)
 						log.Println("5分钟后释放实例完成", tmpIid)
-					}(tmpIid,ttt)
-				}
+					}(util.ObjToString(tmpIid),ttt)
+					return true
+				})
+				//for tmpIid, _ := range cluster.CID {
+				//	ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
+				//	log.Println("5分钟后释放实例", tmpIid)
+				//	go func(tmpIid string,ttt *map[string]interface{}) {
+				//		time.Sleep(time.Minute * 5)
+				//		config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+				//			IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
+				//			Port: util.IntAll(config.Sysconfig["broadcast_port"]),
+				//		})
+				//		mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+				//		cluster.DeleteInstance(tmpIid)
+				//		log.Println("5分钟后释放实例完成", tmpIid)
+				//	}(tmpIid,ttt)
+				//}
 			} else {
 				var tmpnum int
-				for k, _ := range cluster.CID {
+				//for k, _ := range cluster.CID {
+				//	if ccnum >= tmpnum {
+				//		return
+				//	}
+				//	tmpIid := k
+				//	ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
+				//	log.Println("5分钟后释放实例", tmpIid)
+				//	go func(tmpIid string,ttt *map[string]interface{}) {
+				//		time.Sleep(time.Minute * 5)
+				//		config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+				//			IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
+				//			Port: util.IntAll(config.Sysconfig["broadcast_port"]),
+				//		})
+				//		mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+				//		cluster.DeleteInstance(tmpIid)
+				//		log.Println("5分钟后释放实例完成", tmpIid)
+				//	}(tmpIid,ttt)
+				//	tmpnum--
+				//}
+				cluster.CID.Range(func(k, value interface{}) bool {
 					if ccnum >= tmpnum {
-						return
+						return false
 					}
-					tmpIid := k
+					tmpIid := util.ObjToString(k)
 					ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
 					log.Println("5分钟后释放实例", tmpIid)
 					go func(tmpIid string,ttt *map[string]interface{}) {
@@ -92,16 +135,46 @@ var Auto = func() {
 						log.Println("5分钟后释放实例完成", tmpIid)
 					}(tmpIid,ttt)
 					tmpnum--
-				}
+					return true
+				})
 			}
 		}
 		cluster.DescribeInstances()
+		escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
+		log.Println("实例未部署数量", len(*escObject))
+		if escObject != nil || len(*escObject) > 0 {
+			for i, v := range (*escObject) {
+				if tmpip := util.ObjToString(v["ip_nw"]); tmpip == "" {
+					log.Println("没用获取到ip,实例ip异常", tmpip)
+				} else {
+					go func(tmpip, InstanceId string, i int) {
+						if DoCMD(tmpip) {
+							var tmpstr string
+							isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
+							tmpstr += udpstr + ";&nbsp;&nbsp;"
+							isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
+							tmpstr += fil2textstr
+							if isok2 && isok3 {
+								(*escObject)[i]["OcrTaskStatus"] = "successful"
+								mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
+								log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
+							} else {
+								log.Println(tmpip, "部署异常,"+tmpstr)
+							}
+						} else {
+							log.Println(tmpip, "部署失败")
+						}
+					}(tmpip, util.ObjToString(v["InstanceId"]), i)
+				}
+			}
+		}
 		return
 	}
 	//}
-	if len(cluster.CID) >= util.IntAll(config.Sysconfig["pernum"]) {
+	//重新部署未完成实例
+	log.Println("tnum >= util.IntAll(config.Sysconfig[pernum])",tnum ,util.IntAll(config.Sysconfig["pernum"]))
+	if tnum >= util.IntAll(config.Sysconfig["pernum"]) {
 		log.Println("实例申请上限,当前实例:", cluster.CID)
-		escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
 		log.Println("实例未部署数量", len(*escObject))
 		if escObject != nil || len(*escObject) > 0 {
 			for i, v := range (*escObject) {
@@ -132,6 +205,7 @@ var Auto = func() {
 		return
 	}
 	//if taskNum > 1 {
+	//申请实例
 	log.Println("申请实例")
 	now := time.Now()
 	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
@@ -150,7 +224,7 @@ func DoCMD(ip string) bool {
 	return cluster.RunSsh(ip)
 }
 func DynamicTask() {
-	time.Sleep(time.Second * 25)
+	time.Sleep(time.Second * 35)
 	cluster.DescribeInstances() //查询多台实例的详细信息
 	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
 	log.Println("实例未部署数量", len(*escObject))
@@ -171,11 +245,14 @@ func DynamicTask() {
 						if isok2 && isok3 {
 							(*escObject)[i]["OcrTaskStatus"] = "successful"
 							mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
-							config.Sys.Lock()
-							if cluster.CID[InstanceId] == "" && InstanceId != "" {
-								cluster.CID[InstanceId] = tmpip
+							//config.Sys.Lock()
+							//if cluster.CID[InstanceId] == "" && InstanceId != "" {
+							//	cluster.CID[InstanceId] = tmpip
+							//}
+							if cv,ok :=cluster.CID.Load(InstanceId);ok && cv =="" &&InstanceId != ""{
+									cluster.CID.Store(InstanceId, tmpip)
 							}
-							config.Sys.Unlock()
+							//config.Sys.Unlock()
 							log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
 						} else {
 							log.Println(tmpip, "部署异常,"+tmpstr)
@@ -231,12 +308,13 @@ func compute() int {
 			log.Println(496, sum, "<=", util.IntAll(config.Sysconfig["accumulated_task_lowlimit"]))
 			return 0
 		}
-		mtmm := float64(overNum) / float64(nowtime-stime) / float64(len(cluster.CID)+util.IntAll(config.Sysconfig["gdts"])) //每台每秒
+		tnum :=mongodb.Count("ocr_ecs",bson.M{"OcrTaskStatus":"successful"})
+		mtmm := float64(overNum) / float64(nowtime-stime) / float64(tnum+util.IntAll(config.Sysconfig["gdts"])) //每台每秒
 		if mtmm <= 0 {
-			log.Println(501, overNum, int(nowtime-stime), (len(cluster.CID) + util.IntAll(config.Sysconfig["gdts"])))
+			log.Println(501, overNum, int(nowtime-stime), (tnum + util.IntAll(config.Sysconfig["gdts"])))
 			return 0
 		}
-		cc := float64(sum)/float64(util.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(len(cluster.CID)) - float64(util.IntAll(config.Sysconfig["gdts"]))
+		cc := float64(sum)/float64(util.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(tnum) - float64(util.IntAll(config.Sysconfig["gdts"]))
 		log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",mtms:", mtmm, ",sum:", sum, cc)
 		if cc > util.Float64All(config.Sysconfig["pernum"]) {
 			return util.IntAll(config.Sysconfig["pernum"])

+ 10 - 5
udp_ocr_conter/src/info/taskinfo.go

@@ -13,9 +13,9 @@ import (
 func QueryInfo() map[string]interface{} {
 	now := time.Now()
 	data := make(map[string]interface{})
-	config.Sys.Lock()
+	//config.Sys.Lock()
 	//data["isrun"] = config.IsRun
-	config.Sys.Unlock()
+	//config.Sys.Unlock()
 	taskArr := mongodb.Find("ocr_task", bson.M{"isrun":bson.M{
 		"$ne":"stop",
 	}}, `{_id:1}`, nil, false, -1, -1)
@@ -63,12 +63,17 @@ func QueryInfo() map[string]interface{} {
 	data["executionTime"] = since.String()
 	//log.Println(now.Sub(stime).Seconds())
 	cluster.DescribeInstances()
-	mtmm := float64(overNum) / now.Sub(stime).Seconds() / float64(3+len(cluster.CID))
+	cnum:=0
+	cluster.CID.Range(func(key, value interface{}) bool {
+		cnum+=1
+		return true
+	})
+	mtmm := float64(overNum) / now.Sub(stime).Seconds() / float64(3+cnum)
 	//log.Println(overNum ,now.Sub(stime).Seconds(),3+len(config.CID),mtmm)
-	tmpf := float64(nowSumNum) / mtmm / float64(3+len(cluster.CID))
+	tmpf := float64(nowSumNum) / mtmm / float64(3+cnum)
 	//log.Println(nowSumNum,mtmm,tmpf)
 	data["estimatedFinishTime"] = now.Add(time.Second*time.Duration(tmpf)).String()
-	data["esc"] = fmt.Sprint(3 ,"+", len(cluster.CID))
+	data["esc"] = fmt.Sprint(3 ,"+", cnum)
 	data["resultInfos"] = resultInfos
 	//log.Println(resultInfos)
 	return data