Browse Source

分发中心监控加上
1:显示当前任务已执行时间,预计完成时间
2:当前识别节点数量(固定服务器+竞价服务器)
3:积累任务列表信息

fengweiqiang 5 years ago
parent
commit
4bb0bba5b0

+ 28 - 297
udp_ocr_conter/main.go

@@ -3,11 +3,13 @@ package main
 import (
 	"cluster"
 	"config"
+	"corntask"
 	"encoding/json"
 	"fmt"
 	"github.com/cron"
 	"gopkg.in/mgo.v2/bson"
 	"html/template"
+	"info"
 	"log"
 	"math"
 	mu "mfw/util"
@@ -17,24 +19,19 @@ import (
 	"qfw/util/mongodb"
 	"regexp"
 	"strings"
-	"sync"
 	"time"
 )
 
-var udpclient mu.UdpClient //udp对象
-
-var sys sync.RWMutex
 var DeleteNode = time.NewTimer(time.Minute * 50)
 
 func main() {
-	udpclient = mu.UdpClient{Local: config.Sysconfig["udpip"].(string) + ":" + config.Sysconfig["udpport"].(string), BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
+	config.Udpclient.Listen(processUdpMsg)
 	log.Printf("Udp listening port: %s:%s\n", config.Sysconfig["udpip"], config.Sysconfig["udpport"])
 	if config.Sysconfig["broadcast"].(bool) { //重启的话通知分布式节点
 		ips := qu.ObjToString(config.Sysconfig["broadcast_ips"])
 		ipsArr := strings.Split(ips, ";")
 		for _, v := range ipsArr {
-			udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
+			config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
 				IP:   net.ParseIP(v),
 				Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
 			})
@@ -104,7 +101,7 @@ func main() {
 			writer.Write([]byte( "页面找不到了~"))
 			return
 		}
-		task := queryOcrTask()
+		task := info.QueryInfo()
 		tp.Execute(writer, task)
 	})
 	mux.HandleFunc("/reload", func(writer http.ResponseWriter, request *http.Request) {
@@ -166,132 +163,7 @@ func main() {
 	})
 	c := cron.New()
 	spec := qu.ObjToString(config.Sysconfig["cornstr"])
-	c.AddFunc(spec, func() {
-		d := time.Now()
-		nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location())
-		cluster.DescribeInstances() //查询多台实例的详细信息
-		ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
-		if ocrescs != nil || len(*ocrescs) > 0 {
-			for _, v := range (*ocrescs) {
-				if qu.ObjToString(v["AutoReleaseTime"]) != "" {
-					utc, err := time.ParseInLocation("2006-01-02T15:04Z", qu.ObjToString(v["AutoReleaseTime"]), time.Local)
-					if err != nil {
-						log.Println("解析时间异常", err)
-						continue
-					}
-					if utc.Before(nowday) {
-						log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
-					}
-				}
-			}
-		}
-		taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
-		taskNum := len(*taskArr)
-		log.Println("当前任务数量:", taskNum)
-		//if taskNum <= 0 {
-		//计算释放,发送udp
-		ccnum := compute()
-		log.Println("ccnum:", ccnum, ", len(config.CID):", len(config.CID), config.CID)
-		if ccnum <= 0 {
-			if len(config.CID) == 0 {
-				log.Println("当前实例为空,无需释放", config.CID, )
-				return
-			}
-			if ccnum == 0 && len(config.CID) > 0 {
-				log.Println("释放所有实例", config.CID)
-				for tmpIid, _ := range config.CID {
-					ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
-					mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
-					log.Println("5分钟后释放实例", tmpIid)
-					udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
-						IP:   net.ParseIP(qu.ObjToString((*ttt)["ip_nw"])),
-						Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
-					})
-					go func(tmpIid string) {
-						time.Sleep(time.Minute * 5)
-						cluster.DeleteInstance(tmpIid)
-						log.Println("5分钟后释放实例完成", tmpIid)
-					}(tmpIid)
-				}
-				sys.Lock()
-				config.CID = make(map[string]bool)
-				sys.Unlock()
-			} else {
-				var tmpnum int
-				for k, _ := range config.CID {
-					if ccnum*(-1) >= tmpnum {
-						return
-					}
-					tmpIid := k
-					ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
-					mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
-					log.Println("5分钟后释放实例", tmpIid)
-					udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
-						IP:   net.ParseIP(qu.ObjToString((*ttt)["ip_nw"])),
-						Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
-					})
-					go func(tmpIid string) {
-						time.Sleep(time.Minute * 5)
-						cluster.DeleteInstance(tmpIid)
-						log.Println("5分钟后释放实例完成", tmpIid)
-					}(tmpIid)
-					sys.Lock()
-					delete(config.CID, k)
-					sys.Unlock()
-				}
-
-			}
-			return
-		}
-		//}
-		if len(config.CID) >= qu.IntAll(config.Sysconfig["pernum"]) {
-			log.Println("实例申请上限,当前实例:", config.CID)
-			escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
-			log.Println("实例未部署数量", len(*escObject))
-			if escObject != nil || len(*escObject) > 0 {
-				for i, v := range (*escObject) {
-					if tmpip := qu.ObjToString(v["ip_nw"]); tmpip == "" {
-						log.Println("没用获取到ip,实例ip异常", tmpip)
-					} else {
-						go func(tmpip, InstanceId string, i int) {
-							if DoCMD(tmpip) {
-								var tmpstr string
-								isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
-								tmpstr += udpstr + ";&nbsp;&nbsp;"
-								isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
-								tmpstr += fil2textstr
-								if isok2 && isok3 {
-									(*escObject)[i]["OcrTaskStatus"] = "successful"
-									mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
-									sys.Lock()
-									if config.CID["InstanceId"] == false {
-										config.CID["InstanceId"] = true
-									}
-									sys.Unlock()
-									log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
-								} else {
-									log.Println(tmpip, "部署异常,"+tmpstr)
-								}
-							} else {
-								log.Println(tmpip, "部署失败")
-							}
-						}(tmpip, qu.ObjToString(v["InstanceId"]), i)
-					}
-				}
-			}
-			return
-		}
-		//if taskNum > 1 {
-		log.Println("申请实例")
-		now := time.Now()
-		hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
-		cluster.RunInstances("ocr_task_arr", "8", "false", ccnum, int(math.Round(hours)))
-		log.Println("实例申请成功")
-		DynamicTask() //动态任务
-		log.Println("申请实例结束")
-		//}
-	})
-
+	c.AddFunc(spec, corntask.Auto)
 	c.Start()
 
 	log.Println("Http  listening port: ", qu.ObjToString(config.Sysconfig["http_port"]))
