package udp import ( "data_ai/extract" "fmt" log "github.com/donnie4w/go-logger/logger" qu "jygit.jydev.jianyu360.cn/data_processing/common_utils" "time" ) // 监听-获取-分发任务 func getRepeatTask() { for { if len(taskList) > 0 && !isGetask { getasklock.Lock() isGetask = true len_list := len(taskList) if len_list > 1 { first_id := qu.ObjToString(taskList[0]["sid"]) end_id := qu.ObjToString(taskList[len_list-1]["eid"]) if first_id != "" && end_id != "" { taskList = taskList[len_list:] log.Debug("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList) extract.ExtractFieldInfo(first_id, end_id, "bidding") log.Debug("AI识别数据完成...发送下节点udp...") sendNextNode(first_id, end_id) } else { log.Debug("合并段落~错误~正常取段落~~~") mapInfo := taskList[0] if mapInfo != nil { taskList = taskList[1:] log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList) sid := qu.ObjToString(mapInfo["sid"]) eid := qu.ObjToString(mapInfo["eid"]) extract.ExtractFieldInfo(sid, eid, "bidding") log.Debug("AI识别数据完成...发送下节点udp...") sendNextNode(sid, eid) } else { sendErrMailApi("AI识别接收段落错误", "获取任务段落异常...跳过段落...") isGetask = false } } } else { mapInfo := taskList[0] if mapInfo != nil { taskList = taskList[1:] log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList) sid := qu.ObjToString(mapInfo["sid"]) eid := qu.ObjToString(mapInfo["eid"]) extract.ExtractFieldInfo(sid, eid, "bidding") log.Debug("AI识别数据完成...发送下节点udp...") sendNextNode(sid, eid) } else { sendErrMailApi("AI识别获取段落错误", "获取任务段落异常...跳过段落...") isGetask = false } } getasklock.Unlock() } else { time.Sleep(10 * time.Second) } } } // 监控~上节点~长时间未响应 func lastUdpMonitoring() { for { responselock.Lock() if !isGetask && time.Now().Unix()-lastNodeResponse >= 1800 { sendErrMailApi("AI识别程序~流程超时~告警", fmt.Sprintf("半小时~没有新段落数据进入流程...请检查...")) lastNodeResponse = time.Now().Unix() //重置时间 } responselock.Unlock() time.Sleep(600 * time.Second) } } // 监控~下节点 func nextUdpMonitoring() { for { udptaskmap.Range(func(k, v interface{}) bool { now := time.Now().Unix() node, _ := v.(*udpNode) if now-node.timestamp > 120 { udptaskmap.Delete(k) sendErrMailApi("AI识别程序~下节点未响应~警告", fmt.Sprintf("下节点~数据清洗~未及时响应...请检查...")) } return true }) time.Sleep(10 * time.Second) } }