Jelajahi Sumber

优化,主线程睡眠,影响udp接收

zhengkun 1 bulan lalu
induk
melakukan
6467b77a3e
4 mengubah file dengan 38 tambahan dan 214 penghapusan
  1. 0 3
      udpcontrol/src/initdata.go
  2. 1 4
      udpcontrol/src/main.go
  3. 5 165
      udpcontrol/src/method.go
  4. 32 42
      udpcontrol/src/updprocess.go

+ 0 - 3
udpcontrol/src/initdata.go

@@ -3,7 +3,6 @@ package main
 import (
 	log "github.com/donnie4w/go-logger/logger"
 	qu "qfw/util"
-	"time"
 )
 
 var (
@@ -11,7 +10,6 @@ var (
 	data_mgo, source_mgo       *MongodbSim
 	data_c_name, source_c_name string
 	using_machine              int
-	lastNodeResponse           int64
 	taskList                   []map[string]interface{}
 )
 
@@ -45,7 +43,6 @@ func initVarData() {
 	initMgo()
 	using_machine = qu.IntAll(sysconfig["using_machine"])
 	nextNode = qu.ObjArrToMapArr(sysconfig["nextNode"].([]interface{}))
-	lastNodeResponse = time.Now().Unix()
 	isGetask = false
 }
 

+ 1 - 4
udpcontrol/src/main.go

@@ -14,11 +14,8 @@ func init() {
 }
 
 func main() {
-	//各种监控等
-	go extractRunningMonitoring()
+
 	go getRepeatTask()
-	//go lastUdpMonitoring()
-	//go nextUdpMonitoring()
 
 	lock := make(chan bool)
 	<-lock

+ 5 - 165
udpcontrol/src/method.go

@@ -1,33 +1,17 @@
 package main
 
 import (
-	"fmt"
 	log "github.com/donnie4w/go-logger/logger"
-	"net"
 	qu "qfw/util"
-	"sync"
 	"time"
 )
 
-var methodlock sync.Mutex
-var heartlock sync.Mutex
-var responselock sync.Mutex
-var getasklock sync.Mutex
-
-// 邮件下节点响应
-var udptaskmap = &sync.Map{}
-
-type udpNode struct {
-	data      []byte
-	addr      *net.UDPAddr
-	timestamp int64
-}
-
 // 监听-获取-分发抽取任务
 func getRepeatTask() {
 	for {
+		getasklock.Lock()
 		if len(taskList) > 0 && !isGetask {
-			getasklock.Lock()
+			log.Debug("进行新的抽取任务···")
 			isGetask = true
 			len_list := len(taskList)
 			if len_list > 1 {
@@ -64,115 +48,14 @@ func getRepeatTask() {
 					isGetask = false
 				}
 			}
-			getasklock.Unlock()
 		} else {
-			time.Sleep(10 * time.Second)
-		}
-	}
-}
-
-// 监控~当前抽取段~状态  生命周期
-func extractRunningMonitoring() {
-	for {
-		if isAction {
-			//time_now := time.Now().Unix()
-			isErr := false
-			methodlock.Lock()
-			for k, v := range extractAction {
-				if k == "extract_ids" {
-					continue
-				}
-				//抽取行为完成~状态
-				action := qu.IntAll(v["action"])
-				if action == 1 {
-					continue
-				}
-				//心跳监测~回应
-				//keyArr := strings.Split(k, ":")
-				//if len(keyArr) == 3 {
-				//	by, _ := json.Marshal(map[string]interface{}{
-				//		"stype": "heart_extract",
-				//		"skey":  "heart_extract" + k,
-				//	})
-				//	sendSingleOtherNode(by, keyArr[0], keyArr[1])
-				//	heartlock.Lock()
-				//	heart_num := qu.IntAll(heartAction[k])
-				//	heartAction[k] = heart_num + 1
-				//	heartlock.Unlock()
-				//}
-				//life := qu.Int64All(v["life"])
-				//if time_now > life || qu.IntAll(heartAction[k]) > 10 {
-				//	isErr = true //超时~无响应~认为机器异常
-				//	data_mgo.UpdateById(data_c_name, qu.ObjToString(v["uid"]), map[string]interface{}{
-				//		"$set": map[string]interface{}{
-				//			"isuse": 0,
-				//		},
-				//	})
-				//}
-			}
-			methodlock.Unlock()
-			if isErr {
-				//sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
-				//eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
-				//isAction = false
-				//sendStopExtractNode(using_ext_node) //停止
-				//if len(standby_ext_node) == 0 {
-				//	sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~无备用机器~此段落需要过滤~%s~%s", sid, eid))
-				//	time.Sleep(15 * time.Second)
-				//	sendNextNode(sid, eid)
-				//} else {
-				//	sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~有备用机器~启用备用机器~%s~%s", sid, eid))
-				//	time.Sleep(15 * time.Second)
-				//	dealWithExtUdpData(sid, eid)
-				//}
-			}
-		}
-		time.Sleep(15 * time.Second)
-	}
-}
-
-// 监控~上节点~长时间未响应
-func lastUdpMonitoring() {
-	for {
-		responselock.Lock()
-		if !isAction && time.Now().Unix()-lastNodeResponse >= 1800 {
-			sendErrMailApi("抽取控制中心~流程超时~告警", fmt.Sprintf("半小时~没有新段落数据进入流程...请检查..."))
-			lastNodeResponse = time.Now().Unix() //重置时间
+			log.Debug("······睡眠15秒······")
+			time.Sleep(15 * time.Second)
 		}
-		responselock.Unlock()
-		time.Sleep(600 * time.Second)
+		getasklock.Unlock()
 	}
 }
 
-// 监控~下节点
-func nextUdpMonitoring() {
-	for {
-		udptaskmap.Range(func(k, v interface{}) bool {
-			now := time.Now().Unix()
-			node, _ := v.(*udpNode)
-			if now-node.timestamp > 120 {
-				udptaskmap.Delete(k)
-				sendErrMailApi("抽取控制中心~下节点未响应~警告", fmt.Sprintf("下节点~大模型识别~未及时响应...请检查..."))
-			}
-			return true
-		})
-		time.Sleep(10 * time.Second)
-	}
-}
-
-// 验证抽取是否完毕	不验证-extract_ids~key
-func validExtractFinish() bool {
-	for k, v := range extractAction {
-		if k == "extract_ids" {
-			continue
-		}
-		if qu.Int64All(v["action"]) == 0 {
-			return false
-		}
-	}
-	return true
-}
-
 // 拆分ID段方法
 func splitIdMethod(sid string, eid string) ([]map[string]interface{}, []int64) {
 	dataArr := make([]map[string]interface{}, 0)
@@ -264,46 +147,3 @@ func calculateLiftime(count int64) int64 {
 	}
 	return time.Now().Unix() + life_time
 }
-
-//暂时弃用
-//func sqlitID(){
-//	if len(using_ext_node)==1 {
-//		dataArr = append(dataArr, map[string]interface{}{
-//			"sid":sid,
-//			"eid":eid,
-//		})
-//
-//	}else {
-//		interval := hex2Dec(string(eid[:8]))-hex2Dec(string(sid[:8]))
-//		num := interval/int64(len(using_ext_node))
-//		tmp_time :=  hex2Dec(string(sid[:8]))+num
-//		for  i:=0;i<len(using_ext_node);i++ {
-//			if i==0 {
-//				tmp_eid := fmt.Sprintf("%x",tmp_time)
-//				dataArr = append(dataArr, map[string]interface{}{
-//					"sid":sid,
-//					"eid":tmp_eid+"0000000000000000",
-//				})
-//			}else if i==len(using_ext_node)-1 {
-//				tmp_sid := fmt.Sprintf("%x",tmp_time)
-//				dataArr = append(dataArr, map[string]interface{}{
-//					"sid":tmp_sid+"0000000000000000",
-//					"eid":eid,
-//				})
-//			}else {
-//				tmp_sid := fmt.Sprintf("%x",tmp_time)
-//				tmp_time = tmp_time+num
-//				tmp_eid := fmt.Sprintf("%x",tmp_time)
-//				dataArr = append(dataArr, map[string]interface{}{
-//					"sid":tmp_sid+"0000000000000000",
-//					"eid":tmp_eid+"0000000000000000",
-//				})
-//			}
-//		}
-//	}
-//}
-//
-//func hex2Dec(val string)int64{
-//	n,_ := strconv.ParseInt(val,16,32)
-//	return n
-//}

+ 32 - 42
udpcontrol/src/updprocess.go

@@ -7,7 +7,6 @@ import (
 	mu "mfw/util"
 	"net"
 	qu "qfw/util"
-	"strings"
 	"sync"
 	"time"
 )
@@ -17,6 +16,8 @@ var (
 	udpclient                                          mu.UdpClient
 	udplock                                            sync.Mutex
 	nextlock                                           sync.Mutex
+	actionlock                                         sync.Mutex
+	getasklock                                         sync.Mutex
 	extractAction                                      map[string]map[string]interface{}
 	heartAction                                        map[string]interface{}
 	isAction                                           bool
@@ -44,9 +45,8 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			if sid == "" || eid == "" {
 				log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid)
 			} else {
-				lastNodeResponse = time.Now().Unix()
 				key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
-				go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
 				//插入任务
 				udplock.Lock()
 				taskList = append(taskList, map[string]interface{}{
@@ -58,46 +58,46 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			}
 		}
 	case mu.OP_NOOP: //下个节点回应
-		//抽取多节点
+		// 只锁必要部分
 		nextlock.Lock()
 		str := string(data)
-		if isAction {
-			if strings.Contains(str, "heart_extract") {
-				//dealWithHeartBackUdpData(strings.ReplaceAll(str, "heart_extract", ""))
-			} else {
-				dealWithCallBackUdpData(str)
-			}
+		needAction := isAction
+		nextlock.Unlock()
+
+		if needAction {
+			dealWithCallBackUdpData(str)
 		} else {
-			log.Debug("其他节点回应:", str)
-			udptaskmap.Delete(str)
+			log.Debug("其它节点回应:", str)
 		}
-		nextlock.Unlock()
 	}
 }
 
 // 处理回调udp~相关数据
 func dealWithCallBackUdpData(str string) {
-	if extractAction[str] != nil {
-		extractAction[str]["action"] = 1
-		log.Debug("抽取节点回应:", str)
-		f := validExtractFinish()
-		if f {
-			sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
-			eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
-			isAction = false
-			lastNodeResponse = time.Now().Unix()
-			sendNextNode(sid, eid)
-		}
-	} else {
+	actionlock.Lock()
+	defer actionlock.Unlock()
+	if extractAction[str] == nil {
 		log.Debug("其他节点回应:", str)
-		udptaskmap.Delete(str)
+		return
 	}
-}
-
-// 处理-心跳回调
-func dealWithHeartBackUdpData(str string) {
-	if heartAction[str] != nil {
-		heartAction[str] = 0
+	extractAction[str]["action"] = 1
+	log.Debug("抽取节点回应:", str)
+	f := true
+	for k, v := range extractAction {
+		if k == "extract_ids" {
+			continue
+		}
+		if qu.Int64All(v["action"]) == 0 {
+			f = false
+			break
+		}
+	}
+	if f {
+		sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
+		eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
+		sendNextNode(sid, eid)
+		isAction = false
+		isGetask = false
 	}
 }
 
@@ -182,18 +182,8 @@ func sendNextNode(sid string, eid string) {
 			Port: qu.IntAll(node["port"]),
 		}
 		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
-
-		//只监控清洗流程
-		//if qu.IntAll(node["port"]) == 1799 {
-		//	new_node := &udpNode{by, addr, time.Now().Unix()}
-		//	udptaskmap.Store(key, new_node)
-		//}
 	}
 	log.Debug("udp通知抽取完成...通知下阶段udp-大模型识别", sid, "~", eid)
-
-	//此段落彻底完毕~继续获取任务
-	isGetask = false
-
 }
 
 // 发送单节点~