winnerextract.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package main
  2. import (
  3. "gopkg.in/mgo.v2/bson"
  4. "log"
  5. qu "qfw/util"
  6. "sync"
  7. "time"
  8. )
  9. func winnerEsTaskOnce() {
  10. defer qu.Catch()
  11. now := time.Now()
  12. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
  13. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  14. task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
  15. task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
  16. log.Println("区间id:",task_sid,task_eid)
  17. //区间id
  18. q := map[string]interface{}{
  19. "_id": map[string]interface{}{
  20. "$gte": qu.StringTOBsonId(task_sid),
  21. "$lt": qu.StringTOBsonId(task_eid),
  22. },
  23. }
  24. //参数
  25. threadnum:=qu.IntAll(winner_extract["threadnum"])
  26. db_c1:=qu.ObjToString(winner_extract["db_c1"])
  27. db_c2:=qu.ObjToString(winner_extract["db_c2"])
  28. es_index:=qu.ObjToString(winner_extract["es_index"])
  29. es_type:=qu.ObjToString(winner_extract["es_type"])
  30. //mongo
  31. sess := winnermgo.GetMgoConn()
  32. defer mgo.DestoryMongoConn(sess)
  33. //es
  34. EsConn := winner_es.GetEsConn()
  35. defer winner_es.DestoryEsConn(EsConn)
  36. it_1 := sess.DB(winnermgo.DbName).C(db_c1).Find(&q).Sort("_id").Iter()
  37. num_1:=0
  38. pool := make(chan bool, threadnum)
  39. wg := &sync.WaitGroup{}
  40. for tmp := make(map[string]interface{}); it_1.Next(&tmp);num_1++{
  41. if num_1%100 == 0 && num_1>0{
  42. log.Println("当前表:",db_c1,"数量:",num_1)
  43. }
  44. pool <- true
  45. wg.Add(1)
  46. go func(tmp map[string]interface{}) {
  47. defer func() {
  48. <-pool
  49. wg.Done()
  50. }()
  51. savetmp := make(map[string]interface{}, 0)
  52. tmp_id := qu.BsonIdToSId(tmp["_id"])
  53. savetmp["_id"] = tmp_id
  54. savetmp["name"] = tmp["company_name"]
  55. savetmp["pici"] = tmp["updatetime"]
  56. if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
  57. log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
  58. }
  59. }(tmp)
  60. tmp = make(map[string]interface{})
  61. }
  62. wg.Wait()
  63. it_2 := sess.DB(winnermgo.DbName).C(db_c2).Find(&q).Sort("_id").Iter()
  64. num_2:=0
  65. for tmp := make(map[string]interface{}); it_2.Next(&tmp);num_2++{
  66. if num_2%100 == 0 && num_2>0 {
  67. log.Println("当前表:",db_c2,"数量:",num_1)
  68. }
  69. pool <- true
  70. wg.Add(1)
  71. go func(tmp map[string]interface{}) {
  72. defer func() {
  73. <-pool
  74. wg.Done()
  75. }()
  76. savetmp := make(map[string]interface{}, 0)
  77. tmp_id := qu.BsonIdToSId(tmp["_id"])
  78. savetmp["_id"] = tmp_id
  79. savetmp["name"] = tmp["name"]
  80. savetmp["pici"] = tmp["updatetime"]
  81. if _, err := EsConn.Index().Index(es_index).Type(es_type).Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
  82. log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
  83. }
  84. }(tmp)
  85. tmp = make(map[string]interface{})
  86. }
  87. wg.Wait()
  88. log.Println("winnerextract 索引完毕! 总计:",num_1+num_2)
  89. }