extractudp.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. // extractudp
  2. package extract
  3. import (
  4. "encoding/json"
  5. db "jy/mongodbutil"
  6. ju "jy/util"
  7. "log"
  8. mu "mfw/util"
  9. "net"
  10. qu "qfw/util"
  11. "gopkg.in/mgo.v2/bson"
  12. )
  13. var Udpclient mu.UdpClient //udp对象
  14. var nextNodes []map[string]interface{}
  15. //udp通知抽取
  16. func ExtractUdp() {
  17. nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
  18. Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
  19. Udpclient.Listen(processUdpMsg)
  20. }
  21. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  22. switch act {
  23. case mu.OP_TYPE_DATA:
  24. var rep map[string]interface{}
  25. err := json.Unmarshal(data, &rep)
  26. if err != nil {
  27. log.Println(err)
  28. } else {
  29. sid, _ := rep["gtid"].(string)
  30. eid, _ := rep["lteid"].(string)
  31. stype, _ := rep["stype"].(string)
  32. if stype == "distributed" { //分布式抽取分支
  33. log.Println("分布式抽取id段", sid, eid)
  34. InstanceId := qu.ObjToString(rep["InstanceId"])
  35. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  36. map[string]interface{}{
  37. "$set": map[string]interface{}{
  38. "extstatus": "running",
  39. },
  40. }, true, false)
  41. ExtractByUdp(sid, eid, qu.ObjToString(rep["InstanceId"]))
  42. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  43. map[string]interface{}{
  44. "$set": map[string]interface{}{
  45. "extstatus": "ok",
  46. },
  47. }, true, false)
  48. log.Println("分布式抽取完成", sid, eid, "释放esc实例", qu.ObjToString(rep["ip"]))
  49. } else {
  50. log.Println("udp通知抽取id段", sid, eid)
  51. ExtractByUdp(sid, eid)
  52. log.Println("udp通知抽取完成,eid=", eid)
  53. for _, m := range nextNodes {
  54. by, _ := json.Marshal(map[string]interface{}{
  55. "gtid": sid,
  56. "lteid": eid,
  57. "stype": qu.ObjToString(m["stype"]),
  58. })
  59. err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  60. IP: net.ParseIP(m["addr"].(string)),
  61. Port: qu.IntAll(m["port"]),
  62. })
  63. if err != nil {
  64. log.Println(err)
  65. }
  66. }
  67. }
  68. }
  69. case mu.OP_NOOP: //下个节点回应
  70. var rep map[string]interface{}
  71. err := json.Unmarshal(data, &rep)
  72. if err != nil {
  73. log.Println(err)
  74. } else {
  75. log.Println(rep)
  76. }
  77. }
  78. }
  79. //根据id区间抽取
  80. func ExtractByUdp(sid, eid string, instanceId ...string) {
  81. defer qu.Catch()
  82. ext := &ExtractTask{}
  83. ext.Id = qu.ObjToString(ju.Config["udptaskid"])
  84. ext.InitTaskInfo()
  85. ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  86. ext.InitRulePres()
  87. ext.InitRuleBacks()
  88. ext.InitRuleCore()
  89. ext.InitTag()
  90. ext.InitClearFn()
  91. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  92. //初始化城市DFA信息
  93. ext.InitDFA()
  94. }
  95. //质量审核
  96. ext.InitAuditFields()
  97. ext.InitAuditRule()
  98. ext.InitAuditClass()
  99. ext.InitAuditRecogField()
  100. //品牌抽取是否开启
  101. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  102. //附件抽取是否开启
  103. ext.InitFile()
  104. go ext.ResultSave()
  105. go ext.BidSave()
  106. ext.IsRun = true
  107. if len(instanceId) > 0 { //分布式抽取进度
  108. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  109. count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  110. count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
  111. count := count1 + count2
  112. pageNum := (count + PageSize - 1) / PageSize
  113. limit := PageSize
  114. if count < PageSize {
  115. limit = count
  116. }
  117. log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
  118. startI := 0 //接着上次任务执行
  119. sidback := sid
  120. esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`)
  121. startI = qu.IntAll((*esc)["pagecurrent"])
  122. if qu.ObjToString((*esc)["lastId"]) != "" {
  123. sid = qu.ObjToString((*esc)["lastId"])
  124. }
  125. if qu.ObjToString((*esc)["lastIdback"]) != "" {
  126. sidback = qu.ObjToString((*esc)["lastIdback"])
  127. }
  128. for i := startI; i < pageNum; i++ {
  129. query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  130. log.Printf("page=%d,query=%v", i+1, query)
  131. if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) > 0 {
  132. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
  133. for _, v := range *list {
  134. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  135. continue
  136. }
  137. _id := qu.BsonIdToSId(v["_id"])
  138. log.Println(_id)
  139. var j, jf *ju.Job
  140. if ext.IsFileField && v["projectinfo"] != nil {
  141. v["isextFile"] = true
  142. j, jf = PreInfo(v)
  143. } else {
  144. j, _ = PreInfo(v)
  145. }
  146. ext.TaskInfo.ProcessPool <- true
  147. go ext.ExtractProcess(j, jf)
  148. sid = _id
  149. }
  150. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  151. map[string]interface{}{"$set": map[string]interface{}{
  152. "lastId": sid,
  153. }}, true, false)
  154. }
  155. queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
  156. log.Printf("page=%d,queryback=%v", i+1, queryback)
  157. if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
  158. list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
  159. for _, v := range *list2 {
  160. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  161. continue
  162. }
  163. _id := qu.BsonIdToSId(v["_id"])
  164. log.Println(_id)
  165. var j, jf *ju.Job
  166. if ext.IsFileField && v["projectinfo"] != nil {
  167. v["isextFile"] = true
  168. j, jf = PreInfo(v)
  169. } else {
  170. j, _ = PreInfo(v)
  171. }
  172. ext.TaskInfo.ProcessPool <- true
  173. go ext.ExtractProcess(j, jf)
  174. sidback = _id
  175. }
  176. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  177. map[string]interface{}{"$set": map[string]interface{}{
  178. "lastIdback": sidback,
  179. }}, true, false)
  180. }
  181. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  182. map[string]interface{}{"$set": map[string]interface{}{
  183. "pagetotal": pageNum,
  184. "pagecurrent": i + 1,
  185. }}, true, false)
  186. }
  187. } else { //普通抽取
  188. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  189. count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  190. pageNum := (count + PageSize - 1) / PageSize
  191. limit := PageSize
  192. if count < PageSize {
  193. limit = count
  194. }
  195. for i := 0; i < pageNum; i++ {
  196. query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
  197. log.Printf("page=%d,query=%v", i+1, query)
  198. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
  199. for _, v := range *list {
  200. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  201. continue
  202. }
  203. _id := qu.BsonIdToSId(v["_id"])
  204. log.Println(_id)
  205. var j, jf *ju.Job
  206. if ext.IsFileField && v["projectinfo"] != nil {
  207. v["isextFile"] = true
  208. j, jf = PreInfo(v)
  209. } else {
  210. j, _ = PreInfo(v)
  211. }
  212. ext.TaskInfo.ProcessPool <- true
  213. go ext.ExtractProcess(j, jf)
  214. sid = _id
  215. }
  216. }
  217. }
  218. }