1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- package udp
- import (
- "data_ai/extract"
- "fmt"
- log "github.com/donnie4w/go-logger/logger"
- qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "time"
- )
- // 监听-获取-分发任务
- func getRepeatTask() {
- for {
- if len(taskList) > 0 && !isGetask {
- getasklock.Lock()
- isGetask = true
- len_list := len(taskList)
- if len_list > 1 {
- first_id := qu.ObjToString(taskList[0]["sid"])
- end_id := qu.ObjToString(taskList[len_list-1]["eid"])
- if first_id != "" && end_id != "" {
- taskList = taskList[len_list:]
- log.Debug("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
- extract.ExtractFieldInfo(first_id, end_id, "bidding")
- log.Debug("AI识别数据完成...发送下节点udp...")
- sendNextNode(first_id, end_id)
- } else {
- log.Debug("合并段落~错误~正常取段落~~~")
- mapInfo := taskList[0]
- if mapInfo != nil {
- taskList = taskList[1:]
- log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
- sid := qu.ObjToString(mapInfo["sid"])
- eid := qu.ObjToString(mapInfo["eid"])
- extract.ExtractFieldInfo(sid, eid, "bidding")
- log.Debug("AI识别数据完成...发送下节点udp...")
- sendNextNode(sid, eid)
- } else {
- sendErrMailApi("AI识别接收段落错误", "获取任务段落异常...跳过段落...")
- isGetask = false
- }
- }
- } else {
- mapInfo := taskList[0]
- if mapInfo != nil {
- taskList = taskList[1:]
- log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
- sid := qu.ObjToString(mapInfo["sid"])
- eid := qu.ObjToString(mapInfo["eid"])
- extract.ExtractFieldInfo(sid, eid, "bidding")
- log.Debug("AI识别数据完成...发送下节点udp...")
- sendNextNode(sid, eid)
- } else {
- sendErrMailApi("AI识别获取段落错误", "获取任务段落异常...跳过段落...")
- isGetask = false
- }
- }
- getasklock.Unlock()
- } else {
- time.Sleep(10 * time.Second)
- }
- }
- }
- // 监控~上节点~长时间未响应
- func lastUdpMonitoring() {
- for {
- responselock.Lock()
- if !isGetask && time.Now().Unix()-lastNodeResponse >= 1800 {
- sendErrMailApi("AI识别程序~流程超时~告警", fmt.Sprintf("半小时~没有新段落数据进入流程...请检查..."))
- lastNodeResponse = time.Now().Unix() //重置时间
- }
- responselock.Unlock()
- time.Sleep(600 * time.Second)
- }
- }
- // 监控~下节点
- func nextUdpMonitoring() {
- for {
- udptaskmap.Range(func(k, v interface{}) bool {
- now := time.Now().Unix()
- node, _ := v.(*udpNode)
- if now-node.timestamp > 120 {
- udptaskmap.Delete(k)
- sendErrMailApi("AI识别程序~下节点未响应~警告", fmt.Sprintf("下节点~数据清洗~未及时响应...请检查..."))
- }
- return true
- })
- time.Sleep(10 * time.Second)
- }
- }
|