biddingdata.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package main
  2. import (
  3. "esindex/config"
  4. "go.mongodb.org/mongo-driver/bson"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "time"
  10. )
  11. func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
  12. defer util.Catch()
  13. stype := util.ObjToString(mapInfo["stype"])
  14. q, _ := mapInfo["query"].(map[string]interface{})
  15. if q == nil {
  16. q = map[string]interface{}{
  17. "_id": bson.M{
  18. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  19. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  20. },
  21. }
  22. } else {
  23. idMap := q["_id"].(map[string]interface{})
  24. tmpQ := map[string]interface{}{}
  25. for c, id := range idMap {
  26. if idStr, ok := id.(string); ok && id != "" {
  27. tmpQ[c] = mongodb.StringTOBsonId(idStr)
  28. }
  29. }
  30. q["_id"] = tmpQ
  31. }
  32. //bidding库
  33. biddingConn := MgoB.GetMgoConn()
  34. defer MgoB.DestoryMongoConn(biddingConn)
  35. count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
  36. log.Info("biddingDataTask", zap.Any("查询语句", q), zap.Any("同步总数:", count))
  37. //查询招标数据
  38. query := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(bson.M{
  39. "projectname": 1,
  40. "title": 1,
  41. "site": 1,
  42. "href": 1,
  43. "publishtime": 1,
  44. "spidercode": 1,
  45. "extracttype": 1,
  46. "comeintime": 1,
  47. "buyer": 1, //采购单位
  48. "s_winner": 1, //中标单位
  49. "projectcode": 1, //项目标号
  50. "agency": 1, //代理机构
  51. "budget": 1, //预算
  52. "area": 1, //省份
  53. "city": 1, //城市
  54. "district": 1, //区县
  55. "bidamount": 1, //中标金额
  56. "toptype": 1, //一级分类
  57. "subtype": 1, //二级分类
  58. }).Sort("_id").Iter()
  59. n := 0
  60. //更新数组
  61. var mpool = make(chan bool, 30)
  62. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  63. if n%20000 == 0 {
  64. log.Info("current", zap.Int("count", n), zap.Any("current id", tmp["_id"]))
  65. }
  66. mpool <- true
  67. go func(tmp map[string]interface{}) {
  68. defer func() {
  69. <-mpool
  70. }()
  71. newTmp, _ := GetEsField(tmp, stype)
  72. newTmp["extracttype"] = util.IntAll(tmp["extracttype"])
  73. newTmp["pici"] = time.Now().Unix()
  74. saveEsAllPool <- newTmp
  75. }(tmp)
  76. tmp = make(map[string]interface{})
  77. }
  78. log.Info("create biddingdata index over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
  79. }