extractudp.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. // extractudp
  2. package extract
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. db "jy/mongodbutil"
  7. ju "jy/util"
  8. mu "mfw/util"
  9. "net"
  10. qu "qfw/util"
  11. "sync"
  12. "time"
  13. log "github.com/donnie4w/go-logger/logger"
  14. "gopkg.in/mgo.v2/bson"
  15. )
  16. var Udpclient mu.UdpClient //udp对象
  17. //udp通知抽取
  18. func ExtractUdp() {
  19. Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
  20. Udpclient.Listen(processUdpMsg)
  21. }
  22. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  23. switch act {
  24. case mu.OP_TYPE_DATA:
  25. var rep map[string]interface{}
  26. err := json.Unmarshal(data, &rep)
  27. if err != nil {
  28. log.Debug(err)
  29. } else {
  30. stype, _ := rep["stype"].(string)
  31. if stype == "distributed" { //分布式抽取分支
  32. go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
  33. InstanceId := qu.ObjToString(rep["InstanceId"])
  34. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  35. map[string]interface{}{
  36. "$set": map[string]interface{}{
  37. "extstatus": "running",
  38. },
  39. }, true, false)
  40. ExtractByUdp("", "", ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
  41. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  42. map[string]interface{}{
  43. "$set": map[string]interface{}{
  44. "extstatus": "ok",
  45. },
  46. }, true, false)
  47. log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
  48. } else {
  49. sid, _ := rep["gtid"].(string)
  50. eid, _ := rep["lteid"].(string)
  51. if sid == "" || eid == "" {
  52. log.Debug("err", "sid=", sid, ",eid=", eid)
  53. } else {
  54. udpinfo, _ := rep["stype"].(string)
  55. if udpinfo == "" {
  56. udpinfo = "udpok"
  57. }
  58. ExtractByUdp(sid, eid, ra)
  59. log.Debug("抽取完成udp通知抽取id段",udpinfo, sid, "~", eid)
  60. Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  61. }
  62. }
  63. }
  64. case mu.OP_NOOP: //下个节点回应
  65. log.Debug(string(data))
  66. }
  67. }
  68. var ext *ExtractTask
  69. //根据id区间抽取-udp模式
  70. func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
  71. defer qu.Catch()
  72. if ext == nil {
  73. ext = &ExtractTask{}
  74. ext.Id = qu.ObjToString(ju.Config["udptaskid"])
  75. ext.InitTaskInfo()
  76. ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  77. ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
  78. ext.InitSite()
  79. ext.InitRulePres()
  80. ext.InitRuleBacks(false)
  81. ext.InitRuleBacks(true)
  82. ext.InitRuleCore(false)
  83. ext.InitRuleCore(true)
  84. ext.InitBlockRule()
  85. ext.InitPkgCore()
  86. ext.InitTag(false)
  87. ext.InitTag(true)
  88. ext.InitClearFn(false)
  89. ext.InitClearFn(true)
  90. ext.Lock()
  91. //ext.IsExtractCity = false
  92. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  93. //初始化城市DFA信息
  94. //ext.InitCityDFA()
  95. ext.InitCityInfo()
  96. ext.InitAreaCode()
  97. ext.InitPostCode()
  98. }
  99. ext.Unlock()
  100. //质量审核
  101. ext.InitAuditFields()
  102. ext.InitAuditRule()
  103. ext.InitAuditClass()
  104. ext.InitAuditRecogField()
  105. //品牌抽取是否开启
  106. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  107. ext.ResultSave(true)
  108. ext.BidSave(true)
  109. ext.IsRun = true
  110. ext.InitFile()
  111. } else {
  112. ext.BidTotal = 0
  113. }
  114. index := 0
  115. if len(instanceId) > 0 { //分布式抽取进度
  116. go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
  117. for {
  118. tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`)
  119. if tsk != nil && !b {
  120. break
  121. }
  122. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  123. "$set": map[string]interface{}{
  124. "InstanceId": instanceId[0],
  125. "state": 1,
  126. "runtime": time.Now().Format(qu.Date_Full_Layout),
  127. },
  128. })
  129. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}}
  130. count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  131. count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
  132. log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
  133. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
  134. for _, v := range *list {
  135. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  136. // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
  137. // continue
  138. //}
  139. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  140. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  141. continue
  142. }
  143. if qu.ObjToString(v["subtype"])!="中标" &&
  144. qu.ObjToString(v["subtype"])!="成交" &&
  145. qu.ObjToString(v["subtype"])!="合同" {
  146. continue
  147. }
  148. var j, jf *ju.Job
  149. var isSite bool
  150. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  151. v["isextFile"] = true
  152. j, jf, isSite = ext.PreInfo(v)
  153. } else {
  154. j, _, isSite = ext.PreInfo(v)
  155. }
  156. go ext.ExtractProcess(j, jf, isSite)
  157. index++
  158. ext.TaskInfo.ProcessPool <- true
  159. }
  160. list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1)
  161. for _, v := range *list2 {
  162. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  163. // continue
  164. //}
  165. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  166. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  167. continue
  168. }
  169. if qu.ObjToString(v["subtype"])!="中标" &&
  170. qu.ObjToString(v["subtype"])!="成交" &&
  171. qu.ObjToString(v["subtype"])!="合同" {
  172. continue
  173. }
  174. var j, jf *ju.Job
  175. var isSite bool
  176. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  177. v["isextFile"] = true
  178. j, jf, isSite = ext.PreInfo(v)
  179. } else {
  180. j, _, isSite = ext.PreInfo(v)
  181. }
  182. go ext.ExtractProcess(j, jf, isSite)
  183. index++
  184. ext.TaskInfo.ProcessPool <- true
  185. }
  186. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  187. "$set": map[string]interface{}{
  188. "InstanceId": instanceId[0],
  189. "oktime": time.Now().Format(qu.Date_Full_Layout),
  190. "state": 1,
  191. },
  192. })
  193. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  194. map[string]interface{}{
  195. "$inc": map[string]interface{}{
  196. "totalnum": count1 + count2,
  197. "step": 1,
  198. },
  199. }, true, false)
  200. }
  201. log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
  202. } else {
  203. //普通抽取
  204. query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  205. count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  206. log.Debug("查询条件为:", query, "查询条数:", count)
  207. pageNum := (count + PageSize - 1) / PageSize
  208. limit := PageSize
  209. if count < PageSize {
  210. limit = count
  211. }
  212. wg := sync.WaitGroup{}
  213. for i := 0; i < pageNum; i++ {
  214. query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid)}}
  215. fmt.Printf("page=%d,query=%v\n", i+1, query)
  216. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
  217. for _, v := range *list {
  218. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  219. // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
  220. // continue
  221. //}
  222. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  223. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  224. continue
  225. }
  226. _id := qu.BsonIdToSId(v["_id"])
  227. var j, jf *ju.Job
  228. var isSite bool
  229. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  230. v["isextFile"] = true
  231. j, jf, isSite = ext.PreInfo(v)
  232. } else {
  233. j, _, isSite = ext.PreInfo(v)
  234. }
  235. ext.TaskInfo.ProcessPool <- true
  236. wg.Add(1)
  237. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  238. defer wg.Done()
  239. //log.Debug(index,j.SourceMid,)
  240. ext.ExtractProcess(j, jf, isSite)
  241. }(&wg, j, jf)
  242. index++
  243. if index%1000 == 0 {
  244. log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
  245. }
  246. sid = _id
  247. if sid >= eid {
  248. break
  249. }
  250. }
  251. }
  252. wg.Wait()
  253. ext.BidSave(false)
  254. log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
  255. }
  256. }
  257. //中标预测信息抽取,ossid为附件识别后的id
  258. var exF *ExtractTask
  259. func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{} {
  260. defer qu.Catch()
  261. if exF == nil {
  262. exF = &ExtractTask{}
  263. exF.Id = qu.ObjToString(ju.Config["udptaskid"])
  264. exF.InitTaskInfo()
  265. exF.TaskInfo.FDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.FromDbAddr, exF.TaskInfo.FromDB)
  266. exF.TaskInfo.TDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.ToDbAddr, exF.TaskInfo.ToDB)
  267. exF.InitSite()
  268. exF.InitRulePres()
  269. exF.InitRuleBacks(false)
  270. exF.InitRuleBacks(true)
  271. exF.InitRuleCore(false)
  272. exF.InitRuleCore(true)
  273. exF.InitBlockRule()
  274. exF.InitPkgCore()
  275. exF.InitTag(false)
  276. exF.InitTag(true)
  277. exF.InitClearFn(false)
  278. exF.InitClearFn(true)
  279. if exF.IsExtractCity { //版本上控制是否开始城市抽取
  280. //初始化城市DFA信息
  281. //exF.InitCityDFA()
  282. exF.InitCityInfo()
  283. exF.InitAreaCode()
  284. exF.InitPostCode()
  285. }
  286. //质量审核
  287. exF.InitAuditFields()
  288. exF.InitAuditRule()
  289. exF.InitAuditClass()
  290. exF.InitAuditRecogField()
  291. //品牌抽取是否开启
  292. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  293. exF.ResultSave(true)
  294. exF.BidSave(true)
  295. exF.IsRun = true
  296. exF.InitFile()
  297. }
  298. tmp, _ := exF.TaskInfo.FDB.FindById(exF.TaskInfo.FromColl, infoid, nil)
  299. if exF.IsFileField && ((*tmp)["projectinfo"] != nil || (*tmp)["attach_text"] != nil) {
  300. (*tmp)["isextFile"] = true
  301. }
  302. exF.TaskInfo.ProcessPool <- true
  303. j, jf, _ := exF.PreInfo(*tmp)
  304. wg := sync.WaitGroup{}
  305. wg.Add(1)
  306. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  307. defer wg.Done()
  308. exF.ExtractProcess(j, jf, false)
  309. }(&wg, j, jf)
  310. wg.Wait()
  311. exF.BidSave(false)
  312. return nil
  313. }