buyertask.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "esindex/config"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "go.uber.org/zap"
  9. "sync"
  10. "time"
  11. )
  12. var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone", "province", "city"}
  13. func buyerEsTaskOnce() {
  14. defer util.Catch()
  15. arrEs := []map[string]interface{}{}
  16. buyerEsLock := &sync.Mutex{}
  17. pool := make(chan bool, 3)
  18. wg := &sync.WaitGroup{}
  19. now := time.Now()
  20. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
  21. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  22. task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
  23. task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
  24. log.Info("buyer 区间id", zap.String("sid", task_sid), zap.String("eid", task_eid))
  25. //区间id
  26. q := map[string]interface{}{
  27. "_id": map[string]interface{}{
  28. "$gte": mongodb.StringTOBsonId(task_sid),
  29. "$lt": mongodb.StringTOBsonId(task_eid),
  30. },
  31. }
  32. //mongo
  33. sess := MgoQ.GetMgoConn()
  34. defer MgoQ.DestoryMongoConn(sess)
  35. it_1 := sess.DB(MgoQ.DbName).C("buyer_enterprise").Find(&q).Select(map[string]interface{}{
  36. "buyer_name": 1,
  37. "institute_type": 1,
  38. "buyerclass": 1,
  39. "fixedphone": 1,
  40. "mobilephone": 1,
  41. "latestfixedphone": 1,
  42. "latestmobilephone": 1,
  43. "province": 1,
  44. "city": 1,
  45. }).Sort("_id").Iter()
  46. num_1 := 0
  47. for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
  48. if num_1%2000 == 0 && num_1 > 0 {
  49. log.Info("current", zap.Int("数量", num_1))
  50. }
  51. pool <- true
  52. wg.Add(1)
  53. go func(tmp map[string]interface{}) {
  54. defer func() {
  55. <-pool
  56. wg.Done()
  57. }()
  58. savetmp := map[string]interface{}{}
  59. _id := mongodb.BsonIdToSId(tmp["_id"])
  60. if util.ObjToString(tmp["buyerclass"]) != "" {
  61. savetmp["buyerclass"] = tmp["buyerclass"]
  62. }
  63. savetmp["_id"] = _id
  64. savetmp["name"] = tmp["buyer_name"]
  65. savetmp["buyer_name"] = tmp["buyer_name"]
  66. for _, f := range fieldArr {
  67. if val := util.ObjToString(tmp[f]); val != "" {
  68. savetmp[f] = val
  69. }
  70. }
  71. buyerEsLock.Lock()
  72. arrEs = append(arrEs, savetmp)
  73. if len(arrEs) >= EsBulkSize {
  74. tmps := arrEs
  75. Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
  76. arrEs = []map[string]interface{}{}
  77. }
  78. buyerEsLock.Unlock()
  79. }(tmp)
  80. tmp = make(map[string]interface{})
  81. }
  82. wg.Wait()
  83. buyerEsLock.Lock()
  84. if len(arrEs) > 0 {
  85. tmps := arrEs
  86. Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
  87. arrEs = []map[string]interface{}{}
  88. }
  89. buyerEsLock.Unlock()
  90. log.Info("buyer over!", zap.Int("总计", num_1))
  91. }