extractudp.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. // extractudp
  2. package extract
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. db "jy/mongodbutil"
  7. ju "jy/util"
  8. mu "mfw/util"
  9. "net"
  10. qu "qfw/util"
  11. "sync"
  12. "time"
  13. log "github.com/donnie4w/go-logger/logger"
  14. "gopkg.in/mgo.v2/bson"
  15. )
// Udpclient is the package-wide UDP client object; ExtractUdp configures it
// and starts listening on it.
var Udpclient mu.UdpClient //udp object
// nextNodes holds the "nextNode" entries of ju.Config; it is filled by ExtractUdp.
var nextNodes []map[string]interface{}
// IsExtStop is set to true when a "stop_extract" UDP message is received and
// is polled by ExtractByUdp to abandon the current id-range run.
var IsExtStop bool
  19. // 新增机器节点
  20. func ExtractUdpUpdateMachine() {
  21. machine := *qu.ObjToMap(ju.Config["udpmachine"])
  22. if len(machine) == 0 || machine == nil {
  23. return
  24. }
  25. skey := fmt.Sprintf("%s:%d:%s", machine["addr"], qu.IntAll(machine["port"]), machine["stype"])
  26. machine["skey"] = skey
  27. db.Mgo.Update("extract_control_center", map[string]interface{}{"skey": skey}, machine, true, false)
  28. }
  29. // udp通知抽取
  30. func ExtractUdp() {
  31. nextNodes = ju.IsMarkInterfaceMap(ju.Config["nextNode"])
  32. Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
  33. Udpclient.Listen(processUdpMsg)
  34. }
  35. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  36. switch act {
  37. case mu.OP_TYPE_DATA:
  38. var rep map[string]interface{}
  39. err := json.Unmarshal(data, &rep)
  40. if err != nil {
  41. log.Debug(err)
  42. } else {
  43. stype, _ := rep["stype"].(string)
  44. if stype == "distributed" { //分布式抽取分支
  45. go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
  46. InstanceId := qu.ObjToString(rep["InstanceId"])
  47. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  48. map[string]interface{}{
  49. "$set": map[string]interface{}{
  50. "extstatus": "running",
  51. },
  52. }, true, false)
  53. ExtractByUdp("", "", ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
  54. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  55. map[string]interface{}{
  56. "$set": map[string]interface{}{
  57. "extstatus": "ok",
  58. },
  59. }, true, false)
  60. log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
  61. } else if stype == "stop_extract" {
  62. IsExtStop = true
  63. } else if stype == "heart_extract" {
  64. skey, _ := rep["skey"].(string)
  65. Udpclient.WriteUdp([]byte(skey), mu.OP_NOOP, ra)
  66. } else if stype == "update_rule" {
  67. ju.IsUpdateRule = true
  68. } else if stype == "monitor" {
  69. log.Debug("收到监测......")
  70. Udpclient.WriteUdp([]byte("monitor ok"), mu.OP_NOOP, ra)
  71. } else {
  72. sid, _ := rep["gtid"].(string)
  73. eid, _ := rep["lteid"].(string)
  74. if sid == "" || eid == "" {
  75. log.Debug("err", "sid=", sid, ",eid=", eid)
  76. } else {
  77. //新版本控制抽取
  78. //udpinfo, _ := rep["stype"].(string)
  79. //if udpinfo == "" {
  80. // udpinfo = "udpok"
  81. //}
  82. //IsExtStop = false
  83. //ExtractByUdp(sid, eid, ra)
  84. //if !IsExtStop {
  85. // log.Debug("抽取完成udp通知抽取id段-控制台", udpinfo, sid, "~", eid)
  86. // Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  87. //} else {
  88. // log.Debug("抽取强制中断udp不通知-控制台", udpinfo, sid, "~", eid)
  89. //}
  90. //发布数据~重采数据~测试流程
  91. //key := sid + "-" + eid + "-" + qu.ObjToString(rep["stype"])
  92. //go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  93. //log.Debug("udp通知抽取id段", sid, " ", eid)
  94. //ExtractByUdp(sid, eid, ra)
  95. //for _, m := range nextNodes {
  96. // by, _ := json.Marshal(map[string]interface{}{
  97. // "gtid": sid,
  98. // "lteid": eid,
  99. // "stype": qu.ObjToString(m["stype"]),
  100. // })
  101. // err_udp := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  102. // IP: net.ParseIP(m["addr"].(string)),
  103. // Port: qu.IntAll(m["port"]),
  104. // })
  105. // if err_udp != nil {
  106. // log.Debug(err_udp)
  107. // }
  108. //}
  109. //log.Debug("udp通知抽取完成,eid=", eid)
  110. //预处理模块
  111. key := sid + "-" + eid + "-" + qu.ObjToString(rep["stype"])
  112. go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  113. //保存段落合并段落···
  114. log.Debug("udp通知抽取id段", sid, " ", eid)
  115. ExtractByUdpPre(sid, eid, ra)
  116. }
  117. }
  118. }
  119. case mu.OP_NOOP: //下个节点回应
  120. log.Debug(string(data))
  121. }
  122. }
