Ver código fonte

Merge branch 'dev3.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.2

zhangjinkun 5 anos atrás
pai
commit
7a4850ec59

+ 7 - 2
udp_ocr_conter/config.json

@@ -14,11 +14,16 @@
   "broadcast_ips": "127.0.0.1;192.168.1.2;192.168.1.3;192.168.1.4",
   "broadcast_port": 1490,
   "http_port": "12345",
-  "cornstr": "0 0 0 1 1 ? ",
-  "corntime_consuming": 300,
+  "cornstr": "0 0 0 1 1 ?",
+  "corntime_consuming": 9000,
   "pernum": 5,
   "time_consuming_limit": 60,
   "accumulated_task_lowlimit": 1000,
+  "swa":119,
+  "swb":112,
+  "xwa":114,
+  "xwb":117,
+  "gdts": 113,
   "esconfig": {
     "available": true,
     "AccessID": "LTAIkuomMLAjIlGH",

+ 124 - 23
udp_ocr_conter/main.go

@@ -6,15 +6,16 @@ import (
 	"corntask"
 	"encoding/json"
 	"fmt"
-	"github.com/cron"
 	"gopkg.in/mgo.v2/bson"
 	"html/template"
 	"info"
+	"io/ioutil"
 	"log"
 	"math"
 	mu "mfw/util"
 	"net"
 	"net/http"
+	"qfw/common/src/github.com/robfig/cron"
 	qu "qfw/util"
 	"qfw/util/mongodb"
 	"regexp"
@@ -24,7 +25,7 @@ import (
 
 func init() {
 	cluster.DescribeInstances()
-	log.Println("初始化CID:",cluster.CID)
+	log.Println("初始化CID:", cluster.CID)
 }
 
 func main() {
@@ -34,14 +35,14 @@ func main() {
 		ips := qu.ObjToString(config.Sysconfig["broadcast_ips"])
 		ipsArr := strings.Split(ips, ";")
 		for _, v := range ipsArr {
-			log.Println("通知udp来取数据1:",v,config.Sysconfig["broadcast_port"],config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
+			log.Println("通知udp来取数据1:", v, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
 				IP:   net.ParseIP(v),
 				Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
 			}))
 		}
 	}
-	for _,v:= range cluster.CID{
-		log.Println("通知udp来取数据2:",v,config.Sysconfig["broadcast_port"],config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
+	for _, v := range cluster.CID {
+		log.Println("通知udp来取数据2:", v, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
 			IP:   net.ParseIP(v),
 			Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
 		}))
@@ -53,6 +54,43 @@ func main() {
 	mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) {
 		writer.WriteHeader(http.StatusOK)
 	})
+	//mux.HandleFunc("/udpkz", func(writer http.ResponseWriter, request *http.Request) {
+	//	tplogin, err := template.ParseFiles("src/web/login.html")
+	//	if err != nil {
+	//		log.Println(err)
+	//		writer.Write([]byte( "页面找不到了~"))
+	//		return
+	//	}
+	//	cookie, _ := request.Cookie("username")
+	//	if cookie == nil {
+	//		http.RedirectHandler("/login", http.StatusUnauthorized)
+	//		tplogin.Execute(writer, "请先登录")
+	//		return
+	//	}
+	//	if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc" {
+	//		http.RedirectHandler("/login", http.StatusUnauthorized)
+	//		tplogin.Execute(writer, "密钥错误")
+	//		return
+	//	}
+	//	err = request.ParseForm()
+	//	if err !=nil{
+	//		writer.WriteHeader(http.StatusBadGateway)
+	//		writer.Write([]byte(err.Error()))
+	//		return
+	//	}
+	//	data := request.Form.Get("do")
+	//	//log.Println(data)
+	//	if data == "start"{
+	//		config.Sys.Lock()
+	//		config.IsRun = true
+	//		config.Sys.Unlock()
+	//	}else if data =="stop"{
+	//		config.Sys.Lock()
+	//		config.IsRun = false
+	//		config.Sys.Unlock()
+	//	}
+	//	writer.WriteHeader(http.StatusOK)
+	//})
 	mux.HandleFunc("/login", func(writer http.ResponseWriter, request *http.Request) {
 		tp, err := template.ParseFiles("src/web/login.html")
 		if err != nil {
@@ -172,8 +210,50 @@ func main() {
 	})
 	c := cron.New()
 	spec := qu.ObjToString(config.Sysconfig["cornstr"])
-	c.AddFunc(spec, corntask.Auto)
 	c.Start()
+	c.AddFunc(spec, corntask.Auto)
+	c.AddFunc("0 0 6 1/1 * ?", func() {
+		config.Isjjr = true
+		nowtime := time.Now().Format("20060102")
+		r, err := http.Get("http://tool.bitefu.net/jiari/?d=" + nowtime + "&back=json")
+		defer r.Body.Close()
+		if err != nil {
+			log.Println("tool.bitefu.net/jiari/", err)
+			config.Isjjr = false
+		}
+		data, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			log.Println(191, err)
+			config.Isjjr = false
+		}
+		tmp := map[string]interface{}{}
+		json.Unmarshal(data, &tmp)
+		if tmp[nowtime] == 0 || tmp[nowtime] == "0" {
+			config.Isjjr = false
+		}
+		r2, err := http.Get("http://api.goseek.cn/Tools/holiday?date=" + nowtime)
+		defer r2.Body.Close()
+		if err != nil {
+			log.Println("http://api.goseek.cn/Tools/holiday", err)
+			config.Isjjr = false
+		}
+		data2, err := ioutil.ReadAll(r2.Body)
+		if err != nil {
+			log.Println(208, err)
+			config.Isjjr = false
+		}
+		tmp2 := map[string]interface{}{}
+		json.Unmarshal(data2, &tmp2)
+		if tmp2["data"] == 0 || tmp2["data"] == "0" || tmp2["data"] == 2 || tmp2["data"] == "2" {
+			config.Isjjr = false
+		}
+
+	})
+	c.AddFunc("0 */1 * * * *", func() {
+		qu.ReadConfig(&config.Sysconfig)
+		//log.Println(111)
+		//log.Println(config.Sysconfig)
+	})
 
 	log.Println("Http  listening port: ", qu.ObjToString(config.Sysconfig["http_port"]))
 	if err := http.ListenAndServe(":"+qu.ObjToString(config.Sysconfig["http_port"]), mux); err != nil {
@@ -194,9 +274,10 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 		tmp["start"] = tmp[qu.ObjToString(config.SidField)]
 		tmp["import_time"] = time.Now().Unix()
+		tmp["isrun"] = "run"
 		bytes, _ := json.Marshal(tmp)
 		b := mongodb.Save("ocr_task", string(bytes))
-		mongodb.Save("ocr_task_bak", string(bytes))
+		//mongodb.Save("ocr_task_bak", string(bytes))
 		log.Println("保存id:", b)
 	case mu.OP_NOOP: //其他节点回应消息打印
 		log.Println("节点接收成功", string(data), ra.String())
@@ -207,7 +288,9 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			return
 		}
 		config.Sys.Lock()
-		datas := mongodb.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
+		datas := mongodb.Find("ocr_task", bson.M{"isrun":bson.M{
+			"$ne":"stop",
+		}}, `{"_id":1}`, nil, false, -1, -1)
 		if len(*datas) == 0 {
 			go config.Udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
 			config.Sys.Unlock()
@@ -230,19 +313,37 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		if newId.(bson.ObjectId).Hex() >= eid {
 			go config.Udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置
 			go config.Udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)                    //分发任务
-			totmp := make(map[string]string)
-			totmp["gtid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
-			totmp["lteid"] = qu.ObjToString(tmp[qu.ObjToString(config.EidField)])
-			totmp["stype"] = "fujian"
-			tobyte, _ := json.Marshal(totmp)
-			go config.Udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
-				IP:   net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])),
-				Port: qu.IntAll(config.Sysconfig["toudpport"]),
-			})
-			log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
-			mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
-			tmp["end_time"] = time.Now().Unix()
-			mongodb.Update("ocr_task_bak", bson.M{"_id": ObjectId.(bson.ObjectId)}, map[string]interface{}{"$set": tmp}, true, false)
+			//if config.IsRun {
+			//	tmpdatas := mongodb.Find("ocr_task", bson.M{"isrun":"stop"}, `{"_id":1}`, nil, false, -1, -1)
+				totmp := make(map[string]string)
+				totmp["stype"] = "fujian"
+				totmp["lteid"] = qu.ObjToString(tmp[qu.ObjToString(config.EidField)])
+				//if tmpdatas == nil || len(*tmpdatas) == 0{
+					totmp["gtid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
+				//}else {
+				//	totmp["gtid"] = qu.ObjToString((*tmpdatas)[0][qu.ObjToString("start")])
+				//	for i,_:=range (*tmpdatas){
+				//		mongodb.Del("ocr_task",bson.M{"_id":(*tmpdatas)[i]["_id"]})
+				//		mongodb.Save("ocr_flie_over",totmp)
+				//	}
+				//}
+				tobyte, _ := json.Marshal(totmp)
+				mongodb.Save("ocr_flie_over",string(tobyte))
+			//go config.Udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
+				//	IP:   net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])),
+				//	Port: qu.IntAll(config.Sysconfig["toudpport"]),
+				//})
+				log.Println("ocr_task处理完成,ocr_flie_over存储数据", string(tobyte))
+
+				mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
+				tmp["end_time"] = time.Now().Unix()
+
+				//mongodb.Update("ocr_task_bak", bson.M{"start": ObjectId.(bson.ObjectId)}, map[string]interface{}{"$set": tmp}, false, false)
+			//}else {
+			//	tmp["isrun"] = "stop"
+			//	mongodb.Update("ocr_task",
+			//		bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
+			//}
 			config.Sys.Unlock()
 			return
 		}
@@ -262,7 +363,7 @@ func reload(ip string) string {
 	if tmp == nil || len(*tmp) <= 0 {
 		return "ip不存在"
 	}
-	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例
+	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"]))                  //删除要重新部署的实例
 	cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours))) //创建新实例
 	time.Sleep(time.Second * 25)
 	cluster.DescribeInstances() //查询多台实例的详细信息
