package main import ( "fmt" log "github.com/donnie4w/go-logger/logger" "net" qu "qfw/util" "sync" "time" ) var methodlock sync.Mutex var heartlock sync.Mutex var responselock sync.Mutex var getasklock sync.Mutex // 邮件下节点响应 var udptaskmap = &sync.Map{} type udpNode struct { data []byte addr *net.UDPAddr timestamp int64 } // 监听-获取-分发抽取任务 func getRepeatTask() { for { if len(taskList) > 0 && !isGetask { getasklock.Lock() isGetask = true len_list := len(taskList) if len_list > 1 { first_id := qu.ObjToString(taskList[0]["sid"]) end_id := qu.ObjToString(taskList[len_list-1]["eid"]) if first_id != "" && end_id != "" { taskList = taskList[len_list:] log.Debug("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList) dealWithExtUdpData(first_id, end_id) } else { log.Debug("合并段落~错误~正常取段落~~~") mapInfo := taskList[0] if mapInfo != nil { taskList = taskList[1:] log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList) sid := qu.ObjToString(mapInfo["sid"]) eid := qu.ObjToString(mapInfo["eid"]) dealWithExtUdpData(sid, eid) } else { sendErrMailApi("抽取控制中心~任务错误", "获取任务段落异常...跳过段落...") isGetask = false } } } else { mapInfo := taskList[0] if mapInfo != nil { taskList = taskList[1:] log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList) sid := qu.ObjToString(mapInfo["sid"]) eid := qu.ObjToString(mapInfo["eid"]) dealWithExtUdpData(sid, eid) } else { sendErrMailApi("抽取控制中心~任务错误", "获取任务段落异常...跳过段落...") isGetask = false } } getasklock.Unlock() } else { time.Sleep(10 * time.Second) } } } // 监控~当前抽取段~状态 生命周期 func extractRunningMonitoring() { for { if isAction { //time_now := time.Now().Unix() isErr := false methodlock.Lock() for k, v := range extractAction { if k == "extract_ids" { continue } //抽取行为完成~状态 action := qu.IntAll(v["action"]) if action == 1 { continue } //心跳监测~回应 //keyArr := strings.Split(k, ":") //if len(keyArr) == 3 { // by, _ := json.Marshal(map[string]interface{}{ // "stype": "heart_extract", // "skey": "heart_extract" + k, // }) // sendSingleOtherNode(by, keyArr[0], keyArr[1]) // heartlock.Lock() // heart_num := qu.IntAll(heartAction[k]) // heartAction[k] = heart_num + 1 // heartlock.Unlock() //} //life := qu.Int64All(v["life"]) //if time_now > life || qu.IntAll(heartAction[k]) > 10 { // isErr = true //超时~无响应~认为机器异常 // data_mgo.UpdateById(data_c_name, qu.ObjToString(v["uid"]), map[string]interface{}{ // "$set": map[string]interface{}{ // "isuse": 0, // }, // }) //} } methodlock.Unlock() if isErr { //sid := qu.ObjToString(extractAction["extract_ids"]["sid"]) //eid := qu.ObjToString(extractAction["extract_ids"]["eid"]) //isAction = false //sendStopExtractNode(using_ext_node) //停止 //if len(standby_ext_node) == 0 { // sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~无备用机器~此段落需要过滤~%s~%s", sid, eid)) // time.Sleep(15 * time.Second) // sendNextNode(sid, eid) //} else { // sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~有备用机器~启用备用机器~%s~%s", sid, eid)) // time.Sleep(15 * time.Second) // dealWithExtUdpData(sid, eid) //} } } time.Sleep(15 * time.Second) } } // 监控~上节点~长时间未响应 func lastUdpMonitoring() { for { responselock.Lock() if !isAction && time.Now().Unix()-lastNodeResponse >= 1800 { sendErrMailApi("抽取控制中心~流程超时~告警", fmt.Sprintf("半小时~没有新段落数据进入流程...请检查...")) lastNodeResponse = time.Now().Unix() //重置时间 } responselock.Unlock() time.Sleep(600 * time.Second) } } // 监控~下节点 func nextUdpMonitoring() { for { udptaskmap.Range(func(k, v interface{}) bool { now := time.Now().Unix() node, _ := v.(*udpNode) if now-node.timestamp > 120 { udptaskmap.Delete(k) sendErrMailApi("抽取控制中心~下节点未响应~警告", fmt.Sprintf("下节点~大模型识别~未及时响应...请检查...")) } return true }) time.Sleep(10 * time.Second) } } // 验证抽取是否完毕 不验证-extract_ids~key func validExtractFinish() bool { for k, v := range extractAction { if k == "extract_ids" { continue } if qu.Int64All(v["action"]) == 0 { return false } } return true } // 拆分ID段方法 func splitIdMethod(sid string, eid string) ([]map[string]interface{}, []int64) { dataArr := make([]map[string]interface{}, 0) lifeArr := make([]int64, 0) if sid == "" || eid == "" || len(using_ext_node) == 0 { return dataArr, lifeArr } sess := source_mgo.GetMgoConn() defer source_mgo.DestoryMongoConn(sess) q, total := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(sid), "$lte": StringTOBsonId(eid), }, }, int64(0) count, _ := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Count() log.Debug("查询当前数量:", count) if len(using_ext_node) == 1 { dataArr = append(dataArr, map[string]interface{}{ "sid": sid, "eid": eid, }) lifeArr = append(lifeArr, calculateLiftime(count)) } else { node_num := int64(len(using_ext_node)) if count < node_num { //采用一个节点-多余临时删除 log.Debug("数量过少~采用一个节点") tmp_node := using_ext_node[0] using_ext_node = []map[string]interface{}{} using_ext_node = append(using_ext_node, tmp_node) dataArr = append(dataArr, map[string]interface{}{ "sid": sid, "eid": eid, }) lifeArr = append(lifeArr, calculateLiftime(count)) } else { limit := count / node_num limit_lifetime := calculateLiftime(limit) tmp_sid := sid it := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Sort("_id").Select(map[string]interface{}{ "_id": 1, }).Iter() for tmp := make(map[string]interface{}); it.Next(&tmp); { total++ if total%limit == 0 { if total/limit == node_num { dataArr = append(dataArr, map[string]interface{}{ "sid": tmp_sid, "eid": eid, }) lifeArr = append(lifeArr, limit_lifetime) break } else { dataArr = append(dataArr, map[string]interface{}{ "sid": tmp_sid, "eid": BsonTOStringId(tmp["_id"]), }) tmp_sid = BsonTOStringId(tmp["_id"]) lifeArr = append(lifeArr, limit_lifetime) } } tmp = make(map[string]interface{}) } } } if len(dataArr) != len(using_ext_node) || len(dataArr) != len(lifeArr) { log.Debug("划分段落异常~请检查~只能采用唯一节点~") tmp_node := using_ext_node[0] using_ext_node = []map[string]interface{}{} using_ext_node = append(using_ext_node, tmp_node) dataArr = []map[string]interface{}{} lifeArr = []int64{} dataArr = append(dataArr, map[string]interface{}{ "sid": sid, "eid": eid, }) lifeArr = append(lifeArr, calculateLiftime(count)) } return dataArr, lifeArr } // 计算生命周期 func calculateLiftime(count int64) int64 { time_one := 1500.0 / 1000.0 //暂定~每千条用时1500秒 life_time := int64(time_one * float64(count) * 3.0) if life_time < 2400 { life_time = 2400 } return time.Now().Unix() + life_time } //暂时弃用 //func sqlitID(){ // if len(using_ext_node)==1 { // dataArr = append(dataArr, map[string]interface{}{ // "sid":sid, // "eid":eid, // }) // // }else { // interval := hex2Dec(string(eid[:8]))-hex2Dec(string(sid[:8])) // num := interval/int64(len(using_ext_node)) // tmp_time := hex2Dec(string(sid[:8]))+num // for i:=0;i