@@ -304,11 +176,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	defer qu.Catch()
 	switch act {
 	case mu.OP_TYPE_DATA: //保存服务
-		go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
+		go config.Udpclient.WriteUdp([]byte("数据接收成功:"+string(data)), mu.OP_NOOP, ra)
 		tmp := make(map[string]interface{})
 		err := json.Unmarshal(data, &tmp)
 		if err != nil {
-			go udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
+			go config.Udpclient.WriteUdp([]byte("数据接收成功:"+string(data)+",data解析失败,"+err.Error()), mu.OP_NOOP, ra)
 			return
 		}
 		tmp["start"] = tmp[qu.ObjToString(config.SidField)]
@@ -322,14 +194,14 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	case mu.OP_GET_DOWNLOADERCODE: //分发任务
 		if `{"permission":"get_ocr_task"}` != string(data) {
 			log.Println("没有权限:", string(data), ra)
-			go udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
+			go config.Udpclient.WriteUdp([]byte("没有权限"), mu.OP_NOOP, ra)
 			return
 		}
-		sys.Lock()
+		config.Sys.Lock()
 		datas := mongodb.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
 		if len(*datas) == 0 {
-			go udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
-			sys.Unlock()
+			go config.Udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
+			config.Sys.Unlock()
 			return
 		}
 		tmp := (*datas)[0]
@@ -341,19 +213,19 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}}, `{"_id":1,"`+config.MgoFileFiled+`":1}`)
 		//log.Println(rdata)
 		if len((*rdata)) == 0 {
-			go udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
-			sys.Unlock()
+			go config.Udpclient.WriteUdp([]byte("没有获取到最新数据"), mu.OP_TYPE_DATA, ra)
+			config.Sys.Unlock()
 			return
 		}
 		newId := (*rdata)["_id"]
 		if newId.(bson.ObjectId).Hex() >= eid {
-			go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置
-			go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)                    //分发任务
+			go config.Udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置
+			go config.Udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)                    //分发任务
 			totmp := make(map[string]string)
 			totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
 			totmp["eid"] = qu.ObjToString(tmp[qu.ObjToString(config.EidField)])
 			tobyte, _ := json.Marshal(totmp)
