udptask.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. package udptask
  2. import (
  3. util "common_utils"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. mu "mfw/util"
  9. "net"
  10. "net/http"
  11. qutil "qfw/util"
  12. "sync"
  13. u "util"
  14. // "sync"
  15. "task"
  16. "time"
  17. . "tools"
  18. )
  // Package-level state shared by the UDP callback and the LastUdpJob watchdog.
  var (
  	// responselock is the mutex LastUdpJob holds while it inspects and
  	// resets LastNodeResponse.
  	responselock sync.Mutex

  	// LastNodeResponse is the Unix time, in seconds, of the most recent
  	// "hangye" message (or of the most recent watchdog reset).
  	LastNodeResponse int64
  )
  21. func InitUdp() {
  22. go func() {
  23. updport, _ := Config["udpport"].(string)
  24. Udpclient = mu.UdpClient{Local: ":" + updport, BufSize: 1024}
  25. Udpclient.Listen(processUdpMsg)
  26. log.Println("Udp服务监听", updport)
  27. time.Sleep(99999 * time.Hour)
  28. }()
  29. go checkMapJob()
  30. }
  31. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  32. log.Println("udp回调", string(data), "-----", ra, "-----", act)
  33. defer qutil.Catch()
  34. switch act {
  35. case mu.OP_TYPE_DATA: //上个节点的数据
  36. var mapInfo map[string]interface{}
  37. err := json.Unmarshal(data, &mapInfo)
  38. log.Println("err:", err, "mapInfo:", mapInfo)
  39. stype, _ := mapInfo["stype"].(string)
  40. if err != nil || stype == "" {
  41. Udpclient.WriteUdp([]byte("stype:"+stype+",err:"+err.Error()), mu.OP_NOOP, ra)
  42. } else if mapInfo != nil {
  43. if stype == "distributed" { //分布式抽取分支
  44. log.Println("分布式00000--开始")
  45. go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qutil.ObjToString(mapInfo["ip"])+"udpok"), mu.OP_NOOP, ra)
  46. InstanceId := qutil.ObjToString(mapInfo["InstanceId"])
  47. MgoDcs.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  48. map[string]interface{}{
  49. "$set": map[string]interface{}{
  50. "extstatus": "running",
  51. },
  52. }, true, false)
  53. //执行抽取任务
  54. ExtractByUdp(ra, qutil.ObjToString(mapInfo["InstanceId"]), qutil.ObjToString(mapInfo["ip"]))
  55. MgoDcs.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  56. map[string]interface{}{
  57. "$set": map[string]interface{}{
  58. "extstatus": "ok",
  59. },
  60. }, true, false)
  61. log.Println("分布式抽取完成,可以释放esc实例", qutil.ObjToString(mapInfo["ip"]))
  62. } else {
  63. key, _ := mapInfo["key"].(string)
  64. if key == "" {
  65. key = "udpok"
  66. }
  67. go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  68. //行业分类开始,更新bidding_processing_ids表dataprocess=5
  69. if stype == "hangye" {
  70. LastNodeResponse = time.Now().Unix()
  71. task.HangyeUdps <- mapInfo
  72. gtid := mapInfo["gtid"].(string)
  73. lteid := mapInfo["lteid"].(string)
  74. query := map[string]interface{}{
  75. "gtid": gtid,
  76. "lteid": lteid,
  77. }
  78. // 使用老表更新dataprocess 时
  79. if util.ObjToString(Config["dbname_old"]) != "" {
  80. set := map[string]interface{}{
  81. "$set": map[string]interface{}{
  82. "dataprocess_ai": 4,
  83. "updatetime": time.Now().Unix(),
  84. },
  85. }
  86. MgoClassOld.Update("bidding_processing_ids", query, set, false, false)
  87. } else {
  88. set := map[string]interface{}{
  89. "$set": map[string]interface{}{
  90. "dataprocess": 5,
  91. "updatetime": time.Now().Unix(),
  92. },
  93. }
  94. MgoClass.Update("bidding_processing_ids", query, set, false, false)
  95. }
  96. } else if stype == "monitor" { //程序监听类型
  97. fmt.Println("stype :monitor")
  98. } else if stype != "" {
  99. go UdpTask(stype, mapInfo) //执行分类
  100. } else {
  101. log.Println("stype 为空")
  102. }
  103. }
  104. }
  105. case mu.OP_NOOP: //下个节点回应
  106. udptaskmap.Delete(string(data))
  107. log.Println("下节点回应:", string(data))
  108. }
  109. }
  110. // 行业分类udp任务执行
  111. func RunningHangyeClass() {
  112. defer qutil.Catch()
  113. go func() {
  114. for {
  115. time.Sleep(1 * time.Minute)
  116. qutil.Debug("内存中行业分类剩余id段个数:", len(task.HangyeUdps))
  117. }
  118. }()
  119. for {
  120. mapInfo := <-task.HangyeUdps
  121. qutil.Debug("行业分类udps mapinfo:", mapInfo)
  122. UdpTask("hangye", mapInfo)
  123. }
  124. }
  125. // UdpTask udp 任务
  126. func UdpTask(stype string, mapInfo map[string]interface{}) int {
  127. total := 0
  128. defer qutil.Catch()
  129. tconf, _ := Config[stype].(map[string]interface{})
  130. if tconf != nil {
  131. tid, _ := tconf["taskid"].(string)
  132. task.TaskLock.Lock()
  133. defer task.TaskLock.Unlock()
  134. t := task.NEWTASKPOOL[tid]
  135. log.Println("ttt==nil:", t == nil)
  136. if t == nil || (t != nil && t.B_UpdateRule) { //加载任务
  137. //更新任务的b_updaterule,避免下次重新加载rule
  138. task.UpdateTaskInfo(false, tid)
  139. task.InitTaskData(tid) //初始化任务信息
  140. bres, tt, _ := task.NewAnalyTask(tid, "", "", "", 5) //初始化连接
  141. if bres && tt != nil {
  142. task.NEWTASKPOOL[tid] = tt
  143. log.Println("udp加载任务", tt.S_name)
  144. }
  145. }
  146. t = task.NEWTASKPOOL[tid]
  147. if t != nil {
  148. t.I_wordcount = 0
  149. q, _ := mapInfo["query"].(map[string]interface{})
  150. if q == nil {
  151. q = map[string]interface{}{
  152. "_id": map[string]interface{}{
  153. "$gt": u.StringTOBsonId(mapInfo["gtid"].(string)),
  154. "$lte": u.StringTOBsonId(mapInfo["lteid"].(string)),
  155. },
  156. }
  157. }
  158. mapInfo["q"] = q
  159. log.Println("启动任务")
  160. if t.MulMgo != nil {
  161. log.Println(stype, "分类,走合并数据")
  162. total = task.UdpTaskRunAll(t, true, mapInfo, stype)
  163. } else {
  164. log.Println(stype, "分类,不走合并数据")
  165. total = task.NewTaskRunAll(t, true, mapInfo)
  166. }
  167. //任务完成,调度下个节点
  168. if tconf["nextNode"] != nil && mapInfo["stop"] == nil {
  169. arr := qutil.ObjArrToMapArr(tconf["nextNode"].([]interface{}))
  170. if len(arr) > 0 {
  171. for _, to := range arr {
  172. sid, _ := mapInfo["gtid"].(string)
  173. eid, _ := mapInfo["lteid"].(string)
  174. key := sid + "-" + eid + "-" + qutil.ObjToString(to["stype"])
  175. by, _ := json.Marshal(map[string]interface{}{
  176. "gtid": sid,
  177. "lteid": eid,
  178. "stype": qutil.ObjToString(to["stype"]),
  179. "key": key,
  180. })
  181. addr := &net.UDPAddr{
  182. IP: net.ParseIP(to["addr"].(string)),
  183. Port: qutil.IntAll(to["port"]),
  184. }
  185. node := &UdpNode{by, addr, time.Now().Unix(), 0}
  186. udptaskmap.Store(key, node)
  187. Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  188. }
  189. }
  190. }
  191. }
  192. }
  193. return total
  194. }
  195. // 分布式抽取-执行
  196. func ExtractByUdp(ra *net.UDPAddr, instanceId ...string) {
  197. if len(instanceId) > 0 { //分布式抽取进度
  198. go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
  199. for {
  200. tsk, b := MgoDcs.FindOne("esctask", `{"state":{"$lt":1}}`)
  201. if tsk != nil && !b {
  202. break
  203. }
  204. MgoDcs.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  205. "$set": map[string]interface{}{
  206. "InstanceId": instanceId[0],
  207. "state": 1,
  208. "runtime": time.Now().Format(qutil.Date_Full_Layout),
  209. },
  210. })
  211. ip := qutil.ObjToString(instanceId[1])
  212. sid := qutil.ObjToString((*tsk)["sid"])
  213. eid := qutil.ObjToString((*tsk)["eid"])
  214. mapinfo := map[string]interface{}{}
  215. if sid == "" || eid == "" {
  216. log.Println("sid,eid参数不能为空")
  217. break
  218. }
  219. mapinfo["ip"] = ip
  220. mapinfo["gtid"] = sid
  221. mapinfo["lteid"] = eid
  222. mapinfo["stop"] = "true"
  223. totalZB := UdpTask("newzhaobiao", mapinfo) //招标
  224. totalHY := UdpTask("newhangye", mapinfo) //行业
  225. totalYZ := UdpTask("newyezhu", mapinfo) //业主
  226. totalBQ := UdpTask("newbiaoqian", mapinfo) //标签
  227. //totalKT := UdpTask("kvtextzhaobiao", []byte{}, mapinfo) //kvtext
  228. if totalZB > 0 {
  229. log.Println("总数-数量:", totalZB)
  230. }
  231. MgoDcs.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  232. "$set": map[string]interface{}{
  233. "InstanceId": instanceId[0],
  234. "oktime": time.Now().Format(qutil.Date_Full_Layout),
  235. "state": 1,
  236. },
  237. })
  238. set := map[string]interface{}{
  239. "$inc": map[string]interface{}{
  240. "step": 1,
  241. "totalnum": totalZB,
  242. "totalhy": totalHY,
  243. "totalyz": totalYZ,
  244. "totalbq": totalBQ,
  245. },
  246. }
  247. //如果同一id段数据不一致做记录
  248. if totalZB != totalHY || totalZB != totalYZ || totalHY != totalYZ || totalZB == 0 {
  249. set["$addToset"] = map[string]interface{}{
  250. "errnum": map[string]interface{}{
  251. "sid": sid,
  252. "eid": eid,
  253. "totalnum": totalZB,
  254. "totalhy": totalHY,
  255. "totalyz": totalYZ,
  256. //"totalkt": totalKT,
  257. },
  258. }
  259. }
  260. MgoDcs.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`, set, true, false)
  261. }
  262. log.Println("分类完成")
  263. }
  264. }
  265. // LastUdpJob 处理UDP 没有接受数据
  266. func LastUdpJob() {
  267. for {
  268. responselock.Lock()
  269. if time.Now().Unix()-LastNodeResponse >= 1800 {
  270. LastNodeResponse = time.Now().Unix() //重置时间
  271. sendErrMailApi("分类异常", fmt.Sprintf("半小时左右~无新段落数据进入 分类流程...相关人员检查..."))
  272. }
  273. responselock.Unlock()
  274. time.Sleep(300 * time.Second)
  275. }
  276. }
  277. // sendErrMailApi 发送邮件
  278. func sendErrMailApi(title, body string) {
  279. jkmail, _ := Config["jkmail"].(map[string]interface{})
  280. if jkmail != nil {
  281. tomail, _ = jkmail["to"].(string)
  282. api, _ = jkmail["api"].(string)
  283. }
  284. log.Println(tomail, api)
  285. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, title, body))
  286. if err == nil {
  287. defer res.Body.Close()
  288. read, err := ioutil.ReadAll(res.Body)
  289. log.Println("邮件发送成功:", string(read), err)
  290. } else {
  291. log.Println("邮件发送失败:", err)
  292. }
  293. }