fengweiqiang 5 éve
szülő
commit
3365fa3181

+ 102 - 40
udp_ocr_conter/main.go

@@ -22,12 +22,11 @@ import (
 )
 
 var udpclient mu.UdpClient //udp对象
-var auto bool
 
 var sys sync.RWMutex
+var DeleteNode = time.NewTimer(time.Minute * 50)
 
 func main() {
-	auto = false
 	udpclient = mu.UdpClient{Local: config.Sysconfig["udpip"].(string) + ":" + config.Sysconfig["udpport"].(string), BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Printf("Udp listening port: %s:%s\n", config.Sysconfig["udpip"], config.Sysconfig["udpport"])
@@ -142,13 +141,13 @@ func main() {
 			}
 			var tmpstr string
 			for _, v := range data {
-				if !regexp.MustCompile("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}").MatchString(v){
-					tmpstr+= v +"ip格式不正确</br>"
+				if !regexp.MustCompile("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}").MatchString(v) {
+					tmpstr += v + "ip格式不正确</br>"
 					continue
 				}
 				result := reload(v)
 				tmpstr += v + "---->" + result + "</br>"
-				log.Println( v, "重新部署完成", )
+				log.Println(v, "重新部署完成", )
 			}
 			tpReload.Execute(writer, template.HTML(
 				"</br><span style=\"color: red\">"+tmpstr+" 重新部署结束</span></br>"+
@@ -168,47 +167,66 @@ func main() {
 	c := cron.New()
 	spec := qu.ObjToString(config.Sysconfig["cornstr"])
 	c.AddFunc(spec, func() {
-		log.Println(168, auto)
-		if auto {
-			return
-		}
 		d := time.Now()
 		nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location())
 		taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
 		taskNum := len(*taskArr)
 		log.Println("当前任务数量:", taskNum)
-		if taskNum <= 1 {
+		if taskNum <= 0 {
 			return
 		}
-		if taskNum > 1 {
-			cluster.DescribeInstances() //查询多台实例的详细信息
-			ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
-			if ocrescs != nil || len(*ocrescs) > 0 {
-				for _, v := range (*ocrescs) {
-					if qu.ObjToString(v["AutoReleaseTime"]) != "" {
-						utc, err := time.ParseInLocation("2006-01-02T15:04Z", qu.ObjToString(v["AutoReleaseTime"]),time.Local)
-						if err != nil {
-							log.Println("解析时间异常", err)
-							continue
-						}
-						if utc.Before(nowday) {
-							log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
-						}
+		//计算释放,发送udp
+		if !compute() {
+			if len(config.Cnum) <= 0 {
+				return
+			}
+			tmpIid := config.Cnum[0]
+			go func() {
+				DeleteNode.Reset(time.Minute * 50)
+				<-DeleteNode.C
+				config.Cnum = config.Cnum[1:]
+			}()
+			cluster.ModifyInstanceAutoReleaseTime(tmpIid, 1)
+			log.Println("一小时后释放实例", tmpIid)
+			go func() {
+				udpclient.WriteUdp([]byte("一小时后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+					IP:   net.ParseIP(tmpIid),
+					Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
+				})
+			}()
+			return
+		}
+		if len(config.Cnum) >= qu.IntAll(config.Sysconfig["pernum"]) {
+			log.Println("实例申请上限,当前实例:", config.Cnum)
+			return
+		}
+		//if taskNum > 1 {
+		cluster.DescribeInstances() //查询多台实例的详细信息
+		ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
+		if ocrescs != nil || len(*ocrescs) > 0 {
+			for _, v := range (*ocrescs) {
+				if qu.ObjToString(v["AutoReleaseTime"]) != "" {
+					utc, err := time.ParseInLocation("2006-01-02T15:04Z", qu.ObjToString(v["AutoReleaseTime"]), time.Local)
+					if err != nil {
+						log.Println("解析时间异常", err)
+						continue
+					}
+					if utc.Before(nowday) {
+						log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
 					}
 				}
 			}
-			log.Println("申请实例")
-			now := time.Now()
-			hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
-			cluster.RunInstances("ocr_task_arr", "8", "false", qu.IntAll(config.Sysconfig["pernum"]), int(math.Round(hours)))
-			log.Println("实例申请成功")
-			DynamicTask(&auto) //动态任务
-			log.Println("申请实例结束")
 		}
+		log.Println("申请实例")
+		now := time.Now()
+		hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
+		cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours)))
+		log.Println("实例申请成功")
+		DynamicTask() //动态任务
+		log.Println("申请实例结束")
+		//}
 	})
-	c.AddFunc("0 0 0 1/1 * ? ", func() {
-		auto = false
-	})
+
 	c.Start()
 
 	log.Println("Http  listening port: ", qu.ObjToString(config.Sysconfig["http_port"]))
@@ -229,8 +247,10 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			return
 		}
 		tmp["start"] = tmp[qu.ObjToString(config.SidField)]
+		tmp["import_time"] = time.Now().Unix()
 		bytes, _ := json.Marshal(tmp)
 		b := mongodb.Save("ocr_task", string(bytes))
+		mongodb.Save("ocr_task_bak", string(bytes))
 		log.Println("保存id:", b)
 	case mu.OP_NOOP: //其他节点回应消息打印
 		log.Println("节点接收成功", string(data), ra.String())
@@ -281,7 +301,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		//log.Println(newId.(bson.ObjectId).Hex())
 		tmp[config.SidField] = newId.(bson.ObjectId).Hex()
 		mongodb.Update("ocr_task",
-		bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
+			bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
 		sys.Unlock()
 	}
 }
@@ -322,8 +342,8 @@ func queryOcrTask() map[string]int {
 	return data
 }
 
-func DynamicTask(auto *bool) {
-	time.Sleep(time.Second*30)
+func DynamicTask() {
+	time.Sleep(time.Second * 30)
 	cluster.DescribeInstances() //查询多台实例的详细信息
 	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
 	log.Println("实例未部署数量", len(*escObject))
@@ -331,7 +351,7 @@ func DynamicTask(auto *bool) {
 		for i, v := range (*escObject) {
 			if tmpip := qu.ObjToString(v["ip_nw"]); tmpip == "" {
 				log.Println("没用获取到ip,实例ip异常", tmpip)
-				DynamicTask(auto)
+				DynamicTask()
 			} else {
 				if DoCMD(tmpip) {
 					var tmpstr string
@@ -342,7 +362,7 @@ func DynamicTask(auto *bool) {
 					if isok2 && isok3 {
 						(*escObject)[i]["OcrTaskStatus"] = "successful"
 						mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
-						(*auto) = true
+						config.Cnum = append(config.Cnum, tmpip)
 						log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
 					} else {
 						log.Println(tmpip, "部署异常,"+tmpstr)
@@ -370,11 +390,17 @@ func reload(ip string) string {
 	if tmp == nil || len(*tmp) <= 0 {
 		return "ip不存在"
 	}
+	for i, v := range config.Cnum {
+		if v == ip {
+			config.Cnum = append(config.Cnum[:i], config.Cnum[i+1:]...)
+			break
+		}
+	}
 	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例
 	now := time.Now()
 	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
 	cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours))) //创建新实例
-	time.Sleep(time.Second*30)
+	time.Sleep(time.Second * 30)
 	cluster.DescribeInstances() //查询多台实例的详细信息
 	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1)
 	if escObject != nil || len(*escObject) > 0 {
@@ -395,6 +421,7 @@ func reload(ip string) string {
 			if isok2 && isok3 {
 				(*escObject)[0]["OcrTaskStatus"] = "successful"
 				mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[0]["_id"]}, (*escObject)[0], true, false)
+				config.Cnum = append(config.Cnum, tmpip)
 				log.Println((*escObject)[0]["_id"], tmpip, "部署成功")
 				return tmpip + "部署成功," + tmpstr
 			} else {
@@ -406,3 +433,38 @@ func reload(ip string) string {
 	}
 	return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例"
 }
+
+func compute() bool {
+	nowtime := time.Now().Unix()
+	taskArrase := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
+	stmp := (*taskArrase)[0]
+	etmp := (*taskArrase)[len(*taskArrase)-1]
+	if stmp != nil && etmp != nil {
+		stime := qu.Int64All(stmp["import_time"])
+		if nowtime-stime <= 0 {
+			return false
+		}
+
+		sid := bson.ObjectIdHex(qu.ObjToString(stmp["start"]))
+		eid := bson.ObjectIdHex(qu.ObjToString(etmp[qu.ObjToString(config.EidField)]))
+		sum := mongodb.Count("bidding", bson.M{"_id": bson.M{
+			"$gte": sid,
+			"$lt":  eid,
+		}})
+
+		gteid := bson.ObjectIdHex(qu.ObjToString(stmp[qu.ObjToString(config.SidField)]))
+		overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
+			"$gte": sid,
+			"$lt":  gteid,
+		}})
+		log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",sum:", sum)
+		if overNum == 0{
+			return false
+		}
+		if overNum/int(nowtime-stime)*300 >= sum {
+			return false
+		}
+		return true
+	}
+	return false
+}

+ 7 - 0
udp_ocr_conter/src/cluster/aliecs.go

@@ -88,6 +88,12 @@ func runInstances(kv map[string]interface{}, taskName, widthOut, computer string
 					"UseFor":        UseFor,
 					"OcrTaskStatus": "none",
 				})
+				mongodb.Save("ocr_ecs_bak", map[string]interface{}{
+					"InstanceId":    v,
+					"TaskName":      taskName,
+					"UseFor":        UseFor,
+					"OcrTaskStatus": "none",
+				})
 			}
 		}
 	}
@@ -119,6 +125,7 @@ func DescribeInstances() {
 					log.Println("更新申请实例",tmp["InstanceId"],"内网ip",tmp["ip_nw"])
 					//更新实例信息
 					mongodb.Update("ocr_ecs", `{"InstanceId":"`+qu.ObjToString(tmp["InstanceId"])+`"}`, map[string]interface{}{"$set": tmp}, true, false)
+					mongodb.Update("ocr_ecs_bak", `{"InstanceId":"`+qu.ObjToString(tmp["InstanceId"])+`"}`, map[string]interface{}{"$set": tmp}, true, false)
 				}
 			}
 		}

