zhengkun 1 год назад
Родитель
Сommit
d8ab3c8603
5 измененных файлов с 83 добавлено и 58 удалено
  1. 29 53
      src/jy/extract/extractudp.go
  2. 41 0
      src/jy/extract/extractudp_task.go
  3. 5 1
      src/jy/util/util.go
  4. 3 2
      src/main.go
  5. 5 2
      udps/main.go

+ 29 - 53
src/jy/extract/extractudp.go

@@ -9,6 +9,7 @@ import (
 	mu "mfw/util"
 	"net"
 	qu "qfw/util"
+	"strings"
 	"sync"
 	"time"
 
@@ -19,6 +20,7 @@ import (
 var Udpclient mu.UdpClient //udp对象
 var nextNodes []map[string]interface{}
 var IsExtStop bool
+var udplock sync.Mutex
 
 // 新增机器节点
 func ExtractUdpUpdateMachine() {
@@ -75,53 +77,22 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				log.Debug("收到监测......")
 				Udpclient.WriteUdp([]byte("monitor ok"), mu.OP_NOOP, ra)
 			} else {
-				sid, _ := rep["gtid"].(string)
-				eid, _ := rep["lteid"].(string)
-				if sid == "" || eid == "" {
-					log.Debug("err", "sid=", sid, ",eid=", eid)
+				nomal, _ := rep["nomal"].(string)
+				file, _ := rep["file"].(string)
+				fileArr := strings.Split(file, "-")
+				nomalArr := strings.Split(nomal, "-")
+				if len(fileArr) != 2 || len(nomalArr) != 2 || nomal == "" || file == "" {
+					log.Debug("接收upd异常...")
+					go Udpclient.WriteUdp([]byte("接收upd异常..."), mu.OP_NOOP, ra)
 				} else {
-					//新版本控制抽取
-					//udpinfo, _ := rep["stype"].(string)
-					//if udpinfo == "" {
-					//	udpinfo = "udpok"
-					//}
-					//IsExtStop = false
-					//ExtractByUdp(sid, eid, ra)
-					//if !IsExtStop {
-					//	log.Debug("抽取完成udp通知抽取id段-控制台", udpinfo, sid, "~", eid)
-					//	Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
-					//} else {
-					//	log.Debug("抽取强制中断udp不通知-控制台", udpinfo, sid, "~", eid)
-					//}
-
-					//发布数据~重采数据~测试流程
-					//key := sid + "-" + eid + "-" + qu.ObjToString(rep["stype"])
-					//go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
-					//log.Debug("udp通知抽取id段", sid, " ", eid)
-					//ExtractByUdp(sid, eid, ra)
-					//for _, m := range nextNodes {
-					//	by, _ := json.Marshal(map[string]interface{}{
-					//		"gtid":  sid,
-					//		"lteid": eid,
-					//		"stype": qu.ObjToString(m["stype"]),
-					//	})
-					//	err_udp := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-					//		IP:   net.ParseIP(m["addr"].(string)),
-					//		Port: qu.IntAll(m["port"]),
-					//	})
-					//	if err_udp != nil {
-					//		log.Debug(err_udp)
-					//	}
-					//}
-					//log.Debug("udp通知抽取完成,eid=", eid)
-
-					//预处理模块
-					key := sid + "-" + eid + "-" + qu.ObjToString(rep["stype"])
-					go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
-					//保存段落合并段落···
-
-					log.Debug("udp通知抽取id段", sid, " ", eid)
-					ExtractByUdpPre(sid, eid, ra)
+					go Udpclient.WriteUdp([]byte("ok"), mu.OP_NOOP, ra)
+					udplock.Lock()
+					ju.TaskUdpList = append(ju.TaskUdpList, map[string]map[string]string{
+						"nomal": {"sid": nomalArr[0], "eid": nomalArr[1]},
+						"file":  {"sid": fileArr[0], "eid": fileArr[1]},
+					})
+					log.Debug("udp收到任务...数量:", len(ju.TaskUdpList))
+					udplock.Unlock()
 				}
 			}
 		}
@@ -326,7 +297,7 @@ func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 	}
 }
 
-func ExtractByUdpPre(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
+func ExtractByUdpPre(n_sid, n_eid string, f_sid, f_eid string) {
 	defer qu.Catch()
 	if ext == nil {
 		ext = nil
@@ -369,12 +340,14 @@ func ExtractByUdpPre(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 	} else {
 		ext.BidTotal = 0
 	}
-	query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-	count1 := ext.TaskInfo.FDB.Count("zktest_bidding_nomal", query)
-	count2 := ext.TaskInfo.FDB.Count("zktest_bidding_file", query)
+	query1 := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(n_sid), "$lte": bson.ObjectIdHex(n_eid)}}
+	query2 := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(f_sid), "$lte": bson.ObjectIdHex(f_eid)}}
+	count1 := ext.TaskInfo.FDB.Count("bidding_nomal", query1)
+	count2 := ext.TaskInfo.FDB.Count("bidding_file", query2)
 	log.Debug("待抽取数量:", count1+count2)
