mark 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. {
  2. "port": "9090",
  3. "mgodb": "127.0.0.1:27017",
  4. "dbsize": 3,
  5. "dbname": "extract_local",
  6. "dbname_addrs": "extract_local",
  7. "dbname_addrs_c": "address_new_2020",
  8. "redis": "qyk_redis=192.168.3.207:6379",
  9. "elasticsearch": "http://127.0.0.1:9800",
  10. "elasticsearch_index": "winner_enterprise_tmp",
  11. "elasticsearch_type": "winnerent",
  12. "elasticsearch_db": "winner_enterprise",
  13. "elasticsearch_buyer_index": "buyer_enterprise_tmp",
  14. "elasticsearch_buyer_type": "buyerent",
  15. "elasticsearch_buyer_db": "buyer_enterprise",
  16. "elasticsearch_agency_index": "agency_enterprise_tmp",
  17. "elasticsearch_agency_type": "agencyent",
  18. "elasticsearch_agency_db": "agency_enterprise",
  19. "redis_qyk": "qyk_redis",
  20. "redis_winner_db": "1",
  21. "redis_buyer_db": "2",
  22. "redis_agency_db": "3",
  23. "elasticPoolSize": 1,
  24. "mergetable": "projectset",
  25. "mergetablealias": "projectset_v1",
  26. "ffield": true,
  27. "saveresult": false,
  28. "fieldsfind": false,
  29. "qualityaudit": false,
  30. "saveblock": false,
  31. "filelength": 150000,
  32. "iscltlog": false,
  33. "brandgoods": false,
  34. "pricenumber":true,
  35. "udptaskid": "60b493c2e138234cb4adb640",
  36. "udpport": "1484",
  37. "nextNode": [
  38. {
  39. "addr": "127.0.0.1",
  40. "port": 1485,
  41. "memo": "抽取城市"
  42. }
  43. ],
  44. "esconfig": {
  45. "available": false,
  46. "AccessID": "",
  47. "AccessSecret": "",
  48. "ZoneIds": [
  49. {
  50. "zoneid": "cn-beijing-f",
  51. "LaunchTemplateId4": "lt-2zejb8ayql48hn0hcjpy",
  52. "LaunchTemplateId8": "lt-2zegx87hj07phcdtoh61",
  53. "vswitchid": "vsw-2zei6snkgmqxcnnx6g04d"
  54. },
  55. {
  56. "zoneid": "cn-beijing-g",
  57. "LaunchTemplateId4": "lt-2ze5ktfgopayi48ok0hu",
  58. "LaunchTemplateId8": "lt-2ze0qfrxdnkuwldj9s0u",
  59. "vswitchid": "vsw-2ze586sxfwsaov4s5w88d"
  60. },
  61. {
  62. "zoneid": "cn-beijing-h",
  63. "LaunchTemplateId4": "lt-2ze5ir54gy4ui8okr71f",
  64. "LaunchTemplateId8": "lt-2ze5fzxwgt8jcqczvmjy",
  65. "vswitchid": "vsw-2ze1n1k3mo3fv2irsfdps"
  66. }
  67. ]
  68. },
  69. "istest": true,
  70. "isSaveTag": false,
  71. "tomail": "zhengkun@topnet.net.cn",
  72. "api": "http://10.171.112.160:19281/_send/_mail",
  73. "deleteInstanceTimeHour": 1,
  74. "jsondata_extweight": 1
  75. }
  76. // extractudp
  77. package extract
  78. import (
  79. "encoding/json"
  80. "fmt"
  81. db "jy/mongodbutil"
  82. ju "jy/util"
  83. mu "mfw/util"
  84. "net"
  85. qu "qfw/util"
  86. "sync"
  87. "time"
  88. log "github.com/donnie4w/go-logger/logger"
  89. "gopkg.in/mgo.v2/bson"
  90. )
  91. var Udpclient mu.UdpClient //udp对象
  92. var nextNodes []map[string]interface{}
  93. //udp通知抽取
  94. func ExtractUdp() {
  95. nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
  96. Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
  97. Udpclient.Listen(processUdpMsg)
  98. }
  99. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  100. switch act {
  101. case mu.OP_TYPE_DATA:
  102. var rep map[string]interface{}
  103. err := json.Unmarshal(data, &rep)
  104. if err != nil {
  105. log.Debug(err)
  106. } else {
  107. stype, _ := rep["stype"].(string)
  108. if stype == "distributed" { //分布式抽取分支
  109. go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
  110. InstanceId := qu.ObjToString(rep["InstanceId"])
  111. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  112. map[string]interface{}{
  113. "$set": map[string]interface{}{
  114. "extstatus": "running",
  115. },
  116. }, true, false)
  117. ExtractByUdp("", "", ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
  118. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  119. map[string]interface{}{
  120. "$set": map[string]interface{}{
  121. "extstatus": "ok",
  122. },
  123. }, true, false)
  124. log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
  125. } else {
  126. sid, _ := rep["gtid"].(string)
  127. eid, _ := rep["lteid"].(string)
  128. if sid == "" || eid == "" {
  129. log.Debug("err", "sid=", sid, ",eid=", eid)
  130. } else {
  131. udpinfo, _ := rep["key"].(string)
  132. if udpinfo == "" {
  133. udpinfo = "udpok"
  134. }
  135. go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  136. log.Debug("udp通知抽取id段", sid, " ", eid)
  137. ExtractByUdp(sid, eid, ra)
  138. for _, m := range nextNodes {
  139. by, _ := json.Marshal(map[string]interface{}{
  140. "gtid": sid,
  141. "lteid": eid,
  142. "stype": qu.ObjToString(m["stype"]),
  143. })
  144. err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  145. IP: net.ParseIP(m["addr"].(string)),
  146. Port: qu.IntAll(m["port"]),
  147. })
  148. if err != nil {
  149. log.Debug(err)
  150. }
  151. }
  152. log.Debug("udp通知抽取完成,eid=", eid)
  153. }
  154. }
  155. }
  156. case mu.OP_NOOP: //下个节点回应
  157. log.Debug(string(data))
  158. }
  159. }
  160. var ext *ExtractTask
  161. //根据id区间抽取-udp模式
  162. func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
  163. defer qu.Catch()
  164. if ext == nil {
  165. ext = &ExtractTask{}
  166. ext.Id = qu.ObjToString(ju.Config["udptaskid"])
  167. ext.InitTaskInfo()
  168. ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  169. ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
  170. ext.InitSite()
  171. ext.InitRulePres()
  172. ext.InitRuleBacks(false)
  173. ext.InitRuleBacks(true)
  174. ext.InitRuleCore(false)
  175. ext.InitRuleCore(true)
  176. ext.InitBlockRule()
  177. ext.InitPkgCore()
  178. ext.InitTag(false)
  179. ext.InitTag(true)
  180. ext.InitClearFn(false)
  181. ext.InitClearFn(true)
  182. ext.Lock()
  183. //ext.IsExtractCity = false
  184. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  185. //初始化城市DFA信息
  186. //ext.InitCityDFA()
  187. ext.InitCityInfo()
  188. ext.InitAreaCode()
  189. ext.InitPostCode()
  190. }
  191. ext.Unlock()
  192. //质量审核
  193. ext.InitAuditFields()
  194. ext.InitAuditRule()
  195. ext.InitAuditClass()
  196. ext.InitAuditRecogField()
  197. //品牌抽取是否开启
  198. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  199. ext.ResultSave(true)
  200. ext.BidSave(true)
  201. ext.IsRun = true
  202. ext.InitFile()
  203. } else {
  204. ext.BidTotal = 0
  205. }
  206. index := 0
  207. if len(instanceId) > 0 { //分布式抽取进度
  208. go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
  209. for {
  210. tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`)
  211. if tsk != nil && !b {
  212. break
  213. }
  214. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  215. "$set": map[string]interface{}{
  216. "InstanceId": instanceId[0],
  217. "state": 1,
  218. "runtime": time.Now().Format(qu.Date_Full_Layout),
  219. },
  220. })
  221. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}}
  222. count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  223. count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
  224. log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
  225. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
  226. for _, v := range *list {
  227. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  228. // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
  229. // continue
  230. //}
  231. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  232. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  233. continue
  234. }
  235. var j, jf *ju.Job
  236. var isSite bool
  237. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  238. v["isextFile"] = true
  239. j, jf, isSite = ext.PreInfo(v)
  240. } else {
  241. j, _, isSite = ext.PreInfo(v)
  242. }
  243. go ext.ExtractProcess(j, jf, isSite)
  244. index++
  245. ext.TaskInfo.ProcessPool <- true
  246. }
  247. list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1)
  248. for _, v := range *list2 {
  249. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  250. continue
  251. }
  252. var j, jf *ju.Job
  253. var isSite bool
  254. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  255. v["isextFile"] = true
  256. j, jf, isSite = ext.PreInfo(v)
  257. } else {
  258. j, _, isSite = ext.PreInfo(v)
  259. }
  260. go ext.ExtractProcess(j, jf, isSite)
  261. index++
  262. ext.TaskInfo.ProcessPool <- true
  263. }
  264. db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
  265. "$set": map[string]interface{}{
  266. "InstanceId": instanceId[0],
  267. "oktime": time.Now().Format(qu.Date_Full_Layout),
  268. "state": 1,
  269. },
  270. })
  271. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  272. map[string]interface{}{
  273. "$inc": map[string]interface{}{
  274. "totalnum": count1 + count2,
  275. "step": 1,
  276. },
  277. }, true, false)
  278. }
  279. log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
  280. } else {
  281. //普通抽取
  282. query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  283. count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  284. log.Debug("查询条件为:", query, "查询条数:", count)
  285. pageNum := (count + PageSize - 1) / PageSize
  286. limit := PageSize
  287. if count < PageSize {
  288. limit = count
  289. }
  290. wg := sync.WaitGroup{}
  291. for i := 0; i < pageNum; i++ {
  292. query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid)}}
  293. fmt.Printf("page=%d,query=%v\n", i+1, query)
  294. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
  295. for _, v := range *list {
  296. //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  297. // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
  298. // continue
  299. //}
  300. if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
  301. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
  302. continue
  303. }
  304. _id := qu.BsonIdToSId(v["_id"])
  305. var j, jf *ju.Job
  306. var isSite bool
  307. if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
  308. v["isextFile"] = true
  309. j, jf, isSite = ext.PreInfo(v)
  310. } else {
  311. j, _, isSite = ext.PreInfo(v)
  312. }
  313. ext.TaskInfo.ProcessPool <- true
  314. wg.Add(1)
  315. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  316. defer wg.Done()
  317. //log.Debug(index,j.SourceMid,)
  318. ext.ExtractProcess(j, jf, isSite)
  319. }(&wg, j, jf)
  320. index++
  321. if index%1000 == 0 {
  322. log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
  323. }
  324. sid = _id
  325. if sid >= eid {
  326. break
  327. }
  328. }
  329. }
  330. wg.Wait()
  331. ext.BidSave(false)
  332. log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
  333. }
  334. }
  335. //中标预测信息抽取,ossid为附件识别后的id
  336. var exF *ExtractTask
  337. func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{} {
  338. defer qu.Catch()
  339. if exF == nil {
  340. exF = &ExtractTask{}
  341. exF.Id = qu.ObjToString(ju.Config["udptaskid"])
  342. exF.InitTaskInfo()
  343. exF.TaskInfo.FDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.FromDbAddr, exF.TaskInfo.FromDB)
  344. exF.TaskInfo.TDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.ToDbAddr, exF.TaskInfo.ToDB)
  345. exF.InitSite()
  346. exF.InitRulePres()
  347. exF.InitRuleBacks(false)
  348. exF.InitRuleBacks(true)
  349. exF.InitRuleCore(false)
  350. exF.InitRuleCore(true)
  351. exF.InitBlockRule()
  352. exF.InitPkgCore()
  353. exF.InitTag(false)
  354. exF.InitTag(true)
  355. exF.InitClearFn(false)
  356. exF.InitClearFn(true)
  357. if exF.IsExtractCity { //版本上控制是否开始城市抽取
  358. //初始化城市DFA信息
  359. //exF.InitCityDFA()
  360. exF.InitCityInfo()
  361. exF.InitAreaCode()
  362. exF.InitPostCode()
  363. }
  364. //质量审核
  365. exF.InitAuditFields()
  366. exF.InitAuditRule()
  367. exF.InitAuditClass()
  368. exF.InitAuditRecogField()
  369. //品牌抽取是否开启
  370. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  371. exF.ResultSave(true)
  372. exF.BidSave(true)
  373. exF.IsRun = true
  374. exF.InitFile()
  375. }
  376. tmp, _ := exF.TaskInfo.FDB.FindById(exF.TaskInfo.FromColl, infoid, nil)
  377. if exF.IsFileField && ((*tmp)["projectinfo"] != nil || (*tmp)["attach_text"] != nil) {
  378. (*tmp)["isextFile"] = true
  379. }
  380. exF.TaskInfo.ProcessPool <- true
  381. j, jf, _ := exF.PreInfo(*tmp)
  382. wg := sync.WaitGroup{}
  383. wg.Add(1)
  384. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  385. defer wg.Done()
  386. exF.ExtractProcess(j, jf, false)
  387. }(&wg, j, jf)
  388. wg.Wait()
  389. exF.BidSave(false)
  390. return nil
  391. }
  392. //网页版抽取
  393. {
  394. "port": "9080",
  395. "mgodb": "SJZY_RWExt_Other:SJZY%40E3X4t5O8th@172.17.145.163:27083",
  396. "dbsize": 3,
  397. "dbname": "extract_2021",
  398. "dbname_addrs": "mixdata",
  399. "dbname_addrs_c": "address_new_2020",
  400. "redis": "qyk_redis=172.17.4.87:1479",
  401. "elasticsearch": "http://172.17.145.170:9800",
  402. "elasticsearch_index": "winner_enterprise",
  403. "elasticsearch_type": "winnerent",
  404. "elasticsearch_db": "winner_enterprise",
  405. "elasticsearch_buyer_index": "buyer_enterprise",
  406. "elasticsearch_buyer_type": "buyerent",
  407. "elasticsearch_buyer_db": "buyer_enterprise",
  408. "elasticsearch_agency_index": "agency_enterprise",
  409. "elasticsearch_agency_type": "agencyent",
  410. "elasticsearch_agency_db": "agency_enterprise",
  411. "redis_qyk": "qyk_redis",
  412. "redis_winner_db": "1",
  413. "redis_buyer_db": "2",
  414. "redis_agency_db": "3",
  415. "elasticPoolSize": 10,
  416. "mergetable": "projectset",
  417. "mergetablealias": "projectset_v1",
  418. "saveresult": false,
  419. "fieldsfind": false,
  420. "qualityaudit": false,
  421. "saveblock": false,
  422. "filelength": 50000,
  423. "iscltlog": false,
  424. "brandgoods": false,
  425. "pricenumber":true,
  426. "udptaskid": "60b49fab23119b54547d9a11",
  427. "udpport": "1177",
  428. "nextNode": [ ],
  429. "esconfig": {
  430. "available": true,
  431. "AccessID": "LTAI4G5x9aoZx8dDamQ7vfZi",
  432. "AccessSecret": "Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
  433. "ZoneIds": [
  434. {
  435. "zoneid": "cn-beijing-g",
  436. "LaunchTemplateId4": "lt-2ze5ktfgopayi48ok0hu",
  437. "LaunchTemplateId8": "lt-2ze0qfrxdnkuwldj9s0u",
  438. "vswitchid": "vsw-2ze586sxfwsaov4s5w88d"
  439. },
  440. {
  441. "zoneid": "cn-beijing-h",
  442. "LaunchTemplateId4": "lt-2ze1h0akjvi4sdemm7cj",
  443. "LaunchTemplateId8": "lt-2ze5fzxwgt8jcqczvmjy",
  444. "vswitchid": "vsw-2ze1n1k3mo3fv2irsfdps"
  445. }
  446. ]
  447. },
  448. "istest": false,
  449. "isSaveTag": false,
  450. "tomail": "zhengkun@topnet.net.cn",
  451. "api": "http://172.17.145.179:19281/_send/_mail",
  452. "deleteInstanceTimeHour": 1,
  453. "jsondata_extweight": 1
  454. }