@@ -291,7 +392,7 @@ func reload(ip string) string {
 				return tmpip + "部署异常," + tmpstr
 			}
 		} else {
-			 tmpip += "部署失败"
+			tmpip += "部署失败"
 		}
 	}
 	return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例"

+ 2 - 1
udp_ocr_conter/src/config/config.go

@@ -13,7 +13,8 @@ var Udpclient mu.UdpClient //udp对象
 var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
 var Sys sync.RWMutex
-
+var Isjjr bool//节假日
+//var IsRun bool = true
 
 func init() {
 	util.ReadConfig(&Sysconfig)

+ 55 - 40
udp_ocr_conter/src/corntask/task_corn.go

@@ -15,6 +15,10 @@ import (
 
 var Auto = func() {
 	d := time.Now()
+	if config.Isjjr{
+		log.Println(d.Format("20060102"),"节假日")
+		return
+	}
 	nowday := time.Date(d.Year(), d.Month(), d.Day(), 0, 0, 0, 0, d.Location())
 	cluster.DescribeInstances() //查询多台实例的详细信息
 	ocrescs := mongodb.Find("ocr_ecs", bson.M{}, nil, bson.M{"AutoReleaseTime": 1}, false, -1, -1)
@@ -32,7 +36,9 @@ var Auto = func() {
 			}
 		}
 	}
-	taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
+	taskArr := mongodb.Find("ocr_task", bson.M{"isrun":bson.M{
+		"$ne":"stop",
+	}}, `{_id:1}`, nil, false, -1, -1)
 	taskNum := len(*taskArr)
 	log.Println("当前任务数量:", taskNum)
 	//if taskNum <= 0 {
@@ -44,42 +50,49 @@ var Auto = func() {
 			log.Println("当前实例为空,无需释放", cluster.CID, )
 			return
 		}
-		if ccnum == 0 && len(cluster.CID) > 0 {
-			log.Println("释放所有实例", cluster.CID)
-			for tmpIid, _ := range cluster.CID {
-				ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
-				log.Println("5分钟后释放实例", tmpIid)
-				go func(tmpIid string,ttt *map[string]interface{}) {
-					time.Sleep(time.Minute * 5)
-					config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
-						IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
-						Port: util.IntAll(config.Sysconfig["broadcast_port"]),
-					})
-					mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
-					cluster.DeleteInstance(tmpIid)
-					log.Println("5分钟后释放实例完成", tmpIid)
-				}(tmpIid,ttt)
-			}
-		} else {
-			var tmpnum int
-			for k, _ := range cluster.CID {
-				if ccnum >= tmpnum {
-					return
+		zsa := time.Date(d.Year(), d.Month(), d.Day(), util.IntAll(config.Sysconfig["swa"]), 0, 0, 0, d.Location())
+		zsb := time.Date(d.Year(), d.Month(), d.Day(), util.IntAll(config.Sysconfig["swb"]), 0, 0, 0, d.Location())
+
+		xwa := time.Date(d.Year(), d.Month(), d.Day(), util.IntAll(config.Sysconfig["xwa"]), 0, 0, 0, d.Location())
+		xwb := time.Date(d.Year(), d.Month(), d.Day(), util.IntAll(config.Sysconfig["xwb"]), 0, 0, 0, d.Location())
+		if !((zsa.Before(d) && zsb.After(d)) || (xwa.Before(d) && xwb.After(d))){
+			if ccnum == 0 && len(cluster.CID) > 0 {
+				log.Println("释放所有实例", cluster.CID)
+				for tmpIid, _ := range cluster.CID {
+					ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
+					log.Println("5分钟后释放实例", tmpIid)
+					go func(tmpIid string,ttt *map[string]interface{}) {
+						time.Sleep(time.Minute * 5)
+						config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+							IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
+							Port: util.IntAll(config.Sysconfig["broadcast_port"]),
+						})
+						mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+						cluster.DeleteInstance(tmpIid)
+						log.Println("5分钟后释放实例完成", tmpIid)
+					}(tmpIid,ttt)
+				}
+			} else {
+				var tmpnum int
+				for k, _ := range cluster.CID {
+					if ccnum >= tmpnum {
+						return
+					}
+					tmpIid := k
+					ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
+					log.Println("5分钟后释放实例", tmpIid)
+					go func(tmpIid string,ttt *map[string]interface{}) {
+						time.Sleep(time.Minute * 5)
+						config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
+							IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
+							Port: util.IntAll(config.Sysconfig["broadcast_port"]),
+						})
+						mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
+						cluster.DeleteInstance(tmpIid)
+						log.Println("5分钟后释放实例完成", tmpIid)
+					}(tmpIid,ttt)
+					tmpnum--
 				}
-				tmpIid := k
-				ttt := mongodb.FindOne("ocr_ecs", bson.M{"InstanceId": tmpIid})
-				log.Println("5分钟后释放实例", tmpIid)
-				go func(tmpIid string,ttt *map[string]interface{}) {
-					time.Sleep(time.Minute * 5)
-					config.Udpclient.WriteUdp([]byte("5分钟后释放实例"), mu.OP_DELETE_DOWNLOADERCODES, &net.UDPAddr{
-						IP:   net.ParseIP(util.ObjToString((*ttt)["ip_nw"])),
-						Port: util.IntAll(config.Sysconfig["broadcast_port"]),
-					})
-					mongodb.Del("ocr_ecs", bson.M{"InstanceId": tmpIid})
-					cluster.DeleteInstance(tmpIid)
-					log.Println("5分钟后释放实例完成", tmpIid)
-				}(tmpIid,ttt)
-				tmpnum--
 			}
 		}
 		cluster.DescribeInstances()