-	list1, _ := ext.TaskInfo.FDB.Find("zktest_bidding_nomal", query, nil, Fields, false, -1, -1)
-	list2, _ := ext.TaskInfo.FDB.Find("zktest_bidding_file", query, nil, Fields, false, -1, -1)
+	return
+	list1, _ := ext.TaskInfo.FDB.Find("zktest_bidding_nomal", query1, nil, Fields, false, -1, -1)
+	list2, _ := ext.TaskInfo.FDB.Find("zktest_bidding_file", query2, nil, Fields, false, -1, -1)
 	new_list := append(*list1, *list2...)
 	now_time := time.Now().Unix()
 	total := 0
@@ -384,7 +357,10 @@ func ExtractByUdpPre(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
 			log.Debug("cur index :", total, v["_id"])
 		}
 		total++
-		if spidercode[qu.ObjToString(v["spidercode"])] { //开标记录
+		if spidercode[qu.ObjToString(v["spidercode"])] {
+			continue
+		}
+		if qu.ObjToString(v["biddingid"]) != "" {
 			continue
 		}
 		ext.TaskInfo.ProcessPool <- true

+ 41 - 0
src/jy/extract/extractudp_task.go

@@ -0,0 +1,41 @@
+package extract
+
+import (
+	log "github.com/donnie4w/go-logger/logger"
+	ju "jy/util"
+	"sync"
+	"time"
+)
+
+var getasklock sync.Mutex
+
+// 监听-获取-分发抽取任务
+func GetExtractUdpTaskInfo() {
+	for {
+		if len(ju.TaskUdpList) > 0 && !ju.IsGetask {
+			getasklock.Lock()
+			ju.IsGetask = true
+			l := len(ju.TaskUdpList)
+			if l > 1 {
+				s_nomal := ju.TaskUdpList[0]["nomal"]
+				s_file := ju.TaskUdpList[0]["file"]
+				e_nomal := ju.TaskUdpList[l-1]["nomal"]
+				e_file := ju.TaskUdpList[l-1]["file"]
+				n_sid, f_sid := s_nomal["sid"], s_file["sid"]
+				n_eid, f_eid := e_nomal["eid"], e_file["eid"]
+				ju.TaskUdpList = ju.TaskUdpList[l:]
+				log.Debug("合并段落...", n_sid, "~", f_sid, "~", n_eid, "~", f_eid)
+				ExtractByUdpPre(n_sid, n_eid, f_sid, f_eid)
+			} else {
+				nomal := ju.TaskUdpList[0]["nomal"]
+				file := ju.TaskUdpList[0]["file"]
+				ju.TaskUdpList = ju.TaskUdpList[1:]
+				ExtractByUdpPre(nomal["sid"], nomal["eid"], file["sid"], file["eid"])
+			}
+			ju.IsGetask = false
+			getasklock.Unlock()
+		} else {
+			time.Sleep(10 * time.Second)
+		}
+	}
+}

+ 5 - 1
src/jy/util/util.go

@@ -48,6 +48,10 @@ var Site_Mgo, Qyxy_Mgo *MongodbSim
 var IsUpdateRule bool
 var DefaultRegions, AdjustmentRegions = []string{}, []string{}
 
+// 任务池
+var TaskUdpList = []map[string]map[string]string{}
+var IsGetask = false
+
 func init() {
 	syncint = make(chan bool, 1)
 }
@@ -359,7 +363,7 @@ func RemoveDuplicates(input []string) []string {
 	return output
 }
 
-//RemoveDuplicatesAndKeepLonger 去除字符串数组中的重复数据,同时只保留更全的字符串
+// RemoveDuplicatesAndKeepLonger 去除字符串数组中的重复数据,同时只保留更全的字符串
 func RemoveDuplicatesAndKeepLonger(arr []string) []string {
 	result := make([]string, 0)
 	uniqueStrings := make(map[string]struct{})

+ 3 - 2
src/main.go

@@ -27,8 +27,9 @@ func init() {
 	u.UtilInit()
 }
 func main() {
-	extract.ExtractUdpUpdateMachine() //节点上传~构建
-	extract.ExtractUdp()              //udp通知抽取
+	extract.ExtractUdpUpdateMachine()  //节点上传~构建
+	extract.ExtractUdp()               //udp通知抽取
+	go extract.GetExtractUdpTaskInfo() //抽取任务获取
 	go Router.Run(":" + qu.ObjToString(u.Config["port"]))
 	go log.Debug("启动..", qu.ObjToString(u.Config["port"]))
 	go func() {

+ 5 - 2
udps/main.go

@@ -26,6 +26,9 @@ func main() {
 	flag.StringVar(&id1, "gtid", "124ed30f4f7bde5444f1eb84", "gtid")
 	flag.StringVar(&id2, "lteid", "92446f91923488e1724735de", "lteid")
 
+	flag.StringVar(&id1, "nomal", "124ed30f4f7bde5444f1eb84-92446f91923488e1724735de", "nomal")
+	flag.StringVar(&id2, "file", "124ed30f4f7bde5444f1eb84-92446f91923488e1724735de", "file")
+
 	flag.StringVar(&ids, "ids", "", "id1,id2")
 	flag.StringVar(&stype, "stype", "", "stype,传递类型")
 	flag.StringVar(&bkey, "bkey", "", "bkey,加上此参数表示不生关键词和摘要")
@@ -58,10 +61,10 @@ func main() {
 			"stype": stype,
 		}
 		if id1 != "" {
-			m1["gtid"] = id1
+			m1["nomal"] = id1
 		}
 		if id2 != "" {
-			m1["lteid"] = id2
+			m1["file"] = id2
 		}
 		if bkey != "" {
 			m1["bkey"] = bkey