winnertask.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package main
  2. import (
  3. "esindex/config"
  4. "go.mongodb.org/mongo-driver/bson/primitive"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "sync"
  10. "time"
  11. )
  12. func winnerEsTaskOnce() {
  13. defer util.Catch()
  14. arrEs := []map[string]interface{}{}
  15. winerEsLock := &sync.Mutex{}
  16. pool := make(chan bool, 3)
  17. wg := &sync.WaitGroup{}
  18. now := time.Now()
  19. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
  20. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  21. task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
  22. task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
  23. log.Info("winner 区间id", zap.String("sid", task_sid), zap.String("eid", task_eid))
  24. //区间id
  25. q := map[string]interface{}{
  26. "_id": map[string]interface{}{
  27. "$gte": mongodb.StringTOBsonId(task_sid),
  28. "$lt": mongodb.StringTOBsonId(task_eid),
  29. },
  30. }
  31. //mongo
  32. sess := MgoQ.GetMgoConn()
  33. defer MgoQ.DestoryMongoConn(sess)
  34. it_1 := sess.DB(MgoQ.DbName).C("winner_enterprise").Find(&q).Sort("_id").Iter()
  35. num_1 := 0
  36. for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
  37. if num_1%2000 == 0 && num_1 > 0 {
  38. log.Info("winnerEsTaskOnce current", zap.Int("count", num_1))
  39. }
  40. pool <- true
  41. wg.Add(1)
  42. go func(tmp map[string]interface{}) {
  43. defer func() {
  44. <-pool
  45. wg.Done()
  46. }()
  47. savetmp := map[string]interface{}{}
  48. tmp_id := mongodb.BsonIdToSId(tmp["_id"])
  49. savetmp["_id"] = tmp_id
  50. savetmp["name"] = tmp["company_name"]
  51. savetmp["winner_name"] = tmp["company_name"]
  52. savetmp["pici"] = tmp["updatetime"]
  53. if province := util.ObjToString(tmp["province"]); province != "" {
  54. savetmp["province"] = province
  55. }
  56. if city := util.ObjToString(tmp["city"]); city != "" {
  57. savetmp["city"] = city
  58. }
  59. if text := util.ObjToString(tmp["tag_business"]); text != "" {
  60. savetmp["tag_business"] = text
  61. }
  62. winerEsLock.Lock()
  63. arrEs = append(arrEs, savetmp)
  64. if len(arrEs) >= EsBulkSize {
  65. tmps := arrEs
  66. Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
  67. arrEs = []map[string]interface{}{}
  68. }
  69. winerEsLock.Unlock()
  70. }(tmp)
  71. tmp = make(map[string]interface{})
  72. }
  73. wg.Wait()
  74. winerEsLock.Lock()
  75. if len(arrEs) > 0 {
  76. tmps := arrEs
  77. Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
  78. arrEs = []map[string]interface{}{}
  79. }
  80. winerEsLock.Unlock()
  81. log.Info("winner over!", zap.Int("总计", num_1))
  82. }