@@ -179,7 +192,9 @@ func DynamicTask() {
 }
 func compute() int {
 	nowtime := time.Now().Unix()
-	taskArrase := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
+	taskArrase := mongodb.Find("ocr_task", bson.M{"isrun":bson.M{
+		"$ne":"stop",
+	}}, `{_id:1}`, nil, false, -1, -1)
 	if taskArrase == nil || len(*taskArrase) == 0 {
 		log.Println(464, "nil ro len(*taskArrase) == 0")
 		return 0
@@ -216,12 +231,12 @@ func compute() int {
 			log.Println(496, sum, "<=", util.IntAll(config.Sysconfig["accumulated_task_lowlimit"]))
 			return 0
 		}
-		mtmm := float64(overNum) / float64(nowtime-stime) / float64(len(cluster.CID)+3) //每台每秒
+		mtmm := float64(overNum) / float64(nowtime-stime) / float64(len(cluster.CID)+util.IntAll(config.Sysconfig["gdts"])) //每台每秒
 		if mtmm <= 0 {
-			log.Println(501, overNum, int(nowtime-stime), (len(cluster.CID) + 3))
+			log.Println(501, overNum, int(nowtime-stime), (len(cluster.CID) + util.IntAll(config.Sysconfig["gdts"])))
 			return 0
 		}
-		cc := float64(sum)/float64(util.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(len(cluster.CID)) - 3
+		cc := float64(sum)/float64(util.IntAll(config.Sysconfig["corntime_consuming"]))/mtmm - float64(len(cluster.CID)) - float64(util.IntAll(config.Sysconfig["gdts"]))
 		log.Println("overNum:", overNum, ",hs:", int(nowtime-stime), ",mtms:", mtmm, ",sum:", sum, cc)
 		if cc > util.Float64All(config.Sysconfig["pernum"]) {
 			return util.IntAll(config.Sysconfig["pernum"])

+ 6 - 1
udp_ocr_conter/src/info/taskinfo.go

@@ -13,7 +13,12 @@ import (
 func QueryInfo() map[string]interface{} {
 	now := time.Now()
 	data := make(map[string]interface{})
-	taskArr := mongodb.Find("ocr_task", bson.M{}, `{_id:1}`, nil, false, -1, -1)
+	config.Sys.Lock()
+	//data["isrun"] = config.IsRun
+	config.Sys.Unlock()
+	taskArr := mongodb.Find("ocr_task", bson.M{"isrun":bson.M{
+		"$ne":"stop",
+	}}, `{_id:1}`, nil, false, -1, -1)
 	taskNum := len(*taskArr)
 	if taskNum == 0 {
 		return data

+ 23 - 0
udp_ocr_conter/src/web/index.html

@@ -5,6 +5,8 @@
 ocr_task
 {{/*{{.}}*/}}
 <br>
+当前发送节点状态   {{.isrun}}  <button id="start" onclick="start()">开启</button>   <button id="stop" onclick="stop()" >停止</button>
+<br>
 {{if eq (len .) 0}}
     当前无任务
 {{end}}
@@ -86,4 +88,25 @@ ocr_task
 <input type="button" onclick="location.reload()" value="刷新"></input>
 
 </body>
+<script src="../res/js/jquery.js"></script>
+<script>
+    function start() {
+        console.log(123)
+        $.ajax({
+            url:"/udpkz?do=start",
+            success:function (req) {
+                location.reload(true)
+            }
+        })
+    }
+    function stop() {
+        console.log(345)
+        $.ajax({
+            url:"/udpkz?do=stop",
+            success:function (req) {
+                location.reload(true)
+            }
+        })
+    }
+</script>
 </html>