+ 2 - 2
udp_ocr_conter/src/cluster/ssh.go

@@ -97,8 +97,8 @@ cd /home/open-ocr/docker-compose/
 export OPEN_OCR_INSTANCE=open-ocr-2
 export RABBITMQ_HOST=` + ip + `
 docker-compose up -d
-docker pull registry-vpc.cn-beijing.aliyuncs.com/file2text/gocv:v2-binbash
-docker run --name f2text -itd -v /home/:/home/ --net host registry-vpc.cn-beijing.aliyuncs.com/file2text/gocv:v2-binbash /bin/bash
+docker pull registry-vpc.cn-beijing.aliyuncs.com/file2text/gocv:v2-binbash2
+docker run --name f2text -itd -v /home/:/home/ --net host registry-vpc.cn-beijing.aliyuncs.com/file2text/gocv:v2-binbash2 /bin/bash
 docker exec  f2text /etc/profile.d/auto.sh
 
 cd /home/udp/

+ 20 - 0
udp_ocr_conter/src/config/config.go

@@ -1,14 +1,17 @@
 package config
 
 import (
+	"gopkg.in/mgo.v2/bson"
 	"log"
 	"qfw/util"
 	"qfw/util/mongodb"
 	"strings"
+	"time"
 )
 
 var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