// ext is the shared extraction task used by ExtractByUdp and ExtractByUdpPre;
// whichever of them runs first while it is nil creates and initialises it.
var ext *ExtractTask
  124. // 根据id区间抽取-udp模式
  125. func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
  126. defer qu.Catch()
  127. if ext == nil {
  128. ext = nil
  129. ext = &ExtractTask{}
  130. ext.Id = qu.ObjToString(ju.Config["udptaskid"])
  131. ext.InitTaskInfo()
  132. ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  133. ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
  134. ext.InitSite()
  135. ext.InitRulePres()
  136. ext.InitRuleBacks(false)
  137. ext.InitRuleBacks(true)
  138. ext.InitRuleCore(false)
  139. ext.InitRuleCore(true)
  140. ext.InitBlockRule()
  141. ext.InitPkgCore()
  142. ext.InitTag(false)
  143. ext.InitTag(true)
  144. ext.InitClearFn(false)
  145. ext.InitClearFn(true)
  146. ext.Lock()
  147. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  148. ext.InitCityInfo()
  149. ext.InitAreaCode()
  150. ext.InitPostCode()
  151. }
  152. ext.Unlock()
  153. //质量审核
  154. ext.InitAuditFields()
  155. ext.InitAuditRule()
  156. ext.InitAuditClass()
  157. ext.InitAuditRecogField()
  158. //品牌抽取是否开启
  159. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  160. ext.ResultSave(true)
  161. ext.BidSave(true)
  162. ext.InitFile()
  163. ext.IsRun = true
  164. ext.BidTotal = 0
  165. } else {
  166. if ju.IsUpdateRule {
  167. ju.IsUpdateRule = false
  168. log.Debug("每天更新一次规则......")
  169. //规则重置
  170. ext.InitSite()
  171. ext.InitRulePres()
  172. ext.InitRuleBacks(false)
  173. ext.InitRuleBacks(true)
  174. ext.InitRuleCore(false)
  175. ext.InitRuleCore(true)
  176. ext.InitBlockRule()
  177. ext.InitPkgCore()
  178. ext.InitTag(false)
  179. ext.InitTag(true)
  180. ext.InitClearFn(false)
  181. ext.InitClearFn(true)
  182. //地域重置
  183. ext.Lock()
  184. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  185. ext.InitCityInfo()
  186. ext.InitAreaCode()
  187. ext.InitPostCode()
  188. }
  189. ext.Unlock()
  190. }
  191. ext.BidTotal = 0
  192. }
  193. index := 0
  194. if len(instanceId) > 0 { //分布式抽取进度
  195. go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
  196. for {
  197. tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`)
  198. if tsk != nil && !b {
  199. break
  200. }
  201. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  202. "$set": map[string]interface{}{
  203. "InstanceId": instanceId[0],
  204. "state": 1,
  205. "runtime": time.Now().Format(qu.Date_Full_Layout),
  206. },
  207. })
  208. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}}
  209. count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  210. count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
  211. log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
  212. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
  213. for _, v := range *list {
  214. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  215. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  216. continue
  217. }
  218. var j, jf *ju.Job
  219. var isSite bool
  220. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  221. v["isextFile"] = true
  222. j, jf, isSite = ext.PreInfo(v)
  223. } else {
  224. j, _, isSite = ext.PreInfo(v)
  225. }
  226. go ext.ExtractProcess(j, jf, isSite)
  227. index++
  228. ext.TaskInfo.ProcessPool <- true
  229. }
  230. list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1)
  231. for _, v := range *list2 {
  232. if spidercode[qu.ObjToString(v["spidercode"])] {
  233. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  234. continue
  235. }
  236. var j, jf *ju.Job
  237. var isSite bool
  238. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  239. v["isextFile"] = true
  240. j, jf, isSite = ext.PreInfo(v)
  241. } else {
  242. j, _, isSite = ext.PreInfo(v)
  243. }
  244. go ext.ExtractProcess(j, jf, isSite)
  245. index++
  246. ext.TaskInfo.ProcessPool <- true
  247. }
  248. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  249. "$set": map[string]interface{}{
  250. "InstanceId": instanceId[0],
  251. "oktime": time.Now().Format(qu.Date_Full_Layout),
  252. "state": 1,
  253. },
  254. })
  255. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  256. map[string]interface{}{
  257. "$inc": map[string]interface{}{
  258. "totalnum": count1 + count2,
  259. "step": 1,
  260. },
  261. }, true, false)
  262. }
  263. log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
  264. } else {
  265. //普通抽取
  266. query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  267. count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  268. log.Debug("查询条件为:", query, "查询条数:", count)
  269. pageNum := (count + PageSize - 1) / PageSize
  270. limit := PageSize
  271. if count < PageSize {
  272. limit = count
  273. }
  274. wg := sync.WaitGroup{}
  275. for i := 0; i < pageNum; i++ {
  276. query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid)}}
  277. fmt.Printf("page=%d,query=%v\n", i+1, query)
  278. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
  279. for _, v := range *list {
  280. if IsExtStop {
  281. break
  282. }
  283. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  284. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  285. continue
  286. }
  287. _id := qu.BsonIdToSId(v["_id"])
  288. var j, jf *ju.Job
  289. var isSite bool
  290. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  291. v["isextFile"] = true
  292. j, jf, isSite = ext.PreInfo(v)
  293. } else {
  294. j, _, isSite = ext.PreInfo(v)
  295. }
  296. ext.TaskInfo.ProcessPool <- true
  297. wg.Add(1)
  298. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  299. defer wg.Done()
  300. ext.ExtractProcess(j, jf, isSite)
  301. }(&wg, j, jf)
  302. index++
  303. if index%1000 == 0 {
  304. log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
  305. }
  306. sid = _id
  307. if sid >= eid {
  308. break
  309. }
  310. }
  311. }
  312. wg.Wait()
  313. ext.BidSave(false)
  314. log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
  315. }
  316. }
  317. func ExtractByUdpPre(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
  318. defer qu.Catch()
  319. if ext == nil {
  320. ext = nil
  321. ext = &ExtractTask{}
  322. ext.Id = qu.ObjToString(ju.Config["udptaskid"])
  323. ext.InitTaskInfo()
  324. ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  325. ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
  326. ext.InitSite()
  327. ext.InitRulePres()
  328. ext.InitRuleBacks(false)
  329. ext.InitRuleBacks(true)
  330. ext.InitRuleCore(false)
  331. ext.InitRuleCore(true)
  332. ext.InitBlockRule()
  333. ext.InitPkgCore()
  334. ext.InitTag(false)
  335. ext.InitTag(true)
  336. ext.InitClearFn(false)
  337. ext.InitClearFn(true)
  338. ext.Lock()
  339. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  340. ext.InitCityInfo()
  341. ext.InitAreaCode()
  342. ext.InitPostCode()
  343. }
  344. ext.Unlock()
  345. //质量审核
  346. ext.InitAuditFields()
  347. ext.InitAuditRule()
  348. ext.InitAuditClass()
  349. ext.InitAuditRecogField()
  350. //品牌抽取是否开启
  351. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  352. ext.ResultSave(true)
  353. ext.BidSave(true)
  354. ext.InitFile()
  355. ext.IsRun = true
  356. ext.BidTotal = 0
  357. } else {
  358. ext.BidTotal = 0
  359. }
  360. query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  361. count1 := ext.TaskInfo.FDB.Count("bidding_nomal", query)
  362. count2 := ext.TaskInfo.FDB.Count("bidding_file", query)
  363. log.Debug("待抽取数量:", count1+count2)
  364. list1, _ := ext.TaskInfo.FDB.Find("bidding_nomal", query, nil, Fields, false, -1, -1)
  365. list2, _ := ext.TaskInfo.FDB.Find("bidding_file", query, nil, Fields, false, -1, -1)
  366. new_list := append(*list1, *list2...)
  367. now_time := time.Now().Unix()
  368. total := 0
  369. wg_mgo := &sync.WaitGroup{}
  370. for _, v := range new_list {
  371. if total%1000 == 0 {
  372. log.Debug("cur index :", total, v["_id"])
  373. }
  374. total++
  375. if spidercode[qu.ObjToString(v["spidercode"])] { //开标记录
  376. continue
  377. }
  378. ext.TaskInfo.ProcessPool <- true
  379. wg_mgo.Add(1)
  380. go func(v map[string]interface{}) {
  381. defer func() {
  382. <-ext.TaskInfo.ProcessPool
  383. wg_mgo.Done()
  384. }()
  385. var j, jf *ju.Job
  386. var isSite bool
  387. j, _, isSite = ext.PreInfo(v)
  388. ext.ExtractProcess(j, jf, isSite)
  389. }(v)
  390. }
  391. wg_mgo.Wait()
  392. log.Debug("抽取完成:", total, ",耗时:", time.Now().Unix()-now_time)
  393. }
// Winning-bid prediction extraction; ossid is the id produced by attachment
// recognition.
// NOTE(review): this comment appears to describe code that follows this
// declaration rather than exF itself — confirm.
var exF *ExtractTask