zhengkun 1 mese fa
parent
commit
4cd3627613
4 ha cambiato i file con 72 aggiunte e 75 eliminazioni
  1. 3 3
      udpcontrol/src/initdata.go
  2. 2 2
      udpcontrol/src/main.go
  3. 35 38
      udpcontrol/src/method.go
  4. 32 32
      udpcontrol/src/updprocess.go

+ 3 - 3
udpcontrol/src/initdata.go

@@ -49,7 +49,7 @@ func initVarData() {
 	isGetask = false
 }
 
-//加载抽取
+// 加载抽取
 func initExtractNode() {
 	resetExtNodeArr() //重置抽取节点数组
 	sess := data_mgo.GetMgoConn()
@@ -85,7 +85,7 @@ func initExtractNode() {
 	if len(using_ext_node) <= 0 {
 		sendErrMailApi("抽取控制中心~严重错误", "当前无可用机器")
 	} else if len(using_ext_node) < using_machine { //不足预设-通知
-		sendErrMailApi("抽取控制中心~警告", "当前可用机器不足预设~请检查")
+		//sendErrMailApi("抽取控制中心~警告", "当前可用机器不足预设~请检查")
 	} else {
 
 	}
@@ -93,7 +93,7 @@ func initExtractNode() {
 	log.Debug("综合后节点~有效~备用~无效", len(using_ext_node), len(standby_ext_node), len(invalid_ext_node))
 }
 
-//重置抽取
+// 重置抽取
 func resetExtNodeArr() {
 	isAction = false
 	using_ext_node = []map[string]interface{}{}

+ 2 - 2
udpcontrol/src/main.go

@@ -17,8 +17,8 @@ func main() {
 	//各种监控等
 	go extractRunningMonitoring()
 	go getRepeatTask()
-	go lastUdpMonitoring()
-	go nextUdpMonitoring()
+	//go lastUdpMonitoring()
+	//go nextUdpMonitoring()
 
 	lock := make(chan bool)
 	<-lock

+ 35 - 38
udpcontrol/src/method.go

@@ -1,12 +1,10 @@
 package main
 
 import (
-	"encoding/json"
 	"fmt"
 	log "github.com/donnie4w/go-logger/logger"
 	"net"
 	qu "qfw/util"
-	"strings"
 	"sync"
 	"time"
 )
@@ -77,7 +75,7 @@ func getRepeatTask() {
 func extractRunningMonitoring() {
 	for {
 		if isAction {
-			time_now := time.Now().Unix()
+			//time_now := time.Now().Unix()
 			isErr := false
 			methodlock.Lock()
 			for k, v := range extractAction {
@@ -90,44 +88,43 @@ func extractRunningMonitoring() {
 					continue
 				}
 				//心跳监测~回应
-				keyArr := strings.Split(k, ":")
-				if len(keyArr) == 3 {
-					by, _ := json.Marshal(map[string]interface{}{
-						"stype": "heart_extract",
-						"skey":  "heart_extract" + k,
-					})
-					sendSingleOtherNode(by, keyArr[0], keyArr[1])
-					heartlock.Lock()
-					heart_num := qu.IntAll(heartAction[k])
-					heartAction[k] = heart_num + 1
-					heartlock.Unlock()
-				}
-				life := qu.Int64All(v["life"])
-				if time_now > life || qu.IntAll(heartAction[k]) > 10 {
-					isErr = true //超时~无响应~认为机器异常
-					data_mgo.UpdateById(data_c_name, qu.ObjToString(v["uid"]), map[string]interface{}{
-						"$set": map[string]interface{}{
-							"isuse": 0,
-						},
-					})
-				}
-
+				//keyArr := strings.Split(k, ":")
+				//if len(keyArr) == 3 {
+				//	by, _ := json.Marshal(map[string]interface{}{
+				//		"stype": "heart_extract",
+				//		"skey":  "heart_extract" + k,
+				//	})
+				//	sendSingleOtherNode(by, keyArr[0], keyArr[1])
+				//	heartlock.Lock()
+				//	heart_num := qu.IntAll(heartAction[k])
+				//	heartAction[k] = heart_num + 1
+				//	heartlock.Unlock()
+				//}
+				//life := qu.Int64All(v["life"])
+				//if time_now > life || qu.IntAll(heartAction[k]) > 10 {
+				//	isErr = true //超时~无响应~认为机器异常
+				//	data_mgo.UpdateById(data_c_name, qu.ObjToString(v["uid"]), map[string]interface{}{
+				//		"$set": map[string]interface{}{
+				//			"isuse": 0,
+				//		},
+				//	})
+				//}
 			}
 			methodlock.Unlock()
 			if isErr {
-				sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
-				eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
-				isAction = false
-				sendStopExtractNode(using_ext_node) //停止
-				if len(standby_ext_node) == 0 {
-					sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~无备用机器~此段落需要过滤~%s~%s", sid, eid))
-					time.Sleep(15 * time.Second)
-					sendNextNode(sid, eid)
-				} else {
-					sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~有备用机器~启用备用机器~%s~%s", sid, eid))
-					time.Sleep(15 * time.Second)
-					dealWithExtUdpData(sid, eid)
-				}
+				//sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
+				//eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
+				//isAction = false
+				//sendStopExtractNode(using_ext_node) //停止
+				//if len(standby_ext_node) == 0 {
+				//	sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~无备用机器~此段落需要过滤~%s~%s", sid, eid))
+				//	time.Sleep(15 * time.Second)
+				//	sendNextNode(sid, eid)
+				//} else {
+				//	sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~有备用机器~启用备用机器~%s~%s", sid, eid))
+				//	time.Sleep(15 * time.Second)
+				//	dealWithExtUdpData(sid, eid)
+				//}
 			}
 		}
 		time.Sleep(15 * time.Second)

+ 32 - 32
udpcontrol/src/updprocess.go

@@ -63,7 +63,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		str := string(data)
 		if isAction {
 			if strings.Contains(str, "heart_extract") {
-				dealWithHeartBackUdpData(strings.ReplaceAll(str, "heart_extract", ""))
+				//dealWithHeartBackUdpData(strings.ReplaceAll(str, "heart_extract", ""))
 			} else {
 				dealWithCallBackUdpData(str)
 			}
@@ -75,6 +75,32 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
+// 处理回调udp~相关数据
+func dealWithCallBackUdpData(str string) {
+	if extractAction[str] != nil {
+		extractAction[str]["action"] = 1
+		log.Debug("抽取节点回应:", str)
+		f := validExtractFinish()
+		if f {
+			sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
+			eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
+			isAction = false
+			lastNodeResponse = time.Now().Unix()
+			sendNextNode(sid, eid)
+		}
+	} else {
+		log.Debug("其他节点回应:", str)
+		udptaskmap.Delete(str)
+	}
+}
+
+// 处理-心跳回调
+func dealWithHeartBackUdpData(str string) {
+	if heartAction[str] != nil {
+		heartAction[str] = 0
+	}
+}
+
 // 处理~新接收抽取段~
 func dealWithExtUdpData(sid, eid string) {
 	//获取最新-抽取节点状态
@@ -105,32 +131,6 @@ func dealWithExtUdpData(sid, eid string) {
 	}
 }
 
-// 处理回调udp~相关数据
-func dealWithCallBackUdpData(str string) {
-	if extractAction[str] != nil {
-		extractAction[str]["action"] = 1
-		log.Debug("抽取节点回应:", str)
-		f := validExtractFinish()
-		if f {
-			sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
-			eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
-			isAction = false
-			lastNodeResponse = time.Now().Unix()
-			sendNextNode(sid, eid)
-		}
-	} else {
-		log.Debug("其他节点回应:", str)
-		udptaskmap.Delete(str)
-	}
-}
-
-// 处理-心跳回调
-func dealWithHeartBackUdpData(str string) {
-	if heartAction[str] != nil {
-		heartAction[str] = 0
-	}
-}
-
 // 通知所有节点~进行抽取~
 func sendRunExtractNode(splitArr []map[string]interface{}) {
 	for index, node := range using_ext_node {
@@ -184,12 +184,12 @@ func sendNextNode(sid string, eid string) {
 		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
 
 		//只监控清洗流程
-		if qu.IntAll(node["port"]) == 1799 {
-			new_node := &udpNode{by, addr, time.Now().Unix()}
-			udptaskmap.Store(key, new_node)
-		}
+		//if qu.IntAll(node["port"]) == 1799 {
+		//	new_node := &udpNode{by, addr, time.Now().Unix()}
+		//	udptaskmap.Store(key, new_node)
+		//}
 	}
-	log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid)
+	log.Debug("udp通知抽取完成...通知下阶段udp-大模型识别", sid, "~", eid)
 
 	//此段落彻底完毕~继续获取任务
 	isGetask = false