-			go udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
+			go config.Udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
 				IP:   net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])),
 				Port: qu.IntAll(config.Sysconfig["toudpport"]),
 			})
@@ -361,114 +233,28 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
 			tmp["end_time"] = time.Now().Unix()
 			mongodb.Update("ocr_task_bak", bson.M{"_id": ObjectId.(bson.ObjectId)}, map[string]interface{}{"$set": tmp}, true, false)
-			sys.Unlock()
+			config.Sys.Unlock()
 			return
 		}
-		go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
+		go config.Udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra) //分发任务
 		//log.Println(newId.(bson.ObjectId).Hex())
 		tmp[config.SidField] = newId.(bson.ObjectId).Hex()
 		mongodb.Update("ocr_task",
 			bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
-		sys.Unlock()
+		config.Sys.Unlock()
 	}
 }
-func queryOcrTask() map[string]int {
-	data := make(map[string]int)
-	taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
-	taskNum := len(*taskArr)
-	if taskNum == 0 {
-		return data
-	}
-	data["taskNum"] = taskNum
-	sumNum := 0
-	nowSumNum := 0
-	for i, v := range *taskArr {
-		sid := bson.ObjectIdHex(qu.ObjToString(v["start"]))
-		eid := bson.ObjectIdHex(qu.ObjToString(v[qu.ObjToString(config.EidField)]))
-		sumNum += mongodb.Count("bidding", bson.M{"_id": bson.M{
-			"$gte": sid,
-			"$lte": eid,
-		}})
-		if i == 0 {
-			nowSumNum = sumNum
-		}
-	}
-	data["sumNum"] = sumNum
-	data["nowSumNum"] = nowSumNum
-	tmpsid := bson.ObjectIdHex(qu.ObjToString((*taskArr)[0]["start"]))
-	over := (*taskArr)[0][qu.ObjToString(config.SidField)]
-	overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
-		"$gte": tmpsid,
-		"$lt":  bson.ObjectIdHex(qu.ObjToString(over)),
-	}})
-	data["overNum"] = overNum
-	undoneNum := sumNum - overNum
-	data["undoneNum"] = undoneNum
-	nowUnDoneNum := nowSumNum - overNum
-	data["nowUnDoneNum"] = nowUnDoneNum
-	return data
-}
-
-func DynamicTask() {
-	time.Sleep(time.Second * 30)
-	cluster.DescribeInstances() //查询多台实例的详细信息
-	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
-	log.Println("实例未部署数量", len(*escObject))
-	if escObject != nil || len(*escObject) > 0 {
-		for i, v := range (*escObject) {
-			if tmpip := qu.ObjToString(v["ip_nw"]); tmpip == "" {
-				log.Println("没用获取到ip,实例ip异常", tmpip)
-				DynamicTask()
-				return
-			} else {
-				go func(tmpip,InstanceId string,i int) {
-					if DoCMD(tmpip) {
-						var tmpstr string
-						isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
-						tmpstr += udpstr + ";&nbsp;&nbsp;"
-						isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
-						tmpstr += fil2textstr
-						if isok2 && isok3 {
-							(*escObject)[i]["OcrTaskStatus"] = "successful"
-							mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
-							sys.Lock()
-							if config.CID[InstanceId] == false {
-								config.CID[InstanceId] = true
-							}
-							sys.Unlock()
-							log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
-						} else {
-							log.Println(tmpip, "部署异常,"+tmpstr)
-						}
-					} else {
-						log.Println(tmpip, "部署失败")
-					}
-				}(tmpip,qu.ObjToString(v["InstanceId"]),i)
-			}
-		}
-	} else {
-		log.Println("动态任务创建失败")
-	}
-}
-
-func DoCMD(ip string) bool {
-	if ip == "" {
-		return false
-	}
-
-	return cluster.RunSsh(ip)
-}
 
 func reload(ip string) string {
 	tmp := mongodb.FindOne("ocr_ecs", bson.M{"ip_nw": ip})
 	if tmp == nil || len(*tmp) <= 0 {
 		return "ip不存在"
 	}
-	sys.Lock()
-	if config.CID[qu.ObjToString((*tmp)["InstanceId"])] == true{
-		delete(config.CID,qu.ObjToString((*tmp)["InstanceId"]))
+	config.Sys.Lock()
+	if config.CID[qu.ObjToString((*tmp)["InstanceId"])] == true {
+		delete(config.CID, qu.ObjToString((*tmp)["InstanceId"]))
 	}
-	sys.Unlock()
+	config.Sys.Unlock()
 	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例
 	now := time.Now()
 	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
@@ -485,7 +271,7 @@ func reload(ip string) string {
 			escObject = mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, true, -1, -1)
 			tmpip = qu.ObjToString((*escObject)[0]["ip_nw"])
 		}
-		if DoCMD(tmpip) {
+		if corntask.DoCMD(tmpip) {
 			var tmpstr string
 			isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
 			tmpstr += udpstr + ";&nbsp;&nbsp;"
@@ -494,11 +280,11 @@ func reload(ip string) string {
 			if isok2 && isok3 {
 				(*escObject)[0]["OcrTaskStatus"] = "successful"
 				mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[0]["_id"]}, (*escObject)[0], true, false)
-				sys.Lock()
-				if config.CID[qu.ObjToString((*escObject)[0]["InstanceId"])] == false {
+				config.Sys.Lock()
+				if config.CID[qu.ObjToString((*escObject)[0]["InstanceId"])] == false && qu.ObjToString((*escObject)[0]["InstanceId"]) !=""{
 					config.CID[qu.ObjToString((*escObject)[0]["InstanceId"])] = true
 				}
-				sys.Unlock()
+				config.Sys.Unlock()
 				log.Println((*escObject)[0]["_id"], tmpip, "部署成功")
 				return tmpip + "部署成功," + tmpstr
 			} else {
@@ -510,58 +296,3 @@ func reload(ip string) string {
 	}
 	return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例"
 }
-
-func compute() int {
-	nowtime := time.Now().Unix()
-	taskArrase := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
-	if taskArrase == nil || len(*taskArrase) == 0 {
-		log.Println(464, "nil ro len(*taskArrase) == 0")
-		return 0
-	}
-	stmp := (*taskArrase)[0]
-	etmp := (*taskArrase)[len(*taskArrase)-1]
-	if stmp != nil && etmp != nil {
-		stime := qu.Int64All(stmp["import_time"])
-		if nowtime-stime <= qu.Int64All(config.Sysconfig["time_consuming_limit"]) {
-			log.Println(472, nowtime-stime, "<=", qu.Int64All(config.Sysconfig["time_consuming_limit"]))
-			return 0
-		}
-
-		sid := bson.ObjectIdHex(qu.ObjToString(stmp["start"]))
-		eid := bson.ObjectIdHex(qu.ObjToString(etmp[qu.ObjToString(config.EidField)]))
-		sum := mongodb.Count("bidding", bson.M{"_id": bson.M{
-			"$gte": sid,
-			"$lt":  eid,
-		}})
-		if sum <= qu.IntAll(config.Sysconfig["accumulated_task_limit"]) {
-			log.Println(483, sum, sid, eid)
-			return 0
-		}
-		gteid := bson.ObjectIdHex(qu.ObjToString(stmp[qu.ObjToString(config.SidField)]))
-		overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
-			"$gte": sid,
-			"$lt":  gteid,
-		}})
-		if overNum == 0 {
-			log.Println(492, overNum, sid, gteid)
-			return 0
-		}
-		if sum <= qu.IntAll(config.Sysconfig["accumulated_task_lowlimit"]) {
-			log.Println(496, sum, "<=", qu.IntAll(config.Sysconfig["accumulated_task_lowlimit"]))
-			return 0
-		}
-		mtmm := float64(overNum) / float64(nowtime-stime) / float64(len(config.CID)+3) //每台每秒
-		if mtmm <= 0 {
-			log.Println(501, overNum, int(nowtime-stime), (len(config.CID) + 3))
-			return 0
-		}
-		cc := float64(sum)/float64(qu.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(len(config.CID)) - 3
-		log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",mtms:", mtmm, ",sum:", sum, cc)
-		if cc > qu.Float64All(config.Sysconfig["pernum"]) {
-			return qu.IntAll(config.Sysconfig["pernum"])
-		} else {
-			return int(cc)
-		}
-	}
-	return 0
-}

