瀏覽代碼

Merge branch 'dev3.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.2

maxiaoshan 6 年之前
父節點
當前提交
0fe8a3762e
共有 3 個文件被更改,包括 41 次插入22 次删除
  1. 5 2
      udp_ocr_conter/config.json
  2. 11 1
      udp_ocr_conter/main.go
  3. 25 19
      udpfileocr/main.go

+ 5 - 2
udp_ocr_conter/config.json

@@ -1,5 +1,5 @@
 {
-  "udpip": "127.0.0.1",
+  "udpip": "192.168.20.238",
   "udpport": "1990",
   "dbsize": "5",
   "mongodb_ip": "192.168.3.207:27081",
@@ -9,5 +9,8 @@
   "json_sidfiled": "gtid",
   "json_eidfiled": "lteid",
   "toudpip": "127.0.0.1",
-  "toudpport": "1481"
+  "toudpport": "1481",
+  "broadcast": false,
+  "broadcast_ips": "127.0.0.1;192.168.1.2;192.168.1.3;192.168.1.4",
+  "broadcast_port": 1490
 }

+ 11 - 1
udp_ocr_conter/main.go

@@ -38,6 +38,16 @@ func main() {
 	udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
+	if Sysconfig["broadcast"].(bool){//重启的话通知分布式节点
+		ips := qu.ObjToString(Sysconfig["broadcast_ips"])
+		ipsArr := strings.Split(ips, ";")
+		for _,v := range ipsArr{
+			udpclient.WriteUdp([]byte{},mu.OP_NOOP,&net.UDPAddr{
+				IP:   net.ParseIP(v),
+				Port: qu.IntAll(Sysconfig["broadcast_port"]),
+			})
+		}
+	}
 	b := make(chan bool, 1)
 	<-b
 }
@@ -86,7 +96,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 		newId := (*rdata)["_id"]
 		if newId.(bson.ObjectId).Hex() >= eid {
-			go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//起始位置
+			go udpclient.WriteUdp([]byte(`{"id":"`+qu.ObjToString(tmp["start"])+`","permission":"ocr_task","is_start":"true"}`), mu.OP_TYPE_DATA, ra)//起始位置
 			go udpclient.WriteUdp([]byte(`{"id":"`+newId.(bson.ObjectId).Hex()+`","permission":"ocr_task"}`), mu.OP_TYPE_DATA, ra)//分发任务
 			totmp := make(map[string]string)
 			totmp["sid"] = qu.ObjToString(tmp[qu.ObjToString("start")])

+ 25 - 19
udpfileocr/main.go

@@ -11,7 +11,6 @@ import (
 	"path"
 	qu "qfw/util"
 	"strings"
-	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -19,8 +18,7 @@ import (
 var udpclient mu.UdpClient //udp对象
 var Sysconfig map[string]interface{}
 var MgoIP, MgoDB, MgoC, MgoFileFiled, GetDataIp, GetDataPort string
-var sys sync.RWMutex
-var ChanA,ChanB = make(chan bool),make(chan bool,1)
+var ChanA, ChanB = make(chan bool), make(chan bool, 1)
 var tmpNUM int32
 
 func init() {
@@ -46,7 +44,8 @@ func main() {
 	log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
 	ticker := time.NewTicker(time.Second * 30)
 	var num int
-	task: for {
+task:
+	for {
 		select {
 		case <-ticker.C:
 			num++
@@ -78,40 +77,44 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			ChanA <- true
 			close(ChanA)
 		}
-		log.Println("data:",string(data))
-		sys.Lock()
+		log.Println("data:", string(data), ra.String())
 		var mapInfo map[string]interface{}
 		err := json.Unmarshal(data, &mapInfo)
 		if err != nil {
-			log.Println("json err :", err, string(data),ra.String())
-			time.Sleep(time.Second * 30)
+			if string(data) == "没有新数据" {
+				time.Sleep(time.Minute * 5)
+			} else {
+				time.Sleep(time.Second * 30)
+			}
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
-			sys.Unlock()
 			return
 		}
-		if qu.ObjToString(mapInfo["permission"])!="ocr_task"{
-			log.Println("数据异常 :", string(data),ra.String())
-			time.Sleep(time.Second * 3)
+		if qu.ObjToString(mapInfo["permission"]) != "ocr_task" {
+			log.Println("数据异常 :", string(data), ra.String())
+			time.Sleep(time.Second * 30)
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
-			sys.Unlock()
 			return
 		}
 		ObjectId := qu.ObjToString(mapInfo["id"])
 		if ObjectId == "" || !bson.IsObjectIdHex(ObjectId) {
-			log.Println("获取数据id错误", mapInfo,ra.String())
-			time.Sleep(time.Second * 3)
+			log.Println("获取数据id错误", mapInfo, ra.String())
+			time.Sleep(time.Second * 30)
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
-			sys.Unlock()
 			return
 		}
-		sys.Unlock()
-		log.Println("获取数据成功:", mapInfo,ra.String())
+		log.Println("获取数据成功:", mapInfo, ra.String())
 		data, _ := mongodbutil.Mgo.FindById(MgoC, ObjectId, bson.M{"_id": 1, MgoFileFiled: 1})
 		if len(*data) == 0 {
+			if qu.ObjToString(mapInfo["is_start"]) == "true"{
+				return
+			}
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 			return
 		}
 		if v, ok := (*data)[MgoFileFiled].(map[string]interface{}); !ok {
+			if qu.ObjToString(mapInfo["is_start"]) == "true" {
+				return
+			}
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 			return
 		} else {
@@ -130,11 +133,14 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 							<-ChanB
 							continue
 						}
-						save(bson.ObjectIdHex(ObjectId), attk, &v, &fileinfo, &updateNum)
+						save(bson.ObjectIdHex(ObjectId), attk, data, &fileinfo, &updateNum)
 						<-ChanB
 					}
 				}
 			}
+			if qu.ObjToString(mapInfo["is_start"]) == "true" {
+				return
+			}
 			go udpclient.WriteUdp([]byte(`{"permission":"get_ocr_task"}`), mu.OP_GET_DOWNLOADERCODE, ra)
 		}
 	case mu.OP_NOOP: //下个节点回应