fengweiqiang 5 жил өмнө
parent
commit
dcf0dfea0c

+ 86 - 35
udp_ocr_conter/main.go

@@ -25,7 +25,7 @@ import (
 
 func init() {
 	cluster.DescribeInstances()
-	log.Println("初始化CID:",cluster.CID)
+	log.Println("初始化CID:", cluster.CID)
 }
 
 func main() {
@@ -35,14 +35,14 @@ func main() {
 		ips := qu.ObjToString(config.Sysconfig["broadcast_ips"])
 		ipsArr := strings.Split(ips, ";")
 		for _, v := range ipsArr {
-			log.Println("通知udp来取数据1:",v,config.Sysconfig["broadcast_port"],config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
+			log.Println("通知udp来取数据1:", v, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
 				IP:   net.ParseIP(v),
 				Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
 			}))
 		}
 	}
-	for _,v:= range cluster.CID{
-		log.Println("通知udp来取数据2:",v,config.Sysconfig["broadcast_port"],config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
+	for _, v := range cluster.CID {
+		log.Println("通知udp来取数据2:", v, config.Sysconfig["broadcast_port"], config.Udpclient.WriteUdp([]byte{}, mu.OP_TYPE_DATA, &net.UDPAddr{
 			IP:   net.ParseIP(v),
 			Port: qu.IntAll(config.Sysconfig["broadcast_port"]),
 		}))
@@ -54,6 +54,43 @@ func main() {
 	mux.HandleFunc("/favicon.ico", func(writer http.ResponseWriter, request *http.Request) {
 		writer.WriteHeader(http.StatusOK)
 	})
+	mux.HandleFunc("/udpkz", func(writer http.ResponseWriter, request *http.Request) {
+		tplogin, err := template.ParseFiles("src/web/login.html")
+		if err != nil {
+			log.Println(err)
+			writer.Write([]byte( "页面找不到了~"))
+			return
+		}
+		cookie, _ := request.Cookie("username")
+		if cookie == nil {
+			http.RedirectHandler("/login", http.StatusUnauthorized)
+			tplogin.Execute(writer, "请先登录")
+			return
+		}
+		if cookie.Value != "0c83f57c786a0b4a39efab23731c7ebc" {
+			http.RedirectHandler("/login", http.StatusUnauthorized)
+			tplogin.Execute(writer, "密钥错误")
+			return
+		}
+		err = request.ParseForm()
+		if err !=nil{
+			writer.WriteHeader(http.StatusBadGateway)
+			writer.Write([]byte(err.Error()))
+			return
+		}
+		data := request.Form.Get("do")
+		//log.Println(data)
+		if data == "start"{
+			config.Sys.Lock()
+			config.IsRun = true
+			config.Sys.Unlock()
+		}else if data =="stop"{
+			config.Sys.Lock()
+			config.IsRun = false
+			config.Sys.Unlock()
+		}
+		writer.WriteHeader(http.StatusOK)
+	})
 	mux.HandleFunc("/login", func(writer http.ResponseWriter, request *http.Request) {
 		tp, err := template.ParseFiles("src/web/login.html")
 		if err != nil {
@@ -178,36 +215,36 @@ func main() {
 	c.AddFunc("0 0 6 1/1 * ?", func() {
 		config.Isjjr = true
 		nowtime := time.Now().Format("20060102")
-		r,err  := http.Get("http://tool.bitefu.net/jiari/?d=" + nowtime + "&back=json")
+		r, err := http.Get("http://tool.bitefu.net/jiari/?d=" + nowtime + "&back=json")
 		defer r.Body.Close()
-		if err != nil{
-			log.Println("tool.bitefu.net/jiari/",err)
+		if err != nil {
+			log.Println("tool.bitefu.net/jiari/", err)
 			config.Isjjr = false
 		}
-		data,err := ioutil.ReadAll(r.Body)
+		data, err := ioutil.ReadAll(r.Body)
 		if err != nil {
-			log.Println(191,err)
+			log.Println(191, err)
 			config.Isjjr = false
 		}
 		tmp := map[string]interface{}{}
-		json.Unmarshal(data,&tmp)
-		if tmp[nowtime] == 0 || tmp[nowtime] =="0"{
+		json.Unmarshal(data, &tmp)
+		if tmp[nowtime] == 0 || tmp[nowtime] == "0" {
 			config.Isjjr = false
 		}
 		r2, err := http.Get("http://api.goseek.cn/Tools/holiday?date=" + nowtime)
 		defer r2.Body.Close()
-		if err != nil{
-			log.Println("http://api.goseek.cn/Tools/holiday",err)
+		if err != nil {
+			log.Println("http://api.goseek.cn/Tools/holiday", err)
 			config.Isjjr = false
 		}
-		data2 ,err := ioutil.ReadAll(r2.Body)
+		data2, err := ioutil.ReadAll(r2.Body)
 		if err != nil {
-			log.Println(208,err)
+			log.Println(208, err)
 			config.Isjjr = false
 		}
 		tmp2 := map[string]interface{}{}
-		json.Unmarshal(data2,&tmp2)
-		if tmp2["data"] == 0 || tmp2["data"] == "0"|| tmp2["data"] == 2 || tmp2["data"] == "2"{
+		json.Unmarshal(data2, &tmp2)
+		if tmp2["data"] == 0 || tmp2["data"] == "0" || tmp2["data"] == 2 || tmp2["data"] == "2" {
 			config.Isjjr = false
 		}
 
@@ -218,8 +255,6 @@ func main() {
 		//log.Println(config.Sysconfig)
 	})
 
-
-
 	log.Println("Http  listening port: ", qu.ObjToString(config.Sysconfig["http_port"]))
 	if err := http.ListenAndServe(":"+qu.ObjToString(config.Sysconfig["http_port"]), mux); err != nil {
 		fmt.Println("start http server faild:", err)
@@ -239,6 +274,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 		tmp["start"] = tmp[qu.ObjToString(config.SidField)]
 		tmp["import_time"] = time.Now().Unix()
+		tmp["isrun"] = "run"
 		bytes, _ := json.Marshal(tmp)
 		b := mongodb.Save("ocr_task", string(bytes))
 		mongodb.Save("ocr_task_bak", string(bytes))
@@ -252,7 +288,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			return
 		}
 		config.Sys.Lock()
-		datas := mongodb.Find("ocr_task", nil, `{"_id":1}`, nil, false, -1, -1)
+		datas := mongodb.Find("ocr_task", bson.M{"isrun":"run"}, `{"_id":1}`, nil, false, -1, -1)
 		if len(*datas) == 0 {
 			go config.Udpclient.WriteUdp([]byte("没有新数据"), mu.OP_TYPE_DATA, ra)
 			config.Sys.Unlock()
@@ -275,19 +311,34 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		if newId.(bson.ObjectId).Hex() >= eid {
 			go config.Udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra) //起始位置
 			go config.Udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)                    //分发任务
-			totmp := make(map[string]string)
-			totmp["gtid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
-			totmp["lteid"] = qu.ObjToString(tmp[qu.ObjToString(config.EidField)])
-			totmp["stype"] = "fujian"
-			tobyte, _ := json.Marshal(totmp)
-			go config.Udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
-				IP:   net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])),
-				Port: qu.IntAll(config.Sysconfig["toudpport"]),
-			})
-			log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
-			mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
-			tmp["end_time"] = time.Now().Unix()
-			mongodb.Update("ocr_task_bak", bson.M{"_id": ObjectId.(bson.ObjectId)}, map[string]interface{}{"$set": tmp}, true, false)
+			if config.IsRun {
+				tmpdatas := mongodb.Find("ocr_task", bson.M{"isrun":"stop"}, `{"_id":1}`, nil, false, -1, -1)
+				totmp := make(map[string]string)
+				totmp["stype"] = "fujian"
+				totmp["lteid"] = qu.ObjToString(tmp[qu.ObjToString(config.EidField)])
+				if tmpdatas == nil || len(*tmpdatas) == 0{
+					totmp["gtid"] = qu.ObjToString(tmp[qu.ObjToString("start")])
+				}else {
+					totmp["gtid"] = qu.ObjToString((*tmpdatas)[0][qu.ObjToString("start")])
+					for i,_:=range (*tmpdatas){
+						mongodb.Del("ocr_task",bson.M{"_id":(*tmpdatas)[i]["_id"]})
+					}
+				}
+				tobyte, _ := json.Marshal(totmp)
+				go config.Udpclient.WriteUdp(tobyte, mu.OP_TYPE_DATA, &net.UDPAddr{
+					IP:   net.ParseIP(qu.ObjToString(config.Sysconfig["toudpip"])),
+					Port: qu.IntAll(config.Sysconfig["toudpport"]),
+				})
+				log.Println("ocr_task处理完成,发送下个节点", string(tobyte))
+
+				mongodb.Del("ocr_task", bson.M{"_id": ObjectId.(bson.ObjectId)})
+				tmp["end_time"] = time.Now().Unix()
+				mongodb.Update("ocr_task_bak", bson.M{"_id": ObjectId.(bson.ObjectId)}, map[string]interface{}{"$set": tmp}, false, false)
+			}else {
+				tmp["isrun"] = "stop"
+				mongodb.Update("ocr_task",
+					bson.M{"_id": ObjectId.(bson.ObjectId)}, tmp, false, false)
+			}
 			config.Sys.Unlock()
 			return
 		}
@@ -307,7 +358,7 @@ func reload(ip string) string {
 	if tmp == nil || len(*tmp) <= 0 {
 		return "ip不存在"
 	}
-	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"])) //删除要重新部署的实例
+	cluster.DeleteInstance(qu.ObjToString((*tmp)["InstanceId"]))                  //删除要重新部署的实例
 	cluster.RunInstances("ocr_task_arr", "8", "false", 1, int(math.Round(hours))) //创建新实例
 	time.Sleep(time.Second * 25)
 	cluster.DescribeInstances() //查询多台实例的详细信息
@@ -336,7 +387,7 @@ func reload(ip string) string {
 				return tmpip + "部署异常," + tmpstr
 			}
 		} else {
-			 tmpip += "部署失败"
+			tmpip += "部署失败"
 		}
 	}
 	return "重新部署" + ip + "未知错误,查询多台实例的详细信息,没有查询到新创建实例"

+ 1 - 0
udp_ocr_conter/src/config/config.go

@@ -14,6 +14,7 @@ var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled, SidField, EidField string
 var Sys sync.RWMutex
 var Isjjr bool//节假日
+var IsRun bool = true
 
 func init() {
 	util.ReadConfig(&Sysconfig)

+ 1 - 1
udp_ocr_conter/src/corntask/task_corn.go

@@ -53,7 +53,7 @@ var Auto = func() {
 
 		xwa := time.Date(d.Year(), d.Month(), d.Day(), util.IntAll(config.Sysconfig["xwa"]), 0, 0, 0, d.Location())
 		xwb := time.Date(d.Year(), d.Month(), d.Day(), util.IntAll(config.Sysconfig["xwb"]), 0, 0, 0, d.Location())
-		if (zsa.Before(d) && zsb.After(d)) || (xwa.Before(d) && xwb.After(d)){
+		if !((zsa.Before(d) && zsb.After(d)) || (xwa.Before(d) && xwb.After(d))){
 			if ccnum == 0 && len(cluster.CID) > 0 {
 				log.Println("释放所有实例", cluster.CID)
 				for tmpIid, _ := range cluster.CID {

+ 3 - 0
udp_ocr_conter/src/info/taskinfo.go

@@ -66,5 +66,8 @@ func QueryInfo() map[string]interface{} {
 	data["esc"] = fmt.Sprint(3 ,"+", len(cluster.CID))
 	data["resultInfos"] = resultInfos
 	//log.Println(resultInfos)
+	config.Sys.Lock()
+	data["isrun"] = config.IsRun
+	config.Sys.Unlock()
 	return data
 }

+ 23 - 0
udp_ocr_conter/src/web/index.html

@@ -5,6 +5,8 @@
 ocr_task
 {{/*{{.}}*/}}
 <br>
+当前发送节点状态   {{.isrun}}  <button id="start" onclick="start()">开启</button>   <button id="stop" onclick="stop()" >停止</button>
+<br>
 {{if eq (len .) 0}}
     当前无任务
 {{end}}
@@ -86,4 +88,25 @@ ocr_task
 <input type="button" onclick="location.reload()" value="刷新"></input>
 
 </body>
+<script src="../res/js/jquery.js"></script>
+<script>
+    function start() {
+        console.log(123)
+        $.ajax({
+            url:"/udpkz?do=start",
+            success:function (req) {
+                location.reload(true)
+            }
+        })
+    }
+    function stop() {
+        console.log(345)
+        $.ajax({
+            url:"/udpkz?do=stop",
+            success:function (req) {
+                location.reload(true)
+            }
+        })
+    }
+</script>
 </html>