+ 10 - 0
udp_ocr_conter/src/config/config.go

@@ -6,12 +6,17 @@ import (
 	"qfw/util"
 	"qfw/util/mongodb"
 	"strings"
+	"sync"
 	"time"
+	mu "mfw/util"
 )
 
+var Udpclient mu.UdpClient //udp对象
 var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
 var CID = map[string]bool{}
+var Sys sync.RWMutex
+
 
 func init() {
 	util.ReadConfig(&Sysconfig)
@@ -37,10 +42,15 @@ func init() {
 					continue
 				}
 				if utc.After(time.Now()) {
+					if util.ObjToString(v["InstanceId"]) == ""{
+						continue
+					}
 					CID[ util.ObjToString(v["InstanceId"])] = true
 					//log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
 				}
 			}
 		}
 	}
+	log.Println(CID)
+	Udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
 }

+ 244 - 0
udp_ocr_conter/src/corntask/task_corn.go

@@ -0,0 +1,244 @@
+package corntask
+
+import (
+	"cluster"
+	"config"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	"math"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"qfw/util/mongodb"
+	"time"
+)
+
+var Auto = func() {
+	d := time.Now()
+	nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location())
+	cluster.DescribeInstances() //查询多台实例的详细信息
+	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
+	if ocrescs != nil || len(*ocrescs) > 0 {
+		for _, v := range (*ocrescs) {
+			if util.ObjToString(v["AutoReleaseTime"]) != "" {
+				utc, err := time.ParseInLocation("2006-01-02T15:04Z", util.ObjToString(v["AutoReleaseTime"]), time.Local)
+				if err != nil {
+					log.Println("解析时间异常", err)
+					continue
+				}
+				if utc.Before(nowday) {
+					log.Println("删除过时实例", mongodb.Del("ocr_ecs", bson.M{"_id": v["_id"].(bson.ObjectId)}))
+				}
+			}
+		}
+	}
+	taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
+	taskNum := len(*taskArr)
+	log.Println("当前任务数量:", taskNum)
+	//if taskNum <= 0 {
+	//计算释放,发送udp
+	ccnum := compute()
+	log.Println("ccnum:", ccnum, ", len(config.CID):", len(config.CID), config.CID)
+	if ccnum <= 0 {
+		if len(config.CID) == 0 {
+			log.Println("当前实例为空,无需释放", config.CID, )
+			return
+		}
+		if ccnum == 0 && len(config.CID) > 0 {
+			log.Println("释放所有实例", config.CID)
+			for tmpIid, _ := range config.CID {
+				ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
+				mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+				log.Println("5分钟后释放实例", tmpIid)
+				config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+					IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
+					Port: util.IntAll(config.Sysconfig["broadcast_port"]),
+				})
+				go func(tmpIid string) {
+					time.Sleep(time.Minute * 5)
+					cluster.DeleteInstance(tmpIid)
+					log.Println("5分钟后释放实例完成", tmpIid)
+				}(tmpIid)
+			}
+			config.Sys.Lock()
+			config.CID = make(map[string]bool)
+			config.Sys.Unlock()
+		} else {
+			var tmpnum int
+			for k, _ := range config.CID {
+				if ccnum >= tmpnum {
+					return
+				}
+				tmpIid := k
+				ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
+				mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+				log.Println("5分钟后释放实例", tmpIid)
+				config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+					IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
+					Port: util.IntAll(config.Sysconfig["broadcast_port"]),
+				})
+				go func(tmpIid string) {
+					time.Sleep(time.Minute * 5)
+					cluster.DeleteInstance(tmpIid)
+					log.Println("5分钟后释放实例完成", tmpIid)
+				}(tmpIid)
+				tmpnum--
+				config.Sys.Lock()
+				delete(config.CID, k)
+				config.Sys.Unlock()
+			}
+
+		}
+		return
+	}
+	//}
+	if len(config.CID) >= util.IntAll(config.Sysconfig["pernum"]) {
+		log.Println("实例申请上限,当前实例:", config.CID)
+		escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
+		log.Println("实例未部署数量", len(*escObject))
+		if escObject != nil || len(*escObject) > 0 {
+			for i, v := range (*escObject) {
+				if tmpip := util.ObjToString(v["ip_nw"]); tmpip == "" {
+					log.Println("没用获取到ip,实例ip异常", tmpip)
+				} else {
+					go func(tmpip, InstanceId string, i int) {
+						if DoCMD(tmpip) {
+							var tmpstr string
+							isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
+							tmpstr += udpstr + ";&nbsp;&nbsp;"
+							isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
+							tmpstr += fil2textstr
+							if isok2 && isok3 {
+								(*escObject)[i]["OcrTaskStatus"] = "successful"
+								mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
+								config.Sys.Lock()
+								if config.CID["InstanceId"] == false {
+									config.CID["InstanceId"] = true
+								}
+								config.Sys.Unlock()
+								log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
+							} else {
+								log.Println(tmpip, "部署异常,"+tmpstr)
+							}
+						} else {
+							log.Println(tmpip, "部署失败")
+						}
+					}(tmpip, util.ObjToString(v["InstanceId"]), i)
+				}
+			}
+		}
+		return
+	}
+	//if taskNum > 1 {
+	log.Println("申请实例")
+	now := time.Now()
+	hours := time.Date(now.Year(), now.Month(), now.Day(), 20, 0, 0, 0, now.Location()).Sub(now).Hours()
+	cluster.RunInstances("ocr_task_arr", "8", "false", ccnum, int(math.Round(hours)))
+	log.Println("实例申请成功")
+	DynamicTask() //动态任务
+	log.Println("申请实例结束")
+	//}
+}
+
+func DoCMD(ip string) bool {
+	if ip == "" {
+		return false
+	}
+
+	return cluster.RunSsh(ip)
+}
+func DynamicTask() {
+	time.Sleep(time.Second * 30)
+	cluster.DescribeInstances() //查询多台实例的详细信息
+	escObject := mongodb.Find("ocr_ecs", bson.M{"TaskName": "ocr_task_arr", "OcrTaskStatus": "none"}, bson.M{"_id": -1}, nil, false, -1, -1)
+	log.Println("实例未部署数量", len(*escObject))
+	if escObject != nil || len(*escObject) > 0 {
+		for i, v := range (*escObject) {
+			if tmpip := util.ObjToString(v["ip_nw"]); tmpip == "" {
+				log.Println("没用获取到ip,实例ip异常", tmpip)
+				DynamicTask()
+				return
+			} else {
+				go func(tmpip, InstanceId string, i int) {
+					if DoCMD(tmpip) {
+						var tmpstr string
+						isok2, udpstr := cluster.SshPgrep(tmpip, "pgrep udp2019")
+						tmpstr += udpstr + ";&nbsp;&nbsp;"
+						isok3, fil2textstr := cluster.SshPgrep(tmpip, "pgrep file2text")
+						tmpstr += fil2textstr
+						if isok2 && isok3 {
+							(*escObject)[i]["OcrTaskStatus"] = "successful"
+							mongodb.Update("ocr_ecs", bson.M{"_id": (*escObject)[i]["_id"]}, (*escObject)[i], true, false)
+							config.Sys.Lock()
+							if config.CID[InstanceId] == false && InstanceId != "" {
+								config.CID[InstanceId] = true
+							}
+							config.Sys.Unlock()
+							log.Println((*escObject)[i]["_id"], tmpip, "部署成功")
+						} else {
+							log.Println(tmpip, "部署异常,"+tmpstr)
+						}
+					} else {
+						log.Println(tmpip, "部署失败")
+					}
+				}(tmpip, util.ObjToString(v["InstanceId"]), i)
+			}
+		}
+	} else {
+		log.Println("动态任务创建失败")
+	}
+}
+func compute() int {
+	nowtime := time.Now().Unix()
+	taskArrase := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
+	if taskArrase == nil || len(*taskArrase) == 0 {
+		log.Println(464, "nil ro len(*taskArrase) == 0")
+		return 0
+	}
+	stmp := (*taskArrase)[0]
+	etmp := (*taskArrase)[len(*taskArrase)-1]
+	if stmp != nil && etmp != nil {
+		stime := util.Int64All(stmp["import_time"])
+		if nowtime-stime <= util.Int64All(config.Sysconfig["time_consuming_limit"]) {
+			log.Println(472, nowtime-stime, "<=", util.Int64All(config.Sysconfig["time_consuming_limit"]))
+			return 0
+		}
+
+		sid := bson.ObjectIdHex(util.ObjToString(stmp["start"]))
+		eid := bson.ObjectIdHex(util.ObjToString(etmp[util.ObjToString(config.EidField)]))
+		sum := mongodb.Count("bidding", bson.M{"_id": bson.M{
+			"$gte": sid,
+			"$lt":  eid,
+		}})
+		if sum <= util.IntAll(config.Sysconfig["accumulated_task_limit"]) {
+			log.Println(483, sum, sid, eid)
+			return 0
+		}
+		gteid := bson.ObjectIdHex(util.ObjToString(stmp[util.ObjToString(config.SidField)]))
+		overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
+			"$gte": sid,
+			"$lt":  gteid,
+		}})
+		if overNum == 0 {
+			log.Println(492, overNum, sid, gteid)
+			return 0
+		}
+		if sum <= util.IntAll(config.Sysconfig["accumulated_task_lowlimit"]) {
+			log.Println(496, sum, "<=", util.IntAll(config.Sysconfig["accumulated_task_lowlimit"]))
+			return 0
+		}
+		mtmm := float64(overNum) / float64(nowtime-stime) / float64(len(config.CID)+3) //每台每秒
+		if mtmm <= 0 {
+			log.Println(501, overNum, int(nowtime-stime), (len(config.CID) + 3))
+			return 0
+		}
+		cc := float64(sum)/float64(util.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(len(config.CID)) - 3
+		log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",mtms:", mtmm, ",sum:", sum, cc)
+		if cc > util.Float64All(config.Sysconfig["pernum"]) {
+			return util.IntAll(config.Sysconfig["pernum"])
+		} else {
+			return int(cc)
+		}
+	}
+	return 0
+}

