task.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "app.yhyue.com/data_processing/common_utils/udp"
  8. "encoding/json"
  9. "field_sync/config"
  10. "field_sync/oss"
  11. "fmt"
  12. "go.uber.org/zap"
  13. "net"
  14. "reflect"
  15. "regexp"
  16. "strings"
  17. )
  // regLetter matches runs of lowercase ASCII letters. fieldFun uses it to
  // strip letters from topscopeclass values when building s_topscopeclass.
  var regLetter = regexp.MustCompile("[a-z]*")
  21. func biddingTask(data []byte, mapInfo map[string]interface{}) {
  22. defer util.Catch()
  23. stype := util.ObjToString(mapInfo["stype"])
  24. // 领域标签处理的数据 id段
  25. if stype == "bidding_history" {
  26. MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
  27. }
  28. q, _ := mapInfo["query"].(map[string]interface{})
  29. bkey, _ := mapInfo["bkey"].(string)
  30. if q == nil {
  31. q = map[string]interface{}{
  32. "_id": map[string]interface{}{
  33. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  34. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  35. },
  36. }
  37. }
  38. //extract库
  39. extractConn := MgoE.GetMgoConn()
  40. defer MgoE.DestoryMongoConn(extractConn)
  41. extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{
  42. "field_source": 0,
  43. "kvtext": 0,
  44. }).Sort("_id").Iter()
  45. eMap := map[string]map[string]interface{}{}
  46. extCount, repeatCount := 0, 0
  47. for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ {
  48. if util.IntAll(tmp["repeat"]) == 1 {
  49. repeatCount++
  50. }
  51. tid := mongodb.BsonIdToSId(tmp["_id"])
  52. eMap[tid] = tmp
  53. tmp = make(map[string]interface{})
  54. }
  55. log.Info("抽取表", zap.Int("数据量", extCount), zap.Int("重复数据量", repeatCount))
  56. //bidding库
  57. biddingConn := MgoB.GetMgoConn()
  58. count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
  59. log.Info("bidding表", zap.Int64("同步总数:", count))
  60. c := 0
  61. if count < 200000 {
  62. var res []map[string]interface{}
  63. result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{
  64. "contenthtml": 0,
  65. }).Iter()
  66. for tmp := make(map[string]interface{}); result.Next(tmp); {
  67. res = append(res, tmp)
  68. tmp = make(map[string]interface{})
  69. }
  70. MgoB.DestoryMongoConn(biddingConn)
  71. log.Info("查询结果", zap.Int64("bidding", count), zap.Int("抽取:", extCount))
  72. c = doIndex(res, eMap, bkey, stype)
  73. } else {
  74. log.Info("查询结果", zap.Int64("数据量太大,放弃", count))
  75. MgoB.DestoryMongoConn(biddingConn)
  76. }
  77. log.Info("bidding sync...over", zap.Int64("all", count), zap.Int("extract sync", c))
  78. NextNode(mapInfo, stype)
  79. NextNodePro(mapInfo, stype)
  80. if stype == "bidding_history" {
  81. NextNodeBidData(mapInfo) // bidding-data数据
  82. NextNodeTidb(mapInfo) // tidb-企业数据
  83. }
  84. }
  85. func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey, stype string) int {
  86. syncNo := 0 //抽取表数据同步数量
  87. //对比两张表数据,减少查询次数
  88. var compare map[string]interface{}
  89. var bidUpdate [][]map[string]interface{}
  90. var extUpdate [][]map[string]interface{}
  91. //SaveEsLock := &sync.Mutex{}
  92. log.Info("start ...")
  93. for n, tmp := range infos {
  94. tid := mongodb.BsonIdToSId(tmp["_id"])
  95. update := map[string]interface{}{} //要更新的mongo数据
  96. //对比方法----------------
  97. if eMap[tid] != nil {
  98. compare = eMap[tid]
  99. if stype == "bidding" {
  100. // 增量id段 正常数据
  101. if dg := util.IntAll(compare["dataging"]); dg == 1 { //extract中dataging=1跳过
  102. tmp = make(map[string]interface{})
  103. compare = nil
  104. continue
  105. }
  106. delete(eMap, tid)
  107. }
  108. if stype == "bidding_history" {
  109. //增量id段 历史数据
  110. if compare["history_updatetime"] == nil { //extract中history_updatetime不存在跳过
  111. tmp = make(map[string]interface{})
  112. compare = nil
  113. continue
  114. }
  115. delete(eMap, tid)
  116. }
  117. syncNo++
  118. for _, k := range config.Conf.Serve.FieldS {
  119. v1 := compare[k] //extract
  120. v2 := tmp[k] //bidding
  121. if v2 == nil && v1 != nil {
  122. update[k] = v1
  123. } else if v2 != nil && v1 != nil {
  124. update[k] = v1
  125. } else if v2 != nil && v1 == nil {
  126. if k == "city" || k == "district" {
  127. update[k] = ""
  128. }
  129. }
  130. }
  131. if util.IntAll(compare["repeat"]) == 1 {
  132. update["extracttype"] = -1
  133. update["dataprocess"] = 7
  134. } else {
  135. update["extracttype"] = 1
  136. update["dataprocess"] = 8
  137. }
  138. } else {
  139. compare = nil
  140. if util.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging
  141. update["dataging"] = 0
  142. }
  143. update["dataprocess"] = 8
  144. }
  145. //下面可以多线程跑的--->
  146. //处理分类
  147. if compare != nil { //extract
  148. fieldFun(compare, update)
  149. compare = nil
  150. }
  151. //------------------对比结束
  152. //处理key descript
  153. if bkey == "" {
  154. DealInfo(&tmp, &update)
  155. }
  156. // entidlist
  157. extractMap := make(map[string]interface{})
  158. if update["s_winner"] != "" {
  159. cid := companyFun(update)
  160. if len(cid) > 0 {
  161. tmp["entidlist"] = cid
  162. update["entidlist"] = cid
  163. extractMap["entidlist"] = cid
  164. }
  165. }
  166. // 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表
  167. typeFunc(tmp, update, extractMap)
  168. if len(extractMap) > 0 {
  169. if extractMap["toptype"] != nil && extractMap["subtype"] == nil {
  170. //updateExtPool <- []map[string]interface{}{
  171. // {"_id": tmp["_id"]},
  172. // {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
  173. //}
  174. extUpdate = append(extUpdate, []map[string]interface{}{
  175. {"_id": tmp["_id"]},
  176. {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
  177. })
  178. } else {
  179. //updateExtPool <- []map[string]interface{}{
  180. // {"_id": tmp["_id"]},
  181. // {"$set": extractMap},
  182. //}
  183. extUpdate = append(extUpdate, []map[string]interface{}{
  184. {"_id": tmp["_id"]},
  185. {"$set": extractMap},
  186. })
  187. }
  188. if len(extUpdate) >= MgoBulkSize {
  189. tmps := extUpdate
  190. MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
  191. extUpdate = [][]map[string]interface{}{}
  192. }
  193. }
  194. // 附件有效字段
  195. if i := validFile(tmp); i != 0 {
  196. if i == -1 {
  197. tmp["isValidFile"] = false
  198. update["isValidFile"] = false
  199. } else {
  200. tmp["isValidFile"] = true
  201. update["isValidFile"] = true
  202. }
  203. }
  204. if len(update) > 0 {
  205. //SaveEsLock.Lock()
  206. bidUpdate = append(bidUpdate, []map[string]interface{}{{
  207. "_id": tmp["_id"],
  208. },
  209. {"$set": update},
  210. })
  211. if len(bidUpdate) >= MgoBulkSize {
  212. tmps := bidUpdate
  213. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
  214. bidUpdate = [][]map[string]interface{}{}
  215. }
  216. //SaveEsLock.Unlock()
  217. //updateBidPool <- []map[string]interface{}{{
  218. // "_id": tmp["_id"],
  219. //},
  220. // {"$set": update},
  221. //}
  222. }
  223. if n%500 == 0 {
  224. log.Info("biddingTask", zap.Int("current", n))
  225. }
  226. tmp = make(map[string]interface{})
  227. }
  228. //SaveEsLock.Lock()
  229. if len(bidUpdate) > 0 {
  230. tmps := bidUpdate
  231. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
  232. bidUpdate = [][]map[string]interface{}{}
  233. }
  234. if len(extUpdate) > 0 {
  235. tmps := extUpdate
  236. MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
  237. extUpdate = [][]map[string]interface{}{}
  238. }
  239. //SaveEsLock.Unlock()
  240. return syncNo
  241. }
  242. // @Description subscopeclass、topscopeclass、package
  243. // @Author J 2022/6/7 5:54 PM
  244. func fieldFun(compare, update map[string]interface{}) {
  245. subscopeclass, _ := compare["subscopeclass"].([]interface{}) //subscopeclass
  246. if subscopeclass != nil {
  247. m1 := map[string]bool{}
  248. newclass := []string{}
  249. for _, sc := range subscopeclass {
  250. sclass, _ := sc.(string)
  251. if !m1[sclass] {
  252. m1[sclass] = true
  253. newclass = append(newclass, sclass)
  254. }
  255. }
  256. update["s_subscopeclass"] = strings.Join(newclass, ",")
  257. update["subscopeclass"] = newclass
  258. }
  259. topscopeclass, _ := compare["topscopeclass"].([]interface{}) //topscopeclass
  260. if topscopeclass != nil {
  261. m2 := map[string]bool{}
  262. newclass := []string{}
  263. for _, tc := range topscopeclass {
  264. tclass, _ := tc.(string)
  265. tclass = regLetter.ReplaceAllString(tclass, "") // 去除字母
  266. if !m2[tclass] {
  267. m2[tclass] = true
  268. newclass = append(newclass, tclass)
  269. }
  270. }
  271. update["topscopeclass"] = topscopeclass
  272. update["s_topscopeclass"] = strings.Join(newclass, ",")
  273. }
  274. if package1 := compare["package"]; package1 != nil {
  275. packageM, _ := package1.(map[string]interface{})
  276. update["package"] = packageM
  277. for _, p := range packageM {
  278. pm, _ := p.(map[string]interface{})
  279. if util.ObjToString(pm["winner"]) != "" || util.Float64All(pm["budget"]) > 0 ||
  280. util.Float64All(pm["bidamount"]) > 0 {
  281. update["multipackage"] = 1
  282. break
  283. }
  284. }
  285. } else {
  286. update["multipackage"] = 0
  287. }
  288. }
  289. // @Description entidlist
  290. // @Author J 2022/6/7 2:36 PM
  291. func companyFun(tmp map[string]interface{}) (cid []string) {
  292. sWinnerarr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  293. for _, w := range sWinnerarr {
  294. if w != "" {
  295. id := redis.GetStr("qyxy_id", w)
  296. if id == "" {
  297. ents, _ := MgoQ.Find(config.Conf.DB.MongoQ.Coll, map[string]interface{}{"company_name": w}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{"company_name": 1}, false, -1, -1)
  298. if len(*ents) > 0 {
  299. id = util.ObjToString((*ents)[0]["_id"])
  300. redis.PutCKV("qyxy_id", w, id)
  301. } else {
  302. ent, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, map[string]interface{}{"history_name": w})
  303. if len(*ent) > 0 {
  304. id = util.ObjToString((*ent)["company_id"])
  305. redis.PutCKV("qyxy_id", w, id)
  306. }
  307. }
  308. }
  309. if id == "" {
  310. id = "-"
  311. }
  312. cid = append(cid, id)
  313. }
  314. }
  315. return cid
  316. }
  317. // @Description update 修改bidding表,extractM修改抽取表
  318. // @Author J 2022/6/10 10:29 AM
  319. func typeFunc(tmp, update, extractM map[string]interface{}) {
  320. if jyData, ok := tmp["jyfb_data"].(map[string]interface{}); ok {
  321. if t := util.ObjToString(jyData["type"]); t != "" {
  322. switch t {
  323. //case "采购信息":
  324. case "招标公告":
  325. if util.ObjToString(tmp["toptype"]) != "招标" {
  326. update["toptype"] = "招标"
  327. extractM["toptype"] = "招标"
  328. delete(update, "subtype")
  329. }
  330. case "采购意向":
  331. if util.ObjToString(tmp["toptype"]) != "采购意向" {
  332. update["toptype"] = "采购意向"
  333. update["subtype"] = "采购意向"
  334. extractM["toptype"] = "采购意向"
  335. extractM["subtype"] = "采购意向"
  336. }
  337. case "招标预告":
  338. if util.ObjToString(tmp["toptype"]) != "预告" {
  339. update["toptype"] = "预告"
  340. extractM["toptype"] = "预告"
  341. delete(update, "subtype")
  342. }
  343. case "招标结果":
  344. if util.ObjToString(tmp["toptype"]) != "结果" {
  345. update["toptype"] = "结果"
  346. extractM["toptype"] = "结果"
  347. delete(update, "subtype")
  348. }
  349. }
  350. }
  351. }
  352. }
  353. // @Description 附件有效字段(isValidFile)
  354. // @Author J 2022/7/8 14:41
  355. func validFile(tmp map[string]interface{}) int {
  356. isContinue := false
  357. if pinfo, o := tmp["projectinfo"].(map[string]interface{}); o {
  358. if atts, o1 := pinfo["attachments"].(map[string]interface{}); o1 {
  359. for _, att := range atts {
  360. if att == nil {
  361. continue
  362. }
  363. if reflect.TypeOf(att).String() == "string" {
  364. continue
  365. }
  366. att1 := att.(map[string]interface{})
  367. if fid := util.ObjToString(att1["fid"]); fid != "" {
  368. isContinue = true
  369. break
  370. }
  371. }
  372. if isContinue {
  373. if attachTxt, o := tmp["attach_text"].(map[string]interface{}); o {
  374. if len(attachTxt) > 0 {
  375. for _, at := range attachTxt {
  376. at1 := at.(map[string]interface{})
  377. if len(at1) > 0 {
  378. for k, _ := range at1 {
  379. if reflect.TypeOf(at1[k]).String() == "string" {
  380. continue
  381. }
  382. at2 := at1[k].(map[string]interface{})
  383. s := strings.ToLower(util.ObjToString(at2["file_name"]))
  384. if !strings.Contains(s, "jpg") || !strings.Contains(s, "jpeg") != strings.Contains(s, "png") ||
  385. strings.Contains(s, "pdf") {
  386. if strings.Contains(s, "swf") || strings.Contains(s, "html") {
  387. return -1
  388. } else if AnalysisFile(oss.OssGetObject(util.ObjToString(at2["attach_url"]))) {
  389. return 1
  390. }
  391. }
  392. }
  393. break
  394. } else {
  395. break
  396. }
  397. }
  398. }
  399. }
  400. flag := false
  401. for _, att := range atts {
  402. if att == nil {
  403. continue
  404. }
  405. if reflect.TypeOf(att).String() == "string" {
  406. continue
  407. }
  408. att1 := att.(map[string]interface{})
  409. if fid := util.ObjToString(att1["fid"]); fid != "" {
  410. ftype := strings.ToLower(util.ObjToString(tmp["ftype"]))
  411. if ftype != "swf" && ftype != "html" && oss.OssObjExists("jy-datafile", fid) {
  412. return 1
  413. } else {
  414. flag = true
  415. }
  416. }
  417. }
  418. if flag {
  419. return -1
  420. }
  421. }
  422. }
  423. }
  424. return 0
  425. }
  426. // @Description id不变,内容变化 重新索引数据
  427. // @Author J 2022/8/10 13:29
  428. func taskinfo(id string) {
  429. tmp, _ := MgoB.FindById("bidding", id, nil)
  430. if tmp == nil || len(*tmp) == 0 {
  431. log.Info(fmt.Sprintf("taskinfo bidding id=%s 未查询到数据", id))
  432. return
  433. }
  434. extractM, _ := MgoE.FindById(config.Conf.DB.MongoE.Coll, id, nil)
  435. if extractM == nil || len(*extractM) == 0 {
  436. extractM, _ = MgoE.FindById(config.Conf.DB.MongoE.Coll1, id, nil)
  437. if extractM == nil || len(*extractM) == 0 {
  438. log.Info(fmt.Sprintf("taskinfo extract id=%s 未查询到数据", id))
  439. return
  440. }
  441. }
  442. update := map[string]interface{}{} //要更新的mongo数据
  443. //更新bidding表字段
  444. for _, k := range config.Conf.Serve.FieldS {
  445. v1 := (*extractM)[k] //extract
  446. v2 := (*tmp)[k] //bidding
  447. if v2 == nil && v1 != nil {
  448. update[k] = v1
  449. } else if v2 != nil && v1 != nil {
  450. update[k] = v1
  451. } else if v2 != nil && v1 == nil {
  452. if k == "city" || k == "district" {
  453. update[k] = ""
  454. }
  455. }
  456. }
  457. if util.IntAll((*extractM)["repeat"]) == 1 {
  458. update["extracttype"] = -1
  459. update["dataprocess"] = 7
  460. } else {
  461. update["extracttype"] = 1
  462. update["dataprocess"] = 8
  463. }
  464. //处理分类
  465. fieldFun(*extractM, update)
  466. extractMap := make(map[string]interface{})
  467. if util.ObjToString((*tmp)["s_winner"]) != "" {
  468. cid := companyFun(*tmp)
  469. if len(cid) > 0 {
  470. update["entidlist"] = cid
  471. extractMap["entidlist"] = cid
  472. }
  473. MgoE.UpdateById(config.Conf.DB.MongoE.Coll, id, map[string]interface{}{"$set": extractMap})
  474. //updateExtPool <- []map[string]interface{}{
  475. // {"_id": mongodb.StringTOBsonId(id)},
  476. // {"$set": extractMap},
  477. //}
  478. }
  479. // 附件有效字段
  480. if i := validFile(*tmp); i != 0 {
  481. if i == -1 {
  482. update["isValidFile"] = false
  483. } else {
  484. update["isValidFile"] = true
  485. }
  486. }
  487. util.Debug(update)
  488. if len(update) > 0 {
  489. MgoB.UpdateById(config.Conf.DB.MongoB.Coll, id, map[string]interface{}{"$set": update})
  490. //updateBidPool <- []map[string]interface{}{{
  491. // "_id": mongodb.StringTOBsonId(id),
  492. //},
  493. // {"$set": update},
  494. //}
  495. }
  496. mapinfo := map[string]interface{}{
  497. "infoid": id,
  498. "stype": "index-by-id",
  499. }
  500. datas, _ := json.Marshal(mapinfo)
  501. var next = &net.UDPAddr{
  502. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  503. Port: util.IntAll(config.Conf.Udp.Next.Port),
  504. }
  505. log.Info("nsq data over", zap.Any("es", next), zap.String("mapinfo", string(datas)))
  506. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  507. }