+var Cnum []string
 func init() {
 	util.ReadConfig(&Sysconfig)
 	MgoIP = util.ObjToString(Sysconfig["mongodb_ip"])
@@ -22,4 +25,21 @@ func init() {
 		return
 	}
 	mongodb.InitMongodbPool(util.IntAllDef(Sysconfig["dbsize"], 5), MgoIP, MgoDB)
+
+	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1,"ip_nw":1}, false, -1, -1)
+	if ocrescs != nil || len(*ocrescs) > 0 {
+		for _, v := range (*ocrescs) {
+			if util.ObjToString(v["AutoReleaseTime"]) != "" {
+				utc, err := time.ParseInLocation("2006-01-02T15:04Z", util.ObjToString(v["AutoReleaseTime"]), time.Local)
+				if err != nil {
+					log.Println("解析时间异常", err)
+					continue
+				}
+				if utc.After(time.Now()) {
+					Cnum = append(Cnum, util.ObjToString(v["ip_nw"]))
+					//log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
+				}
+			}
+		}
+	}
 }

+ 14 - 3
udpfileocr/main.go

@@ -68,7 +68,6 @@ task:
 
 //  "file2text": "192.168.3.207:1234",
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	defer qu.Catch()
 	switch act {
 	case mu.OP_TYPE_DATA:
 		atomic.AddInt32(&tmpNUM, 1)
@@ -91,6 +90,10 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 		if qu.ObjToString(mapInfo["permission"]) != "ocr_task" {
 			log.Println("数据异常 :", string(data), ra.String())
+			if qu.ObjToString(mapInfo["permission"]) == "stop"{
+				log.Println(mapInfo)
+				panic("释放实例")
+			}
 			time.Sleep(time.Second * 30)
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 			return
@@ -105,7 +108,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		log.Println("获取数据成功:", mapInfo, ra.String())
 		data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1})
 		if len(*data) == 0 {
-			if qu.ObjToString(mapInfo["is_start"]) == "true"{
+			if qu.ObjToString(mapInfo["is_start"]) == "true" {
 				return
 			}
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
@@ -145,7 +148,15 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 	case mu.OP_NOOP: //下个节点回应
 		log.Println("接收成功", string(data))
-
+	case mu.OP_DELETE_DOWNLOADERCODES:
+		log.Println(string(data))
+		go func() {
+			time.Sleep(time.Minute * 50)
+			udpclient.WriteUdp([]byte(`{"permission":"stop"}`), mu.OP_TYPE_DATA, &net.UDPAddr{
+				IP:   net.ParseIP(Sysconfig["udpip"].(string)),
+				Port: qu.IntAll(Sysconfig["udpport"].(string)),
+			})
+		}()
 	}
 
 }