extractudp.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. // extractudp
  2. package extract
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. db "jy/mongodbutil"
  7. ju "jy/util"
  8. mu "mfw/util"
  9. "net"
  10. qu "qfw/util"
  11. "sync"
  12. "time"
  13. log "github.com/donnie4w/go-logger/logger"
  14. "gopkg.in/mgo.v2/bson"
  15. )
  16. var Udpclient mu.UdpClient //udp对象
  17. var nextNodes []map[string]interface{}
  18. //udp通知抽取
  19. func ExtractUdp() {
  20. nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
  21. Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
  22. Udpclient.Listen(processUdpMsg)
  23. }
  24. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  25. switch act {
  26. case mu.OP_TYPE_DATA:
  27. var rep map[string]interface{}
  28. err := json.Unmarshal(data, &rep)
  29. if err != nil {
  30. log.Debug(err)
  31. } else {
  32. stype, _ := rep["stype"].(string)
  33. if stype == "distributed" { //分布式抽取分支
  34. go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
  35. InstanceId := qu.ObjToString(rep["InstanceId"])
  36. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  37. map[string]interface{}{
  38. "$set": map[string]interface{}{
  39. "extstatus": "running",
  40. },
  41. }, true, false)
  42. ExtractByUdp("", "", ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
  43. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  44. map[string]interface{}{
  45. "$set": map[string]interface{}{
  46. "extstatus": "ok",
  47. },
  48. }, true, false)
  49. log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
  50. } else {
  51. sid, _ := rep["gtid"].(string)
  52. eid, _ := rep["lteid"].(string)
  53. if sid == "" || eid == "" {
  54. log.Debug("err", "sid=", sid, ",eid=", eid)
  55. } else {
  56. udpinfo, _ := rep["stype"].(string)
  57. if udpinfo == "" {
  58. udpinfo = "udpok"
  59. }
  60. //新版本控制抽取
  61. //ExtractByUdp(sid, eid, ra)
  62. //log.Debug("抽取完成udp通知抽取id段-控制台",udpinfo, sid, "~", eid)
  63. //Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  64. //适配重采抽取-发送udp-必须替换
  65. go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  66. log.Debug("udp通知抽取id段", sid, " ", eid)
  67. ExtractByUdp(sid, eid, ra)
  68. for _, m := range nextNodes {
  69. by, _ := json.Marshal(map[string]interface{}{
  70. "gtid": sid,
  71. "lteid": eid,
  72. "stype": qu.ObjToString(m["stype"]),
  73. })
  74. err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  75. IP: net.ParseIP(m["addr"].(string)),
  76. Port: qu.IntAll(m["port"]),
  77. })
  78. if err != nil {
  79. log.Debug(err)
  80. }
  81. }
  82. log.Debug("udp通知抽取完成,eid=", eid)
  83. }
  84. }
  85. }
  86. case mu.OP_NOOP: //下个节点回应
  87. log.Debug(string(data))
  88. }
  89. }
  90. var ext *ExtractTask
  91. //根据id区间抽取-udp模式
  92. func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
  93. defer qu.Catch()
  94. if ext == nil {
  95. ext = &ExtractTask{}
  96. ext.Id = qu.ObjToString(ju.Config["udptaskid"])
  97. ext.InitTaskInfo()
  98. ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  99. ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
  100. ext.InitSite()
  101. ext.InitRulePres()
  102. ext.InitRuleBacks(false)
  103. ext.InitRuleBacks(true)
  104. ext.InitRuleCore(false)
  105. ext.InitRuleCore(true)
  106. ext.InitBlockRule()
  107. ext.InitPkgCore()
  108. ext.InitTag(false)
  109. ext.InitTag(true)
  110. ext.InitClearFn(false)
  111. ext.InitClearFn(true)
  112. ext.Lock()
  113. //ext.IsExtractCity = false
  114. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  115. //初始化城市DFA信息
  116. //ext.InitCityDFA()
  117. ext.InitCityInfo()
  118. ext.InitAreaCode()
  119. ext.InitPostCode()
  120. }
  121. ext.Unlock()
  122. //质量审核
  123. ext.InitAuditFields()
  124. ext.InitAuditRule()
  125. ext.InitAuditClass()
  126. ext.InitAuditRecogField()
  127. //品牌抽取是否开启
  128. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  129. ext.ResultSave(true)
  130. ext.BidSave(true)
  131. ext.IsRun = true
  132. ext.InitFile()
  133. } else {
  134. ext.BidTotal = 0
  135. }
  136. index := 0
  137. if len(instanceId) > 0 { //分布式抽取进度
  138. go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
  139. for {
  140. tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`)
  141. if tsk != nil && !b {
  142. break
  143. }
  144. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  145. "$set": map[string]interface{}{
  146. "InstanceId": instanceId[0],
  147. "state": 1,
  148. "runtime": time.Now().Format(qu.Date_Full_Layout),
  149. },
  150. })
  151. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}}
  152. count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  153. count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
  154. log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
  155. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
  156. for _, v := range *list {
  157. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  158. // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
  159. // continue
  160. //}
  161. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  162. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  163. continue
  164. }
  165. //if qu.ObjToString(v["subtype"])!="中标" &&
  166. // qu.ObjToString(v["subtype"])!="成交" &&
  167. // qu.ObjToString(v["subtype"])!="合同" {
  168. // continue
  169. //}
  170. var j, jf *ju.Job
  171. var isSite bool
  172. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  173. v["isextFile"] = true
  174. j, jf, isSite = ext.PreInfo(v)
  175. } else {
  176. j, _, isSite = ext.PreInfo(v)
  177. }
  178. go ext.ExtractProcess(j, jf, isSite)
  179. index++
  180. ext.TaskInfo.ProcessPool <- true
  181. }
  182. list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1)
  183. for _, v := range *list2 {
  184. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  185. // continue
  186. //}
  187. if spidercode[qu.ObjToString(v["spidercode"])] {
  188. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  189. continue
  190. }
  191. //if qu.ObjToString(v["subtype"])!="中标" &&
  192. // qu.ObjToString(v["subtype"])!="成交" &&
  193. // qu.ObjToString(v["subtype"])!="合同" {
  194. // continue
  195. //}
  196. var j, jf *ju.Job
  197. var isSite bool
  198. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  199. v["isextFile"] = true
  200. j, jf, isSite = ext.PreInfo(v)
  201. } else {
  202. j, _, isSite = ext.PreInfo(v)
  203. }
  204. go ext.ExtractProcess(j, jf, isSite)
  205. index++
  206. ext.TaskInfo.ProcessPool <- true
  207. }
  208. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  209. "$set": map[string]interface{}{
  210. "InstanceId": instanceId[0],
  211. "oktime": time.Now().Format(qu.Date_Full_Layout),
  212. "state": 1,
  213. },
  214. })
  215. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  216. map[string]interface{}{
  217. "$inc": map[string]interface{}{
  218. "totalnum": count1 + count2,
  219. "step": 1,
  220. },
  221. }, true, false)
  222. }
  223. log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
  224. } else {
  225. //普通抽取
  226. query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  227. count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  228. log.Debug("查询条件为:", query, "查询条数:", count)
  229. pageNum := (count + PageSize - 1) / PageSize
  230. limit := PageSize
  231. if count < PageSize {
  232. limit = count
  233. }
  234. wg := sync.WaitGroup{}
  235. for i := 0; i < pageNum; i++ {
  236. query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid)}}
  237. fmt.Printf("page=%d,query=%v\n", i+1, query)
  238. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
  239. for _, v := range *list {
  240. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  241. // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
  242. // continue
  243. //}
  244. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  245. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  246. continue
  247. }
  248. _id := qu.BsonIdToSId(v["_id"])
  249. var j, jf *ju.Job
  250. var isSite bool
  251. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  252. v["isextFile"] = true
  253. j, jf, isSite = ext.PreInfo(v)
  254. } else {
  255. j, _, isSite = ext.PreInfo(v)
  256. }
  257. ext.TaskInfo.ProcessPool <- true
  258. wg.Add(1)
  259. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  260. defer wg.Done()
  261. //log.Debug(index,j.SourceMid,)
  262. ext.ExtractProcess(j, jf, isSite)
  263. }(&wg, j, jf)
  264. index++
  265. if index%1000 == 0 {
  266. log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
  267. }
  268. sid = _id
  269. if sid >= eid {
  270. break
  271. }
  272. }
  273. }
  274. wg.Wait()
  275. ext.BidSave(false)
  276. log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
  277. }
  278. }
  279. //中标预测信息抽取,ossid为附件识别后的id
  280. var exF *ExtractTask
  281. func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{} {
  282. defer qu.Catch()
  283. if exF == nil {
  284. exF = &ExtractTask{}
  285. exF.Id = qu.ObjToString(ju.Config["udptaskid"])
  286. exF.InitTaskInfo()
  287. exF.TaskInfo.FDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.FromDbAddr, exF.TaskInfo.FromDB)
  288. exF.TaskInfo.TDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.ToDbAddr, exF.TaskInfo.ToDB)
  289. exF.InitSite()
  290. exF.InitRulePres()
  291. exF.InitRuleBacks(false)
  292. exF.InitRuleBacks(true)
  293. exF.InitRuleCore(false)
  294. exF.InitRuleCore(true)
  295. exF.InitBlockRule()
  296. exF.InitPkgCore()
  297. exF.InitTag(false)
  298. exF.InitTag(true)
  299. exF.InitClearFn(false)
  300. exF.InitClearFn(true)
  301. if exF.IsExtractCity { //版本上控制是否开始城市抽取
  302. //初始化城市DFA信息
  303. //exF.InitCityDFA()
  304. exF.InitCityInfo()
  305. exF.InitAreaCode()
  306. exF.InitPostCode()
  307. }
  308. //质量审核
  309. exF.InitAuditFields()
  310. exF.InitAuditRule()
  311. exF.InitAuditClass()
  312. exF.InitAuditRecogField()
  313. //品牌抽取是否开启
  314. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  315. exF.ResultSave(true)
  316. exF.BidSave(true)
  317. exF.IsRun = true
  318. exF.InitFile()
  319. }
  320. tmp, _ := exF.TaskInfo.FDB.FindById(exF.TaskInfo.FromColl, infoid, nil)
  321. if exF.IsFileField && ((*tmp)["projectinfo"] != nil || (*tmp)["attach_text"] != nil) {
  322. (*tmp)["isextFile"] = true
  323. }
  324. exF.TaskInfo.ProcessPool <- true
  325. j, jf, _ := exF.PreInfo(*tmp)
  326. wg := sync.WaitGroup{}
  327. wg.Add(1)
  328. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  329. defer wg.Done()
  330. exF.ExtractProcess(j, jf, false)
  331. }(&wg, j, jf)
  332. wg.Wait()
  333. exF.BidSave(false)
  334. return nil
  335. }