winnertask.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "esindex/config"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "go.uber.org/zap"
  9. "sync"
  10. "time"
  11. )
  12. func winnerEsTaskOnce() {
  13. defer util.Catch()
  14. arrEs := []map[string]interface{}{}
  15. winerEsLock := &sync.Mutex{}
  16. pool := make(chan bool, 3)
  17. wg := &sync.WaitGroup{}
  18. now := time.Now()
  19. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
  20. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  21. task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
  22. task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
  23. log.Info("winner 区间id", zap.String("sid", task_sid), zap.String("eid", task_eid))
  24. //区间id
  25. q := map[string]interface{}{
  26. "_id": map[string]interface{}{
  27. "$lt": mongodb.StringTOBsonId(task_eid),
  28. },
  29. }
  30. //mongo
  31. sess := MgoQ.GetMgoConn()
  32. defer MgoQ.DestoryMongoConn(sess)
  33. it_1 := sess.DB(MgoQ.DbName).C("winner_enterprise").Find(&q).Sort("_id").Iter()
  34. num_1 := 0
  35. for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
  36. if num_1%2000 == 0 && num_1 > 0 {
  37. log.Info("current", zap.Int("count", num_1))
  38. }
  39. pool <- true
  40. wg.Add(1)
  41. go func(tmp map[string]interface{}) {
  42. defer func() {
  43. <-pool
  44. wg.Done()
  45. }()
  46. savetmp := map[string]interface{}{}
  47. tmp_id := mongodb.BsonIdToSId(tmp["_id"])
  48. savetmp["_id"] = tmp_id
  49. savetmp["name"] = tmp["company_name"]
  50. savetmp["winner_name"] = tmp["company_name"]
  51. savetmp["pici"] = tmp["updatetime"]
  52. if province := util.ObjToString(tmp["province"]); province != "" {
  53. savetmp["province"] = province
  54. }
  55. if city := util.ObjToString(tmp["city"]); city != "" {
  56. savetmp["city"] = city
  57. }
  58. if text := util.ObjToString(tmp["tag_business"]); text != "" {
  59. savetmp["tag_business"] = text
  60. }
  61. winerEsLock.Lock()
  62. arrEs = append(arrEs, savetmp)
  63. if len(arrEs) >= EsBulkSize {
  64. tmps := arrEs
  65. Es.BulkSave(config.Conf.DB.Es.IndexWinner, config.Conf.DB.Es.TypeWinner, &tmps, true)
  66. arrEs = []map[string]interface{}{}
  67. }
  68. winerEsLock.Unlock()
  69. }(tmp)
  70. tmp = make(map[string]interface{})
  71. }
  72. wg.Wait()
  73. winerEsLock.Lock()
  74. if len(arrEs) > 0 {
  75. tmps := arrEs
  76. Es.BulkSave(config.Conf.DB.Es.IndexWinner, config.Conf.DB.Es.TypeWinner, &tmps, true)
  77. arrEs = []map[string]interface{}{}
  78. }
  79. winerEsLock.Unlock()
  80. log.Info("winner over!", zap.Int("总计", num_1))
  81. }