extractudp.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. // extractudp
  2. package extract
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. db "jy/mongodbutil"
  7. ju "jy/util"
  8. mu "mfw/util"
  9. "net"
  10. qu "qfw/util"
  11. "sync"
  12. log "github.com/donnie4w/go-logger/logger"
  13. "gopkg.in/mgo.v2/bson"
  14. )
  15. var Udpclient mu.UdpClient //udp对象
  16. var nextNodes []map[string]interface{}
  17. //udp通知抽取
  18. func ExtractUdp() {
  19. nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
  20. Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
  21. Udpclient.Listen(processUdpMsg)
  22. }
  23. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  24. switch act {
  25. case mu.OP_TYPE_DATA:
  26. var rep map[string]interface{}
  27. err := json.Unmarshal(data, &rep)
  28. if err != nil {
  29. log.Debug(err)
  30. } else {
  31. sid, _ := rep["gtid"].(string)
  32. eid, _ := rep["lteid"].(string)
  33. stype, _ := rep["stype"].(string)
  34. if sid == "" || eid == "" {
  35. log.Debug("err", "sid=", sid, "eid=", eid)
  36. } else {
  37. go Udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
  38. if stype == "distributed" { //分布式抽取分支
  39. log.Debug("分布式抽取id段", sid, eid)
  40. InstanceId := qu.ObjToString(rep["InstanceId"])
  41. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  42. map[string]interface{}{
  43. "$set": map[string]interface{}{
  44. "extstatus": "running",
  45. },
  46. }, true, false)
  47. ExtractByUdp(sid, eid, qu.ObjToString(rep["InstanceId"]))
  48. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  49. map[string]interface{}{
  50. "$set": map[string]interface{}{
  51. "extstatus": "ok",
  52. },
  53. }, true, false)
  54. log.Debug("分布式抽取完成", sid, eid, "释放esc实例", qu.ObjToString(rep["ip"]))
  55. } else {
  56. log.Debug("udp通知抽取id段", sid, eid)
  57. ExtractByUdp(sid, eid)
  58. log.Debug("udp通知抽取完成,eid=", eid)
  59. for _, m := range nextNodes {
  60. by, _ := json.Marshal(map[string]interface{}{
  61. "gtid": sid,
  62. "lteid": eid,
  63. "stype": qu.ObjToString(m["stype"]),
  64. })
  65. err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  66. IP: net.ParseIP(m["addr"].(string)),
  67. Port: qu.IntAll(m["port"]),
  68. })
  69. if err != nil {
  70. log.Debug(err)
  71. }
  72. }
  73. }
  74. }
  75. }
  76. case mu.OP_NOOP: //下个节点回应
  77. var rep map[string]interface{}
  78. err := json.Unmarshal(data, &rep)
  79. if err != nil {
  80. log.Debug(err)
  81. } else {
  82. log.Debug(rep)
  83. }
  84. }
  85. }
  86. var ext *ExtractTask
  87. //根据id区间抽取
  88. func ExtractByUdp(sid, eid string, instanceId ...string) {
  89. defer qu.Catch()
  90. if ext == nil {
  91. ext = &ExtractTask{}
  92. ext.Id = qu.ObjToString(ju.Config["udptaskid"])
  93. ext.InitTaskInfo()
  94. ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  95. ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
  96. ext.InitRulePres()
  97. ext.InitRuleBacks()
  98. ext.InitRuleCore()
  99. ext.InitBlockRule()
  100. ext.InitTag()
  101. ext.InitClearFn()
  102. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  103. //初始化城市DFA信息
  104. ext.InitCityDFA()
  105. ext.InitAreaCode()
  106. ext.InitPostCode()
  107. }
  108. //质量审核
  109. ext.InitAuditFields()
  110. ext.InitAuditRule()
  111. ext.InitAuditClass()
  112. ext.InitAuditRecogField()
  113. //品牌抽取是否开启
  114. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  115. ext.ResultSave(true)
  116. ext.BidSave(true)
  117. ext.IsRun = true
  118. }
  119. if len(instanceId) > 0 { //分布式抽取进度
  120. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  121. count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  122. count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
  123. count := count1 + count2
  124. pageNum := (count + PageSize - 1) / PageSize
  125. limit := PageSize
  126. if count < PageSize {
  127. limit = count
  128. }
  129. fmt.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
  130. startI := 0 //接着上次任务执行
  131. sidback := sid
  132. esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`)
  133. startI = qu.IntAll((*esc)["pagecurrent"])
  134. if qu.ObjToString((*esc)["lastId"]) != "" {
  135. sid = qu.ObjToString((*esc)["lastId"])
  136. }
  137. if qu.ObjToString((*esc)["lastIdback"]) != "" {
  138. sidback = qu.ObjToString((*esc)["lastIdback"])
  139. }
  140. for i := startI; i < pageNum; i++ {
  141. query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  142. fmt.Printf("page=%d,query=%v", i+1, query)
  143. if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) > 0 {
  144. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
  145. for _, v := range *list {
  146. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  147. continue
  148. }
  149. _id := qu.BsonIdToSId(v["_id"])
  150. log.Debug(_id)
  151. var j, jf *ju.Job
  152. if ext.IsFileField && v["projectinfo"] != nil {
  153. v["isextFile"] = true
  154. j, jf = ext.PreInfo(v)
  155. } else {
  156. j, _ = ext.PreInfo(v)
  157. }
  158. ext.TaskInfo.ProcessPool <- true
  159. go ext.ExtractProcess(j, jf)
  160. sid = _id
  161. }
  162. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  163. map[string]interface{}{"$set": map[string]interface{}{
  164. "lastId": sid,
  165. }}, true, false)
  166. }
  167. queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
  168. fmt.Printf("page=%d,queryback=%v", i+1, queryback)
  169. if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
  170. list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
  171. for _, v := range *list2 {
  172. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  173. continue
  174. }
  175. _id := qu.BsonIdToSId(v["_id"])
  176. log.Debug(_id)
  177. var j, jf *ju.Job
  178. if ext.IsFileField && v["projectinfo"] != nil {
  179. v["isextFile"] = true
  180. j, jf = ext.PreInfo(v)
  181. } else {
  182. j, _ = ext.PreInfo(v)
  183. }
  184. ext.TaskInfo.ProcessPool <- true
  185. go ext.ExtractProcess(j, jf)
  186. sidback = _id
  187. }
  188. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  189. map[string]interface{}{"$set": map[string]interface{}{
  190. "lastIdback": sidback,
  191. }}, true, false)
  192. }
  193. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  194. map[string]interface{}{"$set": map[string]interface{}{
  195. "pagetotal": pageNum,
  196. "pagecurrent": i + 1,
  197. }}, true, false)
  198. }
  199. } else { //普通抽取
  200. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  201. count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  202. pageNum := (count + PageSize - 1) / PageSize
  203. limit := PageSize
  204. if count < PageSize {
  205. limit = count
  206. }
  207. wg := sync.WaitGroup{}
  208. for i := 0; i < pageNum; i++ {
  209. query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
  210. fmt.Printf("page=%d,query=%v", i+1, query)
  211. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
  212. for k, v := range *list {
  213. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  214. continue
  215. }
  216. _id := qu.BsonIdToSId(v["_id"])
  217. var j, jf *ju.Job
  218. if ext.IsFileField && v["projectinfo"] != nil {
  219. v["isextFile"] = true
  220. j, jf = ext.PreInfo(v)
  221. } else {
  222. j, _ = ext.PreInfo(v)
  223. }
  224. ext.TaskInfo.ProcessPool <- true
  225. wg.Add(1)
  226. go func() {
  227. defer wg.Done()
  228. ext.ExtractProcess(j, jf)
  229. }()
  230. if k%1000 == 0 {
  231. log.Debug(i, k, _id)
  232. }
  233. sid = _id
  234. }
  235. }
  236. wg.Wait()
  237. ext.BidSave(false)
  238. }
  239. }