// extractudp.go (12 KB)
  1. // extractudp
  2. package extract
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. db "jy/mongodbutil"
  7. ju "jy/util"
  8. mu "mfw/util"
  9. "net"
  10. qu "qfw/util"
  11. "sync"
  12. "time"
  13. log "github.com/donnie4w/go-logger/logger"
  14. "gopkg.in/mgo.v2/bson"
  15. )
  16. var Udpclient mu.UdpClient //udp对象
  17. var nextNodes []map[string]interface{}
  18. var IsExtStop bool
  19. //新增机器节点
  20. func ExtractUdpUpdateMachine() {
  21. machine := *qu.ObjToMap(ju.Config["udpmachine"])
  22. if len(machine) == 0 || machine == nil {
  23. return
  24. }
  25. skey := fmt.Sprintf("%s:%d:%s", machine["addr"], qu.IntAll(machine["port"]), machine["stype"])
  26. machine["skey"] = skey
  27. db.Mgo.Update("extract_control_center", map[string]interface{}{"skey": skey}, machine, true, false)
  28. }
  29. //udp通知抽取
  30. func ExtractUdp() {
  31. nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
  32. Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
  33. Udpclient.Listen(processUdpMsg)
  34. }
  35. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  36. switch act {
  37. case mu.OP_TYPE_DATA:
  38. var rep map[string]interface{}
  39. err := json.Unmarshal(data, &rep)
  40. if err != nil {
  41. log.Debug(err)
  42. } else {
  43. stype, _ := rep["stype"].(string)
  44. if stype == "distributed" { //分布式抽取分支
  45. go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
  46. InstanceId := qu.ObjToString(rep["InstanceId"])
  47. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  48. map[string]interface{}{
  49. "$set": map[string]interface{}{
  50. "extstatus": "running",
  51. },
  52. }, true, false)
  53. ExtractByUdp("", "", ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
  54. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  55. map[string]interface{}{
  56. "$set": map[string]interface{}{
  57. "extstatus": "ok",
  58. },
  59. }, true, false)
  60. log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
  61. } else if stype == "stop_extract" {
  62. IsExtStop = true
  63. } else if stype == "heart_extract" {
  64. skey, _ := rep["skey"].(string)
  65. Udpclient.WriteUdp([]byte(skey), mu.OP_NOOP, ra)
  66. } else {
  67. sid, _ := rep["gtid"].(string)
  68. eid, _ := rep["lteid"].(string)
  69. if sid == "" || eid == "" {
  70. log.Debug("err", "sid=", sid, ",eid=", eid)
  71. } else {
  72. udpinfo, _ := rep["stype"].(string)
  73. if udpinfo == "" {
  74. udpinfo = "udpok"
  75. }
  76. IsExtStop = false
  77. //新版本控制抽取
  78. ExtractByUdp(sid, eid, ra)
  79. if !IsExtStop {
  80. log.Debug("抽取完成udp通知抽取id段-控制台", udpinfo, sid, "~", eid)
  81. Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  82. } else {
  83. log.Debug("抽取强制中断udp不通知-控制台", udpinfo, sid, "~", eid)
  84. }
  85. //适配重采抽取-发送udp-必须替换
  86. //go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  87. //发布数据~测试流程
  88. //key := sid + "-" + eid + "-" + qu.ObjToString(rep["stype"])
  89. //go Udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  90. //
  91. //log.Debug("udp通知抽取id段", sid, " ", eid)
  92. //ExtractByUdp(sid, eid, ra)
  93. //for _, m := range nextNodes {
  94. // by, _ := json.Marshal(map[string]interface{}{
  95. // "gtid": sid,
  96. // "lteid": eid,
  97. // "stype": qu.ObjToString(m["stype"]),
  98. // })
  99. // err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  100. // IP: net.ParseIP(m["addr"].(string)),
  101. // Port: qu.IntAll(m["port"]),
  102. // })
  103. // if err != nil {
  104. // log.Debug(err)
  105. // }
  106. //}
  107. //log.Debug("udp通知抽取完成,eid=", eid)
  108. }
  109. }
  110. }
  111. case mu.OP_NOOP: //下个节点回应
  112. log.Debug(string(data))
  113. }
  114. }
  115. var ext *ExtractTask
  116. //根据id区间抽取-udp模式
  117. func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
  118. defer qu.Catch()
  119. if ext == nil {
  120. ext = &ExtractTask{}
  121. ext.Id = qu.ObjToString(ju.Config["udptaskid"])
  122. ext.InitTaskInfo()
  123. ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  124. ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
  125. ext.InitSite()
  126. ext.InitRulePres()
  127. ext.InitRuleBacks(false)
  128. ext.InitRuleBacks(true)
  129. ext.InitRuleCore(false)
  130. ext.InitRuleCore(true)
  131. ext.InitBlockRule()
  132. ext.InitPkgCore()
  133. ext.InitTag(false)
  134. ext.InitTag(true)
  135. ext.InitClearFn(false)
  136. ext.InitClearFn(true)
  137. ext.Lock()
  138. //ext.IsExtractCity = false
  139. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  140. //初始化城市DFA信息
  141. //ext.InitCityDFA()
  142. ext.InitCityInfo()
  143. ext.InitAreaCode()
  144. ext.InitPostCode()
  145. }
  146. ext.Unlock()
  147. //质量审核
  148. ext.InitAuditFields()
  149. ext.InitAuditRule()
  150. ext.InitAuditClass()
  151. ext.InitAuditRecogField()
  152. //品牌抽取是否开启
  153. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  154. ext.ResultSave(true)
  155. ext.BidSave(true)
  156. ext.IsRun = true
  157. ext.InitFile()
  158. } else {
  159. ext.BidTotal = 0 //是否更新站点数据~~~
  160. if ju.IsUpdateSite && ext.IsExtractCity {
  161. ext.InitUpdateSite()
  162. ju.IsUpdateSite = false
  163. }
  164. //更新规则~标签~~
  165. if ju.IsUpdateRuleTag {
  166. ju.IsUpdateRuleTag = false
  167. ext.InitRulePres()
  168. ext.InitRuleBacks(false)
  169. ext.InitRuleBacks(true)
  170. ext.InitRuleCore(false)
  171. ext.InitRuleCore(true)
  172. ext.InitBlockRule()
  173. ext.InitPkgCore()
  174. ext.InitTag(false)
  175. ext.InitTag(true)
  176. ext.InitClearFn(false)
  177. ext.InitClearFn(true)
  178. ext.Lock()
  179. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  180. //初始化城市DFA信息
  181. //ext.InitCityDFA()
  182. ext.InitCityInfo()
  183. ext.InitAreaCode()
  184. ext.InitPostCode()
  185. }
  186. ext.Unlock()
  187. }
  188. }
  189. index := 0
  190. if len(instanceId) > 0 { //分布式抽取进度
  191. go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
  192. for {
  193. tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`)
  194. if tsk != nil && !b {
  195. break
  196. }
  197. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  198. "$set": map[string]interface{}{
  199. "InstanceId": instanceId[0],
  200. "state": 1,
  201. "runtime": time.Now().Format(qu.Date_Full_Layout),
  202. },
  203. })
  204. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}}
  205. count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  206. count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
  207. log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
  208. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
  209. for _, v := range *list {
  210. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  211. // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
  212. // continue
  213. //}
  214. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  215. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  216. continue
  217. }
  218. //if qu.ObjToString(v["subtype"])!="中标" &&
  219. // qu.ObjToString(v["subtype"])!="成交" &&
  220. // qu.ObjToString(v["subtype"])!="合同" {
  221. // continue
  222. //}
  223. var j, jf *ju.Job
  224. var isSite bool
  225. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  226. v["isextFile"] = true
  227. j, jf, isSite = ext.PreInfo(v)
  228. } else {
  229. j, _, isSite = ext.PreInfo(v)
  230. }
  231. go ext.ExtractProcess(j, jf, isSite)
  232. index++
  233. ext.TaskInfo.ProcessPool <- true
  234. }
  235. list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1)
  236. for _, v := range *list2 {
  237. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  238. // continue
  239. //}
  240. if spidercode[qu.ObjToString(v["spidercode"])] {
  241. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  242. continue
  243. }
  244. //if qu.ObjToString(v["subtype"])!="中标" &&
  245. // qu.ObjToString(v["subtype"])!="成交" &&
  246. // qu.ObjToString(v["subtype"])!="合同" {
  247. // continue
  248. //}
  249. var j, jf *ju.Job
  250. var isSite bool
  251. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  252. v["isextFile"] = true
  253. j, jf, isSite = ext.PreInfo(v)
  254. } else {
  255. j, _, isSite = ext.PreInfo(v)
  256. }
  257. go ext.ExtractProcess(j, jf, isSite)
  258. index++
  259. ext.TaskInfo.ProcessPool <- true
  260. }
  261. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  262. "$set": map[string]interface{}{
  263. "InstanceId": instanceId[0],
  264. "oktime": time.Now().Format(qu.Date_Full_Layout),
  265. "state": 1,
  266. },
  267. })
  268. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  269. map[string]interface{}{
  270. "$inc": map[string]interface{}{
  271. "totalnum": count1 + count2,
  272. "step": 1,
  273. },
  274. }, true, false)
  275. }
  276. log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
  277. } else {
  278. //普通抽取
  279. query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  280. count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  281. log.Debug("查询条件为:", query, "查询条数:", count)
  282. pageNum := (count + PageSize - 1) / PageSize
  283. limit := PageSize
  284. if count < PageSize {
  285. limit = count
  286. }
  287. wg := sync.WaitGroup{}
  288. for i := 0; i < pageNum; i++ {
  289. query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid)}}
  290. fmt.Printf("page=%d,query=%v\n", i+1, query)
  291. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
  292. for _, v := range *list {
  293. if IsExtStop {
  294. break
  295. }
  296. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  297. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  298. continue
  299. }
  300. _id := qu.BsonIdToSId(v["_id"])
  301. var j, jf *ju.Job
  302. var isSite bool
  303. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  304. v["isextFile"] = true
  305. j, jf, isSite = ext.PreInfo(v)
  306. } else {
  307. j, _, isSite = ext.PreInfo(v)
  308. }
  309. ext.TaskInfo.ProcessPool <- true
  310. wg.Add(1)
  311. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  312. defer wg.Done()
  313. //log.Debug(index,j.SourceMid,)
  314. ext.ExtractProcess(j, jf, isSite)
  315. }(&wg, j, jf)
  316. index++
  317. if index%1000 == 0 {
  318. log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
  319. }
  320. sid = _id
  321. if sid >= eid {
  322. break
  323. }
  324. }
  325. }
  326. wg.Wait()
  327. ext.BidSave(false)
  328. log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
  329. }
  330. }
  331. //中标预测信息抽取,ossid为附件识别后的id
  332. var exF *ExtractTask
  333. func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{} {
  334. defer qu.Catch()
  335. if exF == nil {
  336. exF = &ExtractTask{}
  337. exF.Id = qu.ObjToString(ju.Config["udptaskid"])
  338. exF.InitTaskInfo()
  339. exF.TaskInfo.FDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.FromDbAddr, exF.TaskInfo.FromDB)
  340. exF.TaskInfo.TDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.ToDbAddr, exF.TaskInfo.ToDB)
  341. exF.InitSite()
  342. exF.InitRulePres()
  343. exF.InitRuleBacks(false)
  344. exF.InitRuleBacks(true)
  345. exF.InitRuleCore(false)
  346. exF.InitRuleCore(true)
  347. exF.InitBlockRule()
  348. exF.InitPkgCore()
  349. exF.InitTag(false)
  350. exF.InitTag(true)
  351. exF.InitClearFn(false)
  352. exF.InitClearFn(true)
  353. if exF.IsExtractCity { //版本上控制是否开始城市抽取
  354. //初始化城市DFA信息
  355. //exF.InitCityDFA()
  356. exF.InitCityInfo()
  357. exF.InitAreaCode()
  358. exF.InitPostCode()
  359. }
  360. //质量审核
  361. exF.InitAuditFields()
  362. exF.InitAuditRule()
  363. exF.InitAuditClass()
  364. exF.InitAuditRecogField()
  365. //品牌抽取是否开启
  366. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  367. exF.ResultSave(true)
  368. exF.BidSave(true)
  369. exF.IsRun = true
  370. exF.InitFile()
  371. }
  372. tmp, _ := exF.TaskInfo.FDB.FindById(exF.TaskInfo.FromColl, infoid, nil)
  373. if exF.IsFileField && ((*tmp)["projectinfo"] != nil || (*tmp)["attach_text"] != nil) {
  374. (*tmp)["isextFile"] = true
  375. }
  376. exF.TaskInfo.ProcessPool <- true
  377. j, jf, _ := exF.PreInfo(*tmp)
  378. wg := sync.WaitGroup{}
  379. wg.Add(1)
  380. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  381. defer wg.Done()
  382. exF.ExtractProcess(j, jf, false)
  383. }(&wg, j, jf)
  384. wg.Wait()
  385. exF.BidSave(false)
  386. return nil
  387. }