fengweiqiang 5 жил өмнө
parent
commit
2b5543d225

+ 1 - 1
udp_ocr_conter/main.go

@@ -96,7 +96,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 		newId := (*rdata)["_id"]
 		if newId.(bson.ObjectId).Hex() >= eid {
-			go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//起始位置
+			go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra)//起始位置
 			go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务
 			totmp := make(map[string]string)
 			totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])

+ 24 - 11
udpfileocr/main.go

@@ -20,7 +20,7 @@ var udpclient mu.UdpClient //udp对象
 var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled, GetDataIp, GetDataPort string
 var sys sync.RWMutex
-var ChanA,ChanB = make(chan bool),make(chan bool,1)
+var ChanA, ChanB = make(chan bool), make(chan bool, 1)
 var tmpNUM int32
 
 func init() {
@@ -46,7 +46,8 @@ func main() {
 	log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
 	ticker := time.NewTicker(time.Second * 30)
 	var num int
-	task: for {
+task:
+	for {
 		select {
 		case <-ticker.C:
 			num++
@@ -78,40 +79,49 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			ChanA <- true
 			close(ChanA)
 		}
-		log.Println("data:",string(data))
+		log.Println("data:", string(data), ra.String())
 		sys.Lock()
 		var mapInfo map[string]interface{}
 		err := json.Unmarshal(data, &mapInfo)
 		if err != nil {
-			log.Println("json err :", err, string(data),ra.String())
-			time.Sleep(time.Second * 30)
+			if string(data) == "没有新数据" {
+				time.Sleep(time.Minute * 5)
+			} else {
+				time.Sleep(time.Second * 30)
+			}
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 			sys.Unlock()
 			return
 		}
-		if qu.ObjToString(mapInfo["permission"])!="ocr_task"{
-			log.Println("数据异常 :", string(data),ra.String())
-			time.Sleep(time.Second * 3)
+		if qu.ObjToString(mapInfo["permission"]) != "ocr_task" {
+			log.Println("数据异常 :", string(data), ra.String())
+			time.Sleep(time.Second * 30)
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 			sys.Unlock()
 			return
 		}
 		ObjectId := qu.ObjToString(mapInfo["id"])
 		if ObjectId == "" || !bson.IsObjectIdHex(ObjectId) {
-			log.Println("获取数据id错误", mapInfo,ra.String())
-			time.Sleep(time.Second * 3)
+			log.Println("获取数据id错误", mapInfo, ra.String())
+			time.Sleep(time.Second * 30)
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 			sys.Unlock()
 			return
 		}
 		sys.Unlock()
-		log.Println("获取数据成功:", mapInfo,ra.String())
+		log.Println("获取数据成功:", mapInfo, ra.String())
 		data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1})
 		if len(*data) == 0 {
+			if qu.ObjToString(mapInfo["is_start"]) == "true"{
+				return
+			}
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 			return
 		}
 		if v, ok := (*data)[MgoFileFiled].(map[string]interface{}); !ok {
+			if qu.ObjToString(mapInfo["is_start"]) == "true" {
+				return
+			}
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 			return
 		} else {
@@ -135,6 +145,9 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}
 				}
 			}
+			if qu.ObjToString(mapInfo["is_start"]) == "true" {
+				return
+			}
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 		}
 	case mu.OP_NOOP: //下个节点回应