biddingdata.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package main
  2. import (
  3. "esindex/config"
  4. "go.mongodb.org/mongo-driver/bson"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. )
  10. func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
  11. defer util.Catch()
  12. stype := util.ObjToString(mapInfo["stype"])
  13. q, _ := mapInfo["query"].(map[string]interface{})
  14. if q == nil {
  15. q = map[string]interface{}{
  16. "_id": bson.M{
  17. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  18. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  19. },
  20. }
  21. } else {
  22. idMap := q["_id"].(map[string]interface{})
  23. tmpQ := map[string]interface{}{}
  24. for c, id := range idMap {
  25. if idStr, ok := id.(string); ok && id != "" {
  26. tmpQ[c] = mongodb.StringTOBsonId(idStr)
  27. }
  28. }
  29. q["_id"] = tmpQ
  30. }
  31. //bidding库
  32. biddingConn := MgoB.GetMgoConn()
  33. defer MgoB.DestoryMongoConn(biddingConn)
  34. count, _ := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
  35. log.Info("biddingDataTask", zap.Any("查询语句", q), zap.Any("同步总数:", count))
  36. //查询招标数据
  37. query := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(bson.M{
  38. "projectname": 1,
  39. "title": 1,
  40. "site": 1,
  41. "href": 1,
  42. "publishtime": 1,
  43. "spidercode": 1,
  44. "extracttype": 1,
  45. "comeintime": 1,
  46. }).Sort("_id").Iter()
  47. n := 0
  48. //更新数组
  49. var mpool = make(chan bool, 30)
  50. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  51. if n%20000 == 0 {
  52. log.Info("current", zap.Int("count", n), zap.Any("current id", tmp["_id"]))
  53. }
  54. mpool <- true
  55. go func(tmp map[string]interface{}) {
  56. defer func() {
  57. <-mpool
  58. }()
  59. newTmp, _ := GetEsField(tmp, stype)
  60. newTmp["extracttype"] = util.IntAll(tmp["extracttype"])
  61. saveEsAllPool <- newTmp
  62. }(tmp)
  63. tmp = make(map[string]interface{})
  64. }
  65. log.Info("create biddingdata index over", zap.Any("mapInfo", mapInfo), zap.Int("count", n))
  66. }