method.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package main
  2. import (
  3. "fmt"
  4. log "github.com/donnie4w/go-logger/logger"
  5. "net"
  6. qu "qfw/util"
  7. "sync"
  8. "time"
  9. )
  10. var methodlock sync.Mutex
  11. var heartlock sync.Mutex
  12. var responselock sync.Mutex
  13. var getasklock sync.Mutex
  14. // 邮件下节点响应
  15. var udptaskmap = &sync.Map{}
  16. type udpNode struct {
  17. data []byte
  18. addr *net.UDPAddr
  19. timestamp int64
  20. }
  21. // 监听-获取-分发抽取任务
  22. func getRepeatTask() {
  23. for {
  24. if len(taskList) > 0 && !isGetask {
  25. getasklock.Lock()
  26. isGetask = true
  27. len_list := len(taskList)
  28. if len_list > 1 {
  29. first_id := qu.ObjToString(taskList[0]["sid"])
  30. end_id := qu.ObjToString(taskList[len_list-1]["eid"])
  31. if first_id != "" && end_id != "" {
  32. taskList = taskList[len_list:]
  33. log.Debug("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
  34. dealWithExtUdpData(first_id, end_id)
  35. } else {
  36. log.Debug("合并段落~错误~正常取段落~~~")
  37. mapInfo := taskList[0]
  38. if mapInfo != nil {
  39. taskList = taskList[1:]
  40. log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
  41. sid := qu.ObjToString(mapInfo["sid"])
  42. eid := qu.ObjToString(mapInfo["eid"])
  43. dealWithExtUdpData(sid, eid)
  44. } else {
  45. sendErrMailApi("抽取控制中心~任务错误", "获取任务段落异常...跳过段落...")
  46. isGetask = false
  47. }
  48. }
  49. } else {
  50. mapInfo := taskList[0]
  51. if mapInfo != nil {
  52. taskList = taskList[1:]
  53. log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
  54. sid := qu.ObjToString(mapInfo["sid"])
  55. eid := qu.ObjToString(mapInfo["eid"])
  56. dealWithExtUdpData(sid, eid)
  57. } else {
  58. sendErrMailApi("抽取控制中心~任务错误", "获取任务段落异常...跳过段落...")
  59. isGetask = false
  60. }
  61. }
  62. getasklock.Unlock()
  63. } else {
  64. time.Sleep(10 * time.Second)
  65. }
  66. }
  67. }
  68. // 监控~当前抽取段~状态 生命周期
  69. func extractRunningMonitoring() {
  70. for {
  71. if isAction {
  72. //time_now := time.Now().Unix()
  73. isErr := false
  74. methodlock.Lock()
  75. for k, v := range extractAction {
  76. if k == "extract_ids" {
  77. continue
  78. }
  79. //抽取行为完成~状态
  80. action := qu.IntAll(v["action"])
  81. if action == 1 {
  82. continue
  83. }
  84. //心跳监测~回应
  85. //keyArr := strings.Split(k, ":")
  86. //if len(keyArr) == 3 {
  87. // by, _ := json.Marshal(map[string]interface{}{
  88. // "stype": "heart_extract",
  89. // "skey": "heart_extract" + k,
  90. // })
  91. // sendSingleOtherNode(by, keyArr[0], keyArr[1])
  92. // heartlock.Lock()
  93. // heart_num := qu.IntAll(heartAction[k])
  94. // heartAction[k] = heart_num + 1
  95. // heartlock.Unlock()
  96. //}
  97. //life := qu.Int64All(v["life"])
  98. //if time_now > life || qu.IntAll(heartAction[k]) > 10 {
  99. // isErr = true //超时~无响应~认为机器异常
  100. // data_mgo.UpdateById(data_c_name, qu.ObjToString(v["uid"]), map[string]interface{}{
  101. // "$set": map[string]interface{}{
  102. // "isuse": 0,
  103. // },
  104. // })
  105. //}
  106. }
  107. methodlock.Unlock()
  108. if isErr {
  109. //sid := qu.ObjToString(extractAction["extract_ids"]["sid"])
  110. //eid := qu.ObjToString(extractAction["extract_ids"]["eid"])
  111. //isAction = false
  112. //sendStopExtractNode(using_ext_node) //停止
  113. //if len(standby_ext_node) == 0 {
  114. // sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~无备用机器~此段落需要过滤~%s~%s", sid, eid))
  115. // time.Sleep(15 * time.Second)
  116. // sendNextNode(sid, eid)
  117. //} else {
  118. // sendErrMailApi("抽取控制中心~异常", fmt.Sprintf("机器异常~有备用机器~启用备用机器~%s~%s", sid, eid))
  119. // time.Sleep(15 * time.Second)
  120. // dealWithExtUdpData(sid, eid)
  121. //}
  122. }
  123. }
  124. time.Sleep(15 * time.Second)
  125. }
  126. }
  127. // 监控~上节点~长时间未响应
  128. func lastUdpMonitoring() {
  129. for {
  130. responselock.Lock()
  131. if !isAction && time.Now().Unix()-lastNodeResponse >= 1800 {
  132. sendErrMailApi("抽取控制中心~流程超时~告警", fmt.Sprintf("半小时~没有新段落数据进入流程...请检查..."))
  133. lastNodeResponse = time.Now().Unix() //重置时间
  134. }
  135. responselock.Unlock()
  136. time.Sleep(600 * time.Second)
  137. }
  138. }
  139. // 监控~下节点
  140. func nextUdpMonitoring() {
  141. for {
  142. udptaskmap.Range(func(k, v interface{}) bool {
  143. now := time.Now().Unix()
  144. node, _ := v.(*udpNode)
  145. if now-node.timestamp > 120 {
  146. udptaskmap.Delete(k)
  147. sendErrMailApi("抽取控制中心~下节点未响应~警告", fmt.Sprintf("下节点~大模型识别~未及时响应...请检查..."))
  148. }
  149. return true
  150. })
  151. time.Sleep(10 * time.Second)
  152. }
  153. }
  154. // 验证抽取是否完毕 不验证-extract_ids~key
  155. func validExtractFinish() bool {
  156. for k, v := range extractAction {
  157. if k == "extract_ids" {
  158. continue
  159. }
  160. if qu.Int64All(v["action"]) == 0 {
  161. return false
  162. }
  163. }
  164. return true
  165. }
  166. // 拆分ID段方法
  167. func splitIdMethod(sid string, eid string) ([]map[string]interface{}, []int64) {
  168. dataArr := make([]map[string]interface{}, 0)
  169. lifeArr := make([]int64, 0)
  170. if sid == "" || eid == "" || len(using_ext_node) == 0 {
  171. return dataArr, lifeArr
  172. }
  173. sess := source_mgo.GetMgoConn()
  174. defer source_mgo.DestoryMongoConn(sess)
  175. q, total := map[string]interface{}{
  176. "_id": map[string]interface{}{
  177. "$gt": StringTOBsonId(sid),
  178. "$lte": StringTOBsonId(eid),
  179. },
  180. }, int64(0)
  181. count, _ := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Count()
  182. log.Debug("查询当前数量:", count)
  183. if len(using_ext_node) == 1 {
  184. dataArr = append(dataArr, map[string]interface{}{
  185. "sid": sid,
  186. "eid": eid,
  187. })
  188. lifeArr = append(lifeArr, calculateLiftime(count))
  189. } else {
  190. node_num := int64(len(using_ext_node))
  191. if count < node_num { //采用一个节点-多余临时删除
  192. log.Debug("数量过少~采用一个节点")
  193. tmp_node := using_ext_node[0]
  194. using_ext_node = []map[string]interface{}{}
  195. using_ext_node = append(using_ext_node, tmp_node)
  196. dataArr = append(dataArr, map[string]interface{}{
  197. "sid": sid,
  198. "eid": eid,
  199. })
  200. lifeArr = append(lifeArr, calculateLiftime(count))
  201. } else {
  202. limit := count / node_num
  203. limit_lifetime := calculateLiftime(limit)
  204. tmp_sid := sid
  205. it := sess.DB(source_mgo.DbName).C(source_c_name).Find(&q).Sort("_id").Select(map[string]interface{}{
  206. "_id": 1,
  207. }).Iter()
  208. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  209. total++
  210. if total%limit == 0 {
  211. if total/limit == node_num {
  212. dataArr = append(dataArr, map[string]interface{}{
  213. "sid": tmp_sid,
  214. "eid": eid,
  215. })
  216. lifeArr = append(lifeArr, limit_lifetime)
  217. break
  218. } else {
  219. dataArr = append(dataArr, map[string]interface{}{
  220. "sid": tmp_sid,
  221. "eid": BsonTOStringId(tmp["_id"]),
  222. })
  223. tmp_sid = BsonTOStringId(tmp["_id"])
  224. lifeArr = append(lifeArr, limit_lifetime)
  225. }
  226. }
  227. tmp = make(map[string]interface{})
  228. }
  229. }
  230. }
  231. if len(dataArr) != len(using_ext_node) || len(dataArr) != len(lifeArr) {
  232. log.Debug("划分段落异常~请检查~只能采用唯一节点~")
  233. tmp_node := using_ext_node[0]
  234. using_ext_node = []map[string]interface{}{}
  235. using_ext_node = append(using_ext_node, tmp_node)
  236. dataArr = []map[string]interface{}{}
  237. lifeArr = []int64{}
  238. dataArr = append(dataArr, map[string]interface{}{
  239. "sid": sid,
  240. "eid": eid,
  241. })
  242. lifeArr = append(lifeArr, calculateLiftime(count))
  243. }
  244. return dataArr, lifeArr
  245. }
  246. // 计算生命周期
  247. func calculateLiftime(count int64) int64 {
  248. time_one := 1500.0 / 1000.0 //暂定~每千条用时1500秒
  249. life_time := int64(time_one * float64(count) * 3.0)
  250. if life_time < 2400 {
  251. life_time = 2400
  252. }
  253. return time.Now().Unix() + life_time
  254. }
  255. //暂时弃用
  256. //func sqlitID(){
  257. // if len(using_ext_node)==1 {
  258. // dataArr = append(dataArr, map[string]interface{}{
  259. // "sid":sid,
  260. // "eid":eid,
  261. // })
  262. //
  263. // }else {
  264. // interval := hex2Dec(string(eid[:8]))-hex2Dec(string(sid[:8]))
  265. // num := interval/int64(len(using_ext_node))
  266. // tmp_time := hex2Dec(string(sid[:8]))+num
  267. // for i:=0;i<len(using_ext_node);i++ {
  268. // if i==0 {
  269. // tmp_eid := fmt.Sprintf("%x",tmp_time)
  270. // dataArr = append(dataArr, map[string]interface{}{
  271. // "sid":sid,
  272. // "eid":tmp_eid+"0000000000000000",
  273. // })
  274. // }else if i==len(using_ext_node)-1 {
  275. // tmp_sid := fmt.Sprintf("%x",tmp_time)
  276. // dataArr = append(dataArr, map[string]interface{}{
  277. // "sid":tmp_sid+"0000000000000000",
  278. // "eid":eid,
  279. // })
  280. // }else {
  281. // tmp_sid := fmt.Sprintf("%x",tmp_time)
  282. // tmp_time = tmp_time+num
  283. // tmp_eid := fmt.Sprintf("%x",tmp_time)
  284. // dataArr = append(dataArr, map[string]interface{}{
  285. // "sid":tmp_sid+"0000000000000000",
  286. // "eid":tmp_eid+"0000000000000000",
  287. // })
  288. // }
  289. // }
  290. // }
  291. //}
  292. //
  293. //func hex2Dec(val string)int64{
  294. // n,_ := strconv.ParseInt(val,16,32)
  295. // return n
  296. //}