winnertask.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson/primitive"
  4. "log"
  5. "sync"
  6. "time"
  7. util "utils"
  8. "utils/mongodb"
  9. )
  10. func winnerEsTaskOnce() {
  11. defer util.Catch()
  12. arrEs := []map[string]interface{}{}
  13. winerEsLock := &sync.Mutex{}
  14. pool := make(chan bool, 3)
  15. wg := &sync.WaitGroup{}
  16. now := time.Now()
  17. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
  18. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  19. task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
  20. task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
  21. //task_sid = "5e6598f82c27dc56292158da"
  22. //task_eid = "620576342566c40049f26155"
  23. log.Println("winner 区间id:", task_sid, task_eid)
  24. //区间id
  25. q := map[string]interface{}{
  26. "_id": map[string]interface{}{
  27. //"$gte": mongodb.StringTOBsonId(task_sid),
  28. "$lt": mongodb.StringTOBsonId(task_eid),
  29. },
  30. }
  31. //参数
  32. winnerent, _ := standard["winnerent"].(map[string]interface{})
  33. win_ent := util.ObjToString(winnerent["collect1"])
  34. //win_enterr := qu.ObjToString(winnerent["collect2"])
  35. index, _ := winnerent["index"].(string)
  36. itype, _ := winnerent["type"].(string)
  37. //mongo
  38. sess := standardMgo.GetMgoConn()
  39. defer standardMgo.DestoryMongoConn(sess)
  40. log.Println("q:", q, "db:", standardMgo.DbName, "coll:", win_ent)
  41. it_1 := sess.DB(standardMgo.DbName).C(win_ent).Find(&q).Sort("_id").Iter()
  42. num_1 := 0
  43. for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
  44. if num_1%2000 == 0 && num_1 > 0 {
  45. log.Println("当前表:", win_ent, "数量:", num_1)
  46. }
  47. pool <- true
  48. wg.Add(1)
  49. go func(tmp map[string]interface{}) {
  50. defer func() {
  51. <-pool
  52. wg.Done()
  53. }()
  54. savetmp := map[string]interface{}{}
  55. tmp_id := mongodb.BsonIdToSId(tmp["_id"])
  56. savetmp["_id"] = tmp_id
  57. savetmp["name"] = tmp["company_name"]
  58. savetmp["winner_name"] = tmp["company_name"]
  59. savetmp["pici"] = tmp["updatetime"]
  60. if province := util.ObjToString(tmp["province"]); province != "" {
  61. savetmp["province"] = province
  62. }
  63. if city := util.ObjToString(tmp["city"]); city != "" {
  64. savetmp["city"] = city
  65. }
  66. if text := util.ObjToString(tmp["tag_business"]); text != "" {
  67. savetmp["tag_business"] = text
  68. }
  69. winerEsLock.Lock()
  70. arrEs = append(arrEs, savetmp)
  71. if len(arrEs) >= EsBulkSize {
  72. tmps := arrEs
  73. //Es1.BulkSave(index, itype, &tmps, true)
  74. Es2.BulkSave(index, itype, &tmps, true)
  75. arrEs = []map[string]interface{}{}
  76. }
  77. winerEsLock.Unlock()
  78. }(tmp)
  79. tmp = make(map[string]interface{})
  80. }
  81. // log.Println("q:", q, "db:", mgostandard.DbName, "coll:", win_enterr)
  82. // it_2 := sess.DB(mgostandard.DbName).C(win_enterr).Find(&q).Sort("_id").Iter()
  83. // num_2 := 0
  84. // for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
  85. // if num_2%100 == 0 && num_2 > 0 {
  86. // log.Println("当前表:", win_enterr, "数量:", num_2)
  87. // }
  88. // pool <- true
  89. // wg.Add(1)
  90. // go func(tmp map[string]interface{}) {
  91. // defer func() {
  92. // <-pool
  93. // wg.Done()
  94. // }()
  95. // savetmp := map[string]interface{}{}
  96. // tmp_id := mongodb.BsonIdToSId(tmp["_id"])
  97. // savetmp["_id"] = tmp_id
  98. // savetmp["name"] = tmp["name"]
  99. // savetmp["winner_name"] = tmp["name"]
  100. // savetmp["pici"] = tmp["updatetime"]
  101. // winerEsLock.Lock()
  102. // arrEs = append(arrEs, savetmp)
  103. // if len(arrEs) >= BulkSize {
  104. // tmps := arrEs
  105. // elastic.BulkSave(index, itype, &tmps, true)
  106. // arrEs = []map[string]interface{}{}
  107. // }
  108. // winerEsLock.Unlock()
  109. // }(tmp)
  110. // tmp = make(map[string]interface{})
  111. // }
  112. wg.Wait()
  113. winerEsLock.Lock()
  114. if len(arrEs) > 0 {
  115. tmps := arrEs
  116. //Es1.BulkSave(index, itype, &tmps, true)
  117. Es2.BulkSave(index, itype, &tmps, true)
  118. arrEs = []map[string]interface{}{}
  119. }
  120. winerEsLock.Unlock()
  121. log.Println("winnerextract 索引完毕! 总计:", num_1)
  122. }