+ 69 - 0
udp_ocr_conter/src/info/taskinfo.go

@@ -0,0 +1,69 @@
+package info
+
+import (
+	"config"
+	"fmt"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	"qfw/util"
+	"qfw/util/mongodb"
+	"time"
+)
+
+func QueryInfo() map[string]interface{} {
+	now := time.Now()
+	data := make(map[string]interface{})
+	taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
+	taskNum := len(*taskArr)
+	if taskNum == 0 {
+		return data
+	}
+	data["taskNum"] = taskNum
+	sumNum := 0
+	nowSumNum := 0
+	var stime time.Time
+	resultInfos := make([]map[string]interface{}, 0)
+	for i, v := range *taskArr {
+		sid := bson.ObjectIdHex(util.ObjToString(v["start"]))
+		eid := bson.ObjectIdHex(util.ObjToString(v[util.ObjToString(config.EidField)]))
+		tmpSumNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
+			"$gte": sid,
+			"$lte": eid,
+		}})
+		v["sub"] = tmpSumNum
+		v["i_time"] = time.Unix(util.Int64All(v["import_time"]),0).String()
+		resultInfos = append(resultInfos, v)
+		sumNum += tmpSumNum
+		if i == 0 {
+			nowSumNum = sumNum
+			tmptime := util.Int64All(v["import_time"])
+			stime = time.Unix(tmptime,0)
+		}
+	}
+	data["sumNum"] = sumNum
+	data["nowSumNum"] = nowSumNum
+	tmpsid := bson.ObjectIdHex(util.ObjToString((*taskArr)[0]["start"]))
+	over := (*taskArr)[0][util.ObjToString(config.SidField)]
+	overNum := mongodb.Count("bidding", bson.M{"_id": bson.M{
+		"$gte": tmpsid,
+		"$lt":  bson.ObjectIdHex(util.ObjToString(over)),
+	}})
+	data["overNum"] = overNum
+	undoneNum := sumNum - overNum
+	data["undoneNum"] = undoneNum
+	nowUnDoneNum := nowSumNum - overNum
+	data["nowUnDoneNum"] = nowUnDoneNum
+
+	since := time.Since(stime)
+	data["executionTime"] = since.String()
+	log.Println(now.Sub(stime).Seconds())
+	mtmm := float64(overNum) / now.Sub(stime).Seconds() / float64(3+len(config.CID))
+	//log.Println(overNum ,now.Sub(stime).Seconds(),3+len(config.CID),mtmm)
+	tmpf := float64(nowSumNum) / mtmm / float64(3+len(config.CID))
+	//log.Println(nowSumNum,mtmm,tmpf)
+	data["estimatedFinishTime"] = now.Add(time.Second*time.Duration(tmpf)).String()
+	data["esc"] = fmt.Sprint(3 ,"+", len(config.CID))
+	data["resultInfos"] = resultInfos
+	log.Println(resultInfos)
+	return data
+}

