123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- package main
- import (
- "fmt"
- log "github.com/donnie4w/go-logger/logger"
- "net"
- qu "qfw/util"
- "sync"
- "time"
- )
// Package-level synchronisation state shared by the monitoring goroutines in
// this file.
var (
	// methodlock is held by extractRunningMonitoring while it ranges over
	// extractAction.
	methodlock sync.Mutex
	// heartlock is only referenced by the currently disabled heartbeat code in
	// extractRunningMonitoring.
	heartlock sync.Mutex
	// responselock is held by lastUdpMonitoring while it reads and resets
	// lastNodeResponse.
	responselock sync.Mutex
	// getasklock is held by getRepeatTask while it pops entries from taskList
	// and dispatches them.
	getasklock sync.Mutex

	// udptaskmap records downstream-node requests that are still waiting for a
	// response. nextUdpMonitoring reads its values as *udpNode, removes entries
	// older than 120 seconds and sends an alert mail for each.
	udptaskmap = new(sync.Map)
)
// udpNode is the value type stored in udptaskmap: one UDP message together
// with the peer it belongs to and the time it was recorded.
type udpNode struct {
	// data is the raw UDP payload.
	data []byte
	// addr is the UDP address associated with the message.
	addr *net.UDPAddr
	// timestamp is compared against time.Now().Unix() by nextUdpMonitoring,
	// i.e. it is expressed in Unix seconds.
	timestamp int64
}
- // 监听-获取-分发抽取任务
- func getRepeatTask() {
- for {
- if len(taskList) > 0 && !isGetask {
- getasklock.Lock()
- isGetask = true
- len_list := len(taskList)
- if len_list > 1 {
- first_id := qu.ObjToString(taskList[0]["sid"])
- end_id := qu.ObjToString(taskList[len_list-1]["eid"])
- if first_id != "" && end_id != "" {
- taskList = taskList[len_list:]
- log.Debug("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
- dealWithExtUdpData(first_id, end_id)
- } else {
- log.Debug("合并段落~错误~正常取段落~~~")
- mapInfo := taskList[0]
- if mapInfo != nil {
- taskList = taskList[1:]
- log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
- sid := qu.ObjToString(mapInfo["sid"])
- eid := qu.ObjToString(mapInfo["eid"])
- dealWithExtUdpData(sid, eid)
- } else {
- sendErrMailApi("抽取控制中心~任务错误", "获取任务段落异常...跳过段落...")
- isGetask = false
- }
- }
- } else {
- mapInfo := taskList[0]
- if mapInfo != nil {
- taskList = taskList[1:]
- log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
- sid := qu.ObjToString(mapInfo["sid"])
- eid := qu.ObjToString(mapInfo["eid"])
- dealWithExtUdpData(sid, eid)
- } else {
- sendErrMailApi("抽取控制中心~任务错误", "获取任务段落异常...跳过段落...")
- isGetask = false
- }
- }
- getasklock.Unlock()
- } else {
- time.Sleep(10 * time.Second)
- }
- }
- }
- // 监控~当前抽取段~状态 生命周期
- func extractRunningMonitoring() {
- for {
- if isAction {
- //time_now := time.Now().Unix()
- isErr := false
- methodlock.Lock()
- for k, v := range extractAction {
- if k == "extract_ids" {
- continue
- }
- //抽取行为完成~状态
- action := qu.IntAll(v["action"])
- if action == 1 {
- continue
- }
- //心跳监测~回应
- //keyArr := strings.Split(k, ":")
- //if len(keyArr) == 3 {
- // by, _ := json.Marshal(map[string]interface{}{
- // "stype": "heart_extract",
- // "skey": "heart_extract" + k,
- // })
- // sendSingleOtherNode(by, keyArr[0], keyArr[1])
- // heartlock.Lock()
- // heart_num := qu.IntAll(heartAction[k])
- // heartAction[k] = heart_num + 1
- // heartlock.Unlock()
- //}
- //life := qu.Int64All(v["life"])
- //if time_now > life || qu.IntAll(heartAction[k]) > 10 {
- // isErr = true //超时~无响应~认为机器异常
- // data_mgo.UpdateById(data_c_name, qu.ObjToString(v["uid"]), map[string]interface{}{
- // "$set": map[string]interface{}{
- // "isuse": 0,
- // },
- // })
- //}
- }
- methodlock.Unlock()
- if isErr {
- //sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
- //eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
- //isAction = false
- //sendStopExtractNode(using_ext_node) //停止
- //if len(standby_ext_node) == 0 {
- // sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~无备用机器~此段落需要过滤~%s~%s", sid, eid))
- // time.Sleep(15 * time.Second)
- // sendNextNode(sid, eid)
- //} else {
- // sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~有备用机器~启用备用机器~%s~%s", sid, eid))
- // time.Sleep(15 * time.Second)
- // dealWithExtUdpData(sid, eid)
- //}
- }
- }
- time.Sleep(15 * time.Second)
- }
- }
- // 监控~上节点~长时间未响应
- func lastUdpMonitoring() {
- for {
- responselock.Lock()
- if !isAction && time.Now().Unix()-lastNodeResponse >= 1800 {
- sendErrMailApi("抽取控制中心~流程超时~告警", fmt.Sprintf("半小时~没有新段落数据进入流程...请检查..."))
- lastNodeResponse = time.Now().Unix() //重置时间
- }
- responselock.Unlock()
- time.Sleep(600 * time.Second)
- }
- }
- // 监控~下节点
- func nextUdpMonitoring() {
- for {
- udptaskmap.Range(func(k, v interface{}) bool {
- now := time.Now().Unix()
- node, _ := v.(*udpNode)
- if now-node.timestamp > 120 {
- udptaskmap.Delete(k)
- sendErrMailApi("抽取控制中心~下节点未响应~警告", fmt.Sprintf("下节点~大模型识别~未及时响应...请检查..."))
- }
- return true
- })
- time.Sleep(10 * time.Second)
- }
- }
- // 验证抽取是否完毕 不验证-extract_ids~key
- func validExtractFinish() bool {
- for k, v := range extractAction {
- if k == "extract_ids" {
- continue
- }
- if qu.Int64All(v["action"]) == 0 {
- return false
- }
- }
- return true
- }
- // 拆分ID段方法
- func splitIdMethod(sid string, eid string) ([]map[string]interface{}, []int64) {
- dataArr := make([]map[string]interface{}, 0)
- lifeArr := make([]int64, 0)
- if sid == "" || eid == "" || len(using_ext_node) == 0 {
- return dataArr, lifeArr
- }
- sess := source_mgo.GetMgoConn()
- defer source_mgo.DestoryMongoConn(sess)
- q, total := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(sid),
- "$lte": StringTOBsonId(eid),
- },
- }, int64(0)
- count, _ := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Count()
- log.Debug("查询当前数量:", count)
- if len(using_ext_node) == 1 {
- dataArr = append(dataArr, map[string]interface{}{
- "sid": sid,
- "eid": eid,
- })
- lifeArr = append(lifeArr, calculateLiftime(count))
- } else {
- node_num := int64(len(using_ext_node))
- if count < node_num { //采用一个节点-多余临时删除
- log.Debug("数量过少~采用一个节点")
- tmp_node := using_ext_node[0]
- using_ext_node = []map[string]interface{}{}
- using_ext_node = append(using_ext_node, tmp_node)
- dataArr = append(dataArr, map[string]interface{}{
- "sid": sid,
- "eid": eid,
- })
- lifeArr = append(lifeArr, calculateLiftime(count))
- } else {
- limit := count / node_num
- limit_lifetime := calculateLiftime(limit)
- tmp_sid := sid
- it := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Sort("_id").Select(map[string]interface{}{
- "_id": 1,
- }).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); {
- total++
- if total%limit == 0 {
- if total/limit == node_num {
- dataArr = append(dataArr, map[string]interface{}{
- "sid": tmp_sid,
- "eid": eid,
- })
- lifeArr = append(lifeArr, limit_lifetime)
- break
- } else {
- dataArr = append(dataArr, map[string]interface{}{
- "sid": tmp_sid,
- "eid": BsonTOStringId(tmp["_id"]),
- })
- tmp_sid = BsonTOStringId(tmp["_id"])
- lifeArr = append(lifeArr, limit_lifetime)
- }
- }
- tmp = make(map[string]interface{})
- }
- }
- }
- if len(dataArr) != len(using_ext_node) || len(dataArr) != len(lifeArr) {
- log.Debug("划分段落异常~请检查~只能采用唯一节点~")
- tmp_node := using_ext_node[0]
- using_ext_node = []map[string]interface{}{}
- using_ext_node = append(using_ext_node, tmp_node)
- dataArr = []map[string]interface{}{}
- lifeArr = []int64{}
- dataArr = append(dataArr, map[string]interface{}{
- "sid": sid,
- "eid": eid,
- })
- lifeArr = append(lifeArr, calculateLiftime(count))
- }
- return dataArr, lifeArr
- }
// calculateLiftime returns the deadline, in Unix seconds, by which a segment
// of count documents is expected to be finished.
//
// The allowance is 1.5 seconds per document (provisionally 1500 seconds per
// thousand) multiplied by 3, truncated to whole seconds and never less than
// 2400 seconds; it is added to the current time.
func calculateLiftime(count int64) int64 {
	const (
		secondsPerDoc = 1500.0 / 1000.0
		safetyFactor  = 3.0
		minAllowance  = int64(2400)
	)
	allowance := int64(secondsPerDoc * float64(count) * safetyFactor)
	if allowance >= minAllowance {
		return time.Now().Unix() + allowance
	}
	return time.Now().Unix() + minAllowance
}
- //暂时弃用
- //func sqlitID(){
- // if len(using_ext_node)==1 {
- // dataArr = append(dataArr, map[string]interface{}{
- // "sid":sid,
- // "eid":eid,
- // })
- //
- // }else {
- // interval := hex2Dec(string(eid[:8]))-hex2Dec(string(sid[:8]))
- // num := interval/int64(len(using_ext_node))
- // tmp_time := hex2Dec(string(sid[:8]))+num
- // for i:=0;i<len(using_ext_node);i++ {
- // if i==0 {
- // tmp_eid := fmt.Sprintf("%x",tmp_time)
- // dataArr = append(dataArr, map[string]interface{}{
- // "sid":sid,
- // "eid":tmp_eid+"0000000000000000",
- // })
- // }else if i==len(using_ext_node)-1 {
- // tmp_sid := fmt.Sprintf("%x",tmp_time)
- // dataArr = append(dataArr, map[string]interface{}{
- // "sid":tmp_sid+"0000000000000000",
- // "eid":eid,
- // })
- // }else {
- // tmp_sid := fmt.Sprintf("%x",tmp_time)
- // tmp_time = tmp_time+num
- // tmp_eid := fmt.Sprintf("%x",tmp_time)
- // dataArr = append(dataArr, map[string]interface{}{
- // "sid":tmp_sid+"0000000000000000",
- // "eid":tmp_eid+"0000000000000000",
- // })
- // }
- // }
- // }
- //}
- //
- //func hex2Dec(val string)int64{
- // n,_ := strconv.ParseInt(val,16,32)
- // return n
- //}
|