task.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "app.yhyue.com/data_processing/common_utils/udp"
  8. "encoding/json"
  9. "field_sync/config"
  10. "field_sync/oss"
  11. "fmt"
  12. "go.mongodb.org/mongo-driver/bson"
  13. "go.uber.org/zap"
  14. "net"
  15. "reflect"
  16. "regexp"
  17. "strings"
  18. "time"
  19. )
  // regLetter matches runs of lowercase ASCII letters. fieldFun replaces every
  // match with "" to strip the letters from topscopeclass values.
  var regLetter = regexp.MustCompile("[a-z]*")
  23. func biddingTask(data []byte, mapInfo map[string]interface{}) {
  24. defer util.Catch()
  25. stype := util.ObjToString(mapInfo["stype"])
  26. if stype == "bidding" {
  27. uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
  28. "lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
  29. MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 7, "updatetime": time.Now().Unix()}}, false, true)
  30. }
  31. // 领域标签处理的数据 id段
  32. if stype == "bidding_history" {
  33. MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
  34. }
  35. q, _ := mapInfo["query"].(map[string]interface{})
  36. bkey, _ := mapInfo["bkey"].(string)
  37. if q == nil {
  38. q = map[string]interface{}{
  39. "_id": map[string]interface{}{
  40. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  41. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  42. },
  43. }
  44. }
  45. //extract库
  46. extractConn := MgoE.GetMgoConn()
  47. defer MgoE.DestoryMongoConn(extractConn)
  48. extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{
  49. "field_source": 0,
  50. "kvtext": 0,
  51. }).Sort("_id").Iter()
  52. eMap := map[string]map[string]interface{}{}
  53. extCount, repeatCount := 0, 0
  54. for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ {
  55. if util.IntAll(tmp["repeat"]) == 1 {
  56. repeatCount++
  57. }
  58. tid := mongodb.BsonIdToSId(tmp["_id"])
  59. eMap[tid] = tmp
  60. tmp = make(map[string]interface{})
  61. }
  62. log.Info("抽取表", zap.Int("数据量", extCount), zap.Int("重复数据量", repeatCount))
  63. //bidding库
  64. biddingConn := MgoB.GetMgoConn()
  65. count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
  66. log.Info("bidding表", zap.Int64("同步总数:", count))
  67. c := 0
  68. if count < 500000 {
  69. var res []map[string]interface{}
  70. result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{
  71. "contenthtml": 0,
  72. }).Iter()
  73. for tmp := make(map[string]interface{}); result.Next(tmp); {
  74. res = append(res, tmp)
  75. tmp = make(map[string]interface{})
  76. }
  77. MgoB.DestoryMongoConn(biddingConn)
  78. log.Info("查询结果", zap.Int64("bidding", count), zap.Int("抽取:", extCount))
  79. c = doIndex(res, eMap, bkey, stype)
  80. } else {
  81. log.Info("查询结果", zap.Int64("数据量太大,放弃", count))
  82. MgoB.DestoryMongoConn(biddingConn)
  83. }
  84. log.Info("bidding sync...over", zap.Int64("all", count), zap.Int("extract sync", c))
  85. NextNode(mapInfo, stype)
  86. NextNodePro(mapInfo, stype)
  87. NextNodeTidb(mapInfo, stype)
  88. if stype == "bidding_history" {
  89. NextNodeBidData(mapInfo) // bidding-data数据
  90. NextNodeTidbQyxy(mapInfo) // tidb-企业数据
  91. NextNodeHn(mapInfo)
  92. }
  93. }
  94. func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey, stype string) int {
  95. syncNo := 0 //抽取表数据同步数量
  96. //对比两张表数据,减少查询次数
  97. var compare map[string]interface{}
  98. var bidUpdate [][]map[string]interface{}
  99. var extUpdate [][]map[string]interface{}
  100. //SaveEsLock := &sync.Mutex{}
  101. log.Info("start ...")
  102. for n, tmp := range infos {
  103. tid := mongodb.BsonIdToSId(tmp["_id"])
  104. update := map[string]interface{}{} //要更新的mongo数据
  105. //对比方法----------------
  106. if eMap[tid] != nil {
  107. compare = eMap[tid]
  108. if stype == "bidding" {
  109. // 增量id段 正常数据
  110. if dg := util.IntAll(compare["dataging"]); dg == 1 { //extract中dataging=1跳过
  111. tmp = make(map[string]interface{})
  112. compare = nil
  113. continue
  114. }
  115. delete(eMap, tid)
  116. }
  117. if stype == "bidding_history" {
  118. //增量id段 历史数据
  119. if compare["history_updatetime"] == nil { //extract中history_updatetime不存在跳过
  120. tmp = make(map[string]interface{})
  121. compare = nil
  122. continue
  123. }
  124. delete(eMap, tid)
  125. }
  126. syncNo++
  127. for _, k := range config.Conf.Serve.FieldS {
  128. v1 := compare[k] //extract
  129. v2 := tmp[k] //bidding
  130. if v2 == nil && v1 != nil {
  131. update[k] = v1
  132. } else if v2 != nil && v1 != nil {
  133. update[k] = v1
  134. } else if v2 != nil && v1 == nil {
  135. if k == "city" || k == "district" {
  136. update[k] = ""
  137. }
  138. }
  139. }
  140. if util.IntAll(compare["repeat"]) == 1 {
  141. update["extracttype"] = -1
  142. update["dataprocess"] = 7
  143. } else {
  144. update["extracttype"] = 1
  145. update["dataprocess"] = 8
  146. }
  147. } else {
  148. compare = nil
  149. if util.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging
  150. update["dataging"] = 0
  151. }
  152. update["dataprocess"] = 8
  153. }
  154. //下面可以多线程跑的--->
  155. //处理分类
  156. if compare != nil { //extract
  157. fieldFun(compare, update)
  158. compare = nil
  159. }
  160. //------------------对比结束
  161. //处理key descript
  162. if bkey == "" {
  163. DealInfo(&tmp, &update)
  164. }
  165. // entidlist
  166. extractMap := make(map[string]interface{})
  167. if update["s_winner"] != "" {
  168. cid := companyFun(update)
  169. if len(cid) > 0 {
  170. tmp["entidlist"] = cid
  171. update["entidlist"] = cid
  172. extractMap["entidlist"] = cid
  173. }
  174. }
  175. // 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表
  176. typeFunc(tmp, update, extractMap)
  177. if len(extractMap) > 0 {
  178. if extractMap["toptype"] != nil && extractMap["subtype"] == nil {
  179. //updateExtPool <- []map[string]interface{}{
  180. // {"_id": tmp["_id"]},
  181. // {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
  182. //}
  183. extUpdate = append(extUpdate, []map[string]interface{}{
  184. {"_id": tmp["_id"]},
  185. {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
  186. })
  187. } else {
  188. //updateExtPool <- []map[string]interface{}{
  189. // {"_id": tmp["_id"]},
  190. // {"$set": extractMap},
  191. //}
  192. extUpdate = append(extUpdate, []map[string]interface{}{
  193. {"_id": tmp["_id"]},
  194. {"$set": extractMap},
  195. })
  196. }
  197. if len(extUpdate) >= MgoBulkSize {
  198. tmps := extUpdate
  199. MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
  200. extUpdate = [][]map[string]interface{}{}
  201. }
  202. }
  203. // 附件有效字段
  204. if i := validFile(tmp); i != 0 {
  205. if i == -1 {
  206. tmp["isValidFile"] = false
  207. update["isValidFile"] = false
  208. } else {
  209. tmp["isValidFile"] = true
  210. update["isValidFile"] = true
  211. }
  212. }
  213. if len(update) > 0 {
  214. //SaveEsLock.Lock()
  215. bidUpdate = append(bidUpdate, []map[string]interface{}{{
  216. "_id": tmp["_id"],
  217. },
  218. {"$set": update},
  219. })
  220. if len(bidUpdate) >= MgoBulkSize {
  221. tmps := bidUpdate
  222. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
  223. bidUpdate = [][]map[string]interface{}{}
  224. }
  225. //SaveEsLock.Unlock()
  226. //updateBidPool <- []map[string]interface{}{{
  227. // "_id": tmp["_id"],
  228. //},
  229. // {"$set": update},
  230. //}
  231. }
  232. if n%500 == 0 {
  233. log.Info("biddingTask", zap.Int("current", n))
  234. }
  235. tmp = make(map[string]interface{})
  236. }
  237. //SaveEsLock.Lock()
  238. if len(bidUpdate) > 0 {
  239. tmps := bidUpdate
  240. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
  241. bidUpdate = [][]map[string]interface{}{}
  242. }
  243. if len(extUpdate) > 0 {
  244. tmps := extUpdate
  245. MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
  246. extUpdate = [][]map[string]interface{}{}
  247. }
  248. //SaveEsLock.Unlock()
  249. return syncNo
  250. }
  251. // @Description subscopeclass、topscopeclass、package
  252. // @Author J 2022/6/7 5:54 PM
  253. func fieldFun(compare, update map[string]interface{}) {
  254. subscopeclass, _ := compare["subscopeclass"].([]interface{}) //subscopeclass
  255. if subscopeclass != nil {
  256. m1 := map[string]bool{}
  257. newclass := []string{}
  258. for _, sc := range subscopeclass {
  259. sclass, _ := sc.(string)
  260. if !m1[sclass] {
  261. m1[sclass] = true
  262. newclass = append(newclass, sclass)
  263. }
  264. }
  265. update["s_subscopeclass"] = strings.Join(newclass, ",")
  266. update["subscopeclass"] = newclass
  267. }
  268. topscopeclass, _ := compare["topscopeclass"].([]interface{}) //topscopeclass
  269. if topscopeclass != nil {
  270. m2 := map[string]bool{}
  271. newclass := []string{}
  272. for _, tc := range topscopeclass {
  273. tclass, _ := tc.(string)
  274. tclass = regLetter.ReplaceAllString(tclass, "") // 去除字母
  275. if !m2[tclass] {
  276. m2[tclass] = true
  277. newclass = append(newclass, tclass)
  278. }
  279. }
  280. update["topscopeclass"] = topscopeclass
  281. update["s_topscopeclass"] = strings.Join(newclass, ",")
  282. }
  283. if package1 := compare["package"]; package1 != nil {
  284. packageM, _ := package1.(map[string]interface{})
  285. update["package"] = packageM
  286. for _, p := range packageM {
  287. pm, _ := p.(map[string]interface{})
  288. if util.ObjToString(pm["winner"]) != "" || util.Float64All(pm["budget"]) > 0 ||
  289. util.Float64All(pm["bidamount"]) > 0 {
  290. update["multipackage"] = 1
  291. break
  292. }
  293. }
  294. } else {
  295. update["multipackage"] = 0
  296. }
  297. }
  298. // @Description entidlist
  299. // @Author J 2022/6/7 2:36 PM
  300. func companyFun(tmp map[string]interface{}) (cid []string) {
  301. sWinnerarr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  302. for _, w := range sWinnerarr {
  303. if w != "" {
  304. id := redis.GetStr("qyxy_id", w)
  305. if id == "" {
  306. ents, _ := MgoQ.Find(config.Conf.DB.MongoQ.Coll, map[string]interface{}{"company_name": w}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{"company_name": 1}, false, -1, -1)
  307. if len(*ents) > 0 {
  308. id = util.ObjToString((*ents)[0]["_id"])
  309. redis.PutCKV("qyxy_id", w, id)
  310. } else {
  311. ent, _ := MgoP.FindOne(config.Conf.DB.MongoP.Coll, map[string]interface{}{"history_name": w})
  312. if len(*ent) > 0 {
  313. id = util.ObjToString((*ent)["company_id"])
  314. redis.PutCKV("qyxy_id", w, id)
  315. }
  316. }
  317. }
  318. if id == "" {
  319. id = "-"
  320. }
  321. cid = append(cid, id)
  322. }
  323. }
  324. return cid
  325. }
  326. // @Description update 修改bidding表,extractM修改抽取表
  327. // @Author J 2022/6/10 10:29 AM
  328. func typeFunc(tmp, update, extractM map[string]interface{}) {
  329. if jyData, ok := tmp["jyfb_data"].(map[string]interface{}); ok {
  330. if t := util.ObjToString(jyData["type"]); t != "" {
  331. switch t {
  332. //case "采购信息":
  333. case "招标公告":
  334. if util.ObjToString(tmp["toptype"]) != "招标" {
  335. update["toptype"] = "招标"
  336. extractM["toptype"] = "招标"
  337. delete(update, "subtype")
  338. }
  339. case "采购意向":
  340. if util.ObjToString(tmp["toptype"]) != "采购意向" {
  341. update["toptype"] = "采购意向"
  342. update["subtype"] = "采购意向"
  343. extractM["toptype"] = "采购意向"
  344. extractM["subtype"] = "采购意向"
  345. }
  346. case "招标预告":
  347. if util.ObjToString(tmp["toptype"]) != "预告" {
  348. update["toptype"] = "预告"
  349. extractM["toptype"] = "预告"
  350. delete(update, "subtype")
  351. }
  352. case "招标结果":
  353. if util.ObjToString(tmp["toptype"]) != "结果" {
  354. update["toptype"] = "结果"
  355. extractM["toptype"] = "结果"
  356. delete(update, "subtype")
  357. }
  358. }
  359. }
  360. }
  361. }
  362. // @Description 附件有效字段(isValidFile)
  363. // @Author J 2022/7/8 14:41
  364. func validFile(tmp map[string]interface{}) int {
  365. isContinue := false
  366. if pinfo, o := tmp["projectinfo"].(map[string]interface{}); o {
  367. if atts, o1 := pinfo["attachments"].(map[string]interface{}); o1 {
  368. for _, att := range atts {
  369. if att == nil {
  370. continue
  371. }
  372. if reflect.TypeOf(att).String() == "string" {
  373. continue
  374. }
  375. att1 := att.(map[string]interface{})
  376. if fid := util.ObjToString(att1["fid"]); fid != "" {
  377. isContinue = true
  378. break
  379. }
  380. }
  381. if isContinue {
  382. if attachTxt, o := tmp["attach_text"].(map[string]interface{}); o {
  383. if len(attachTxt) > 0 {
  384. for _, at := range attachTxt {
  385. at1 := at.(map[string]interface{})
  386. if len(at1) > 0 {
  387. for k, _ := range at1 {
  388. if reflect.TypeOf(at1[k]).String() == "string" {
  389. continue
  390. }
  391. at2 := at1[k].(map[string]interface{})
  392. s := strings.ToLower(util.ObjToString(at2["file_name"]))
  393. if !strings.Contains(s, "jpg") || !strings.Contains(s, "jpeg") != strings.Contains(s, "png") ||
  394. strings.Contains(s, "pdf") {
  395. if strings.Contains(s, "swf") || strings.Contains(s, "html") {
  396. return -1
  397. } else if AnalysisFile(oss.OssGetObject(util.ObjToString(at2["attach_url"]))) {
  398. return 1
  399. }
  400. }
  401. }
  402. break
  403. } else {
  404. break
  405. }
  406. }
  407. }
  408. }
  409. flag := false
  410. for _, att := range atts {
  411. if att == nil {
  412. continue
  413. }
  414. if reflect.TypeOf(att).String() == "string" {
  415. continue
  416. }
  417. att1 := att.(map[string]interface{})
  418. if fid := util.ObjToString(att1["fid"]); fid != "" {
  419. ftype := strings.ToLower(util.ObjToString(tmp["ftype"]))
  420. if ftype != "swf" && ftype != "html" && oss.OssObjExists("jy-datafile", fid) {
  421. return 1
  422. } else {
  423. flag = true
  424. }
  425. }
  426. }
  427. if flag {
  428. return -1
  429. }
  430. }
  431. }
  432. }
  433. return 0
  434. }
  435. // @Description id不变,内容变化 重新索引数据
  436. // @Author J 2022/8/10 13:29
  437. func taskinfo(id string) {
  438. tmp, _ := MgoB.FindById("bidding", id, nil)
  439. if tmp == nil || len(*tmp) == 0 {
  440. log.Info(fmt.Sprintf("taskinfo bidding id=%s 未查询到数据", id))
  441. return
  442. }
  443. extractM, _ := MgoE.FindById(config.Conf.DB.MongoE.Coll, id, nil)
  444. if extractM == nil || len(*extractM) == 0 {
  445. extractM, _ = MgoE.FindById(config.Conf.DB.MongoE.Coll1, id, nil)
  446. if extractM == nil || len(*extractM) == 0 {
  447. log.Info(fmt.Sprintf("taskinfo extract id=%s 未查询到数据", id))
  448. return
  449. }
  450. }
  451. update := map[string]interface{}{} //要更新的mongo数据
  452. //更新bidding表字段
  453. for _, k := range config.Conf.Serve.FieldS {
  454. v1 := (*extractM)[k] //extract
  455. v2 := (*tmp)[k] //bidding
  456. if v2 == nil && v1 != nil {
  457. update[k] = v1
  458. } else if v2 != nil && v1 != nil {
  459. update[k] = v1
  460. } else if v2 != nil && v1 == nil {
  461. if k == "city" || k == "district" {
  462. update[k] = ""
  463. }
  464. }
  465. }
  466. if util.IntAll((*extractM)["repeat"]) == 1 {
  467. update["extracttype"] = -1
  468. update["dataprocess"] = 7
  469. } else {
  470. update["extracttype"] = 1
  471. update["dataprocess"] = 8
  472. }
  473. //处理分类
  474. fieldFun(*extractM, update)
  475. extractMap := make(map[string]interface{})
  476. if util.ObjToString((*tmp)["s_winner"]) != "" {
  477. cid := companyFun(*tmp)
  478. if len(cid) > 0 {
  479. update["entidlist"] = cid
  480. extractMap["entidlist"] = cid
  481. }
  482. MgoE.UpdateById(config.Conf.DB.MongoE.Coll, id, map[string]interface{}{"$set": extractMap})
  483. }
  484. // 附件有效字段
  485. if i := validFile(*tmp); i != 0 {
  486. if i == -1 {
  487. update["isValidFile"] = false
  488. } else {
  489. update["isValidFile"] = true
  490. }
  491. }
  492. if len(update) > 0 {
  493. MgoB.UpdateById(config.Conf.DB.MongoB.Coll, id, map[string]interface{}{"$set": update})
  494. }
  495. mapinfo := map[string]interface{}{
  496. "infoid": id,
  497. "stype": "index-by-id",
  498. }
  499. datas, _ := json.Marshal(mapinfo)
  500. var next = &net.UDPAddr{
  501. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  502. Port: util.IntAll(config.Conf.Udp.Next.Port),
  503. }
  504. log.Info("nsq data over", zap.Any("es", next), zap.String("mapinfo", string(datas)))
  505. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  506. }