+ 67 - 29
udp_ocr_conter/src/web/index.html

@@ -6,45 +6,83 @@ ocr_task
 {{/*{{.}}*/}}
 <br>
 {{if eq (len .) 0}}
-当前无任务
+    当前无任务
 {{end}}
 {{if gt (len .)  0}}
+    <table>
+        <thead>
+        <tr>
+            <td>当前任务个数</td>
+            <td>总数量</td>
+            <td>完成数量</td>
+            <td>未识别全部数量</td>
+        </tr>
+        </thead>
+        <tbody>
+        <tr>
+            <td>{{.taskNum}}</td>
+            <td>{{.sumNum}}</td>
+            <td>{{.overNum}}</td>
+            <td>{{.undoneNum}}</td>
+        </tr>
+        </tbody>
+    </table>
+    <br>
+    <br>
+    当前任务进度
+    <br>
+    <table>
+        <thead>
+        <tr>
+            <td>总数量</td>
+            <td>完成数量</td>
+            <td>未识别数量</td>
+            <td>已执行时间</td>
+            <td>预计完成时间</td>
+            <td>固定服务器+竞价服务器</td>
+        </tr>
+        </thead>
+        <tbody>
+        <tr>
+            <td>{{.nowSumNum}}</td>
+            <td>{{.overNum}}</td>
+            <td>{{.nowUnDoneNum}}</td>
+            <td>{{.executionTime}}</td>
+            <td>{{.estimatedFinishTime}}</td>
+            <td>{{.esc}}</td>
+        </tr>
+        </tbody>
+    </table>
+{{end}}
+<br>
+任务详情
+<br>
 <table>
     <thead>
     <tr>
-        <td>当前任务个数</td>
-        <td>总数量</td>
-        <td>完成数量</td>
-        <td>未识别全部数量</td>
-    </tr>
-    </thead>
-    <tbody>
-    <tr>
-        <td>{{.taskNum}}</td>
-        <td>{{.sumNum}}</td>
-        <td>{{.overNum}}</td>
-        <td>{{.undoneNum}}</td>
-    </tr>
-    </tbody>
-</table>
-当前任务进度
-<table>
-    <thead>
-    <tr>
-        <td>总数量</td>
-        <td>完成数量</td>
-        <td>未识别数量</td>
+        <td>任务id</td>
+        <td>开始id</td>
+        <td>执行到id</td>
+        <td>执行结束id</td>
+        <td>入库时间</td>
+        <td>id段条数</td>
     </tr>
     </thead>
     <tbody>
-    <tr>
-        <td>{{.nowSumNum}}</td>
-        <td>{{.overNum}}</td>
-        <td>{{.nowUnDoneNum}}</td>
-    </tr>
+    {{range $key, $value := .resultInfos }}
+        <tr>
+                <td>{{$value._id}}</td>
+                <td>{{$value.start}}</td>
+                <td>{{$value.gtid}}</td>
+                <td>{{$value.lteid}}</td>
+                <td>{{$value.i_time}}</td>
+                <td>{{$value.sub}}</td>
+        </tr>
+    {{end}}
     </tbody>
 </table>
-{{end}}
+
+
 <input type="button" onclick="location.reload()" value="刷新"></input>
 
 </body>