// winnertask.go

  1. package main
  2. import (
  3. "log"
  4. "mongodb"
  5. qu "qfw/util"
  6. elastic "qfw/util/elastic"
  7. "sync"
  8. "time"
  9. "gopkg.in/mgo.v2/bson"
  10. )
  11. func winnerEsTaskOnce() {
  12. defer qu.Catch()
  13. arrEs := []map[string]interface{}{}
  14. winerEsLock := &sync.Mutex{}
  15. pool := make(chan bool, 3)
  16. wg := &sync.WaitGroup{}
  17. now := time.Now()
  18. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
  19. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  20. task_sid := mongodb.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
  21. task_eid := mongodb.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
  22. // task_sid = "5e6598f82c27dc56292158da"
  23. // task_eid = "5f80c8f89a0c261af872294c"
  24. log.Println("winner 区间id:", task_sid, task_eid)
  25. //区间id
  26. q := map[string]interface{}{
  27. "_id": map[string]interface{}{
  28. "$gte": mongodb.StringTOBsonId(task_sid),
  29. "$lt": mongodb.StringTOBsonId(task_eid),
  30. },
  31. }
  32. //参数
  33. winnerent, _ := standard["winnerent"].(map[string]interface{})
  34. win_ent := qu.ObjToString(winnerent["collect1"])
  35. //win_enterr := qu.ObjToString(winnerent["collect2"])
  36. index, _ := winnerent["index"].(string)
  37. itype, _ := winnerent["type"].(string)
  38. //mongo
  39. sess := mgostandard.GetMgoConn()
  40. defer mgostandard.DestoryMongoConn(sess)
  41. log.Println("q:", q, "db:", mgostandard.DbName, "coll:", win_ent)
  42. it_1 := sess.DB(mgostandard.DbName).C(win_ent).Find(&q).Sort("_id").Iter()
  43. num_1 := 0
  44. for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
  45. if num_1%100 == 0 && num_1 > 0 {
  46. log.Println("当前表:", win_ent, "数量:", num_1)
  47. }
  48. pool <- true
  49. wg.Add(1)
  50. go func(tmp map[string]interface{}) {
  51. defer func() {
  52. <-pool
  53. wg.Done()
  54. }()
  55. savetmp := map[string]interface{}{}
  56. tmp_id := mongodb.BsonIdToSId(tmp["_id"])
  57. savetmp["_id"] = tmp_id
  58. savetmp["name"] = tmp["company_name"]
  59. savetmp["winner_name"] = tmp["company_name"]
  60. savetmp["pici"] = tmp["updatetime"]
  61. if province := qu.ObjToString(tmp["province"]); province != "" {
  62. savetmp["province"] = province
  63. }
  64. if city := qu.ObjToString(tmp["city"]); city != "" {
  65. savetmp["city"] = city
  66. }
  67. winerEsLock.Lock()
  68. arrEs = append(arrEs, savetmp)
  69. if len(arrEs) >= BulkSize {
  70. tmps := arrEs
  71. elastic.BulkSave(index, itype, &tmps, true)
  72. arrEs = []map[string]interface{}{}
  73. }
  74. winerEsLock.Unlock()
  75. }(tmp)
  76. tmp = make(map[string]interface{})
  77. }
  78. // log.Println("q:", q, "db:", mgostandard.DbName, "coll:", win_enterr)
  79. // it_2 := sess.DB(mgostandard.DbName).C(win_enterr).Find(&q).Sort("_id").Iter()
  80. // num_2 := 0
  81. // for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
  82. // if num_2%100 == 0 && num_2 > 0 {
  83. // log.Println("当前表:", win_enterr, "数量:", num_2)
  84. // }
  85. // pool <- true
  86. // wg.Add(1)
  87. // go func(tmp map[string]interface{}) {
  88. // defer func() {
  89. // <-pool
  90. // wg.Done()
  91. // }()
  92. // savetmp := map[string]interface{}{}
  93. // tmp_id := mongodb.BsonIdToSId(tmp["_id"])
  94. // savetmp["_id"] = tmp_id
  95. // savetmp["name"] = tmp["name"]
  96. // savetmp["winner_name"] = tmp["name"]
  97. // savetmp["pici"] = tmp["updatetime"]
  98. // winerEsLock.Lock()
  99. // arrEs = append(arrEs, savetmp)
  100. // if len(arrEs) >= BulkSize {
  101. // tmps := arrEs
  102. // elastic.BulkSave(index, itype, &tmps, true)
  103. // arrEs = []map[string]interface{}{}
  104. // }
  105. // winerEsLock.Unlock()
  106. // }(tmp)
  107. // tmp = make(map[string]interface{})
  108. // }
  109. wg.Wait()
  110. winerEsLock.Lock()
  111. if len(arrEs) > 0 {
  112. tmps := arrEs
  113. elastic.BulkSave(index, itype, &tmps, true)
  114. arrEs = []map[string]interface{}{}
  115. }
  116. winerEsLock.Unlock()
  117. log.Println("winnerextract 索引完毕! 总计:", num_1)
  118. }