winnerindex.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package main
  2. import (
  3. "log"
  4. "mongodb"
  5. "qfw/util"
  6. elastic "qfw/util/elastic"
  7. "gopkg.in/mgo.v2/bson"
  8. )
  9. func winnerTask(data []byte, mapInfo map[string]interface{}) {
  10. defer util.Catch()
  11. q, _ := mapInfo["query"].(map[string]interface{})
  12. if q == nil {
  13. q = map[string]interface{}{
  14. "_id": bson.M{
  15. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  16. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  17. },
  18. }
  19. }
  20. session := extractmgo.GetMgoConn()
  21. defer extractmgo.DestoryMongoConn(session)
  22. c, _ := winner["collect"].(string)
  23. db, _ := winner["db"].(string)
  24. index, _ := winner["index"].(string)
  25. itype, _ := winner["type"].(string)
  26. count, _ := session.DB(db).C(c).Find(&q).Count()
  27. savepool := make(chan bool, 10)
  28. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  29. query := session.DB(db).C(c).Find(q).Select(bson.M{"pici": 0}).Iter()
  30. arr := make([]map[string]interface{}, savesizei)
  31. var n int
  32. i := 0
  33. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  34. go IS.Add("winner")
  35. arr[i] = tmp
  36. n++
  37. if i == savesizei-1 {
  38. savepool <- true
  39. tmps := arr
  40. go func(tmpn *[]map[string]interface{}) {
  41. defer func() {
  42. <-savepool
  43. }()
  44. elastic.BulkSave(index, itype, tmpn, true)
  45. }(&tmps)
  46. i = 0
  47. arr = make([]map[string]interface{}, savesizei)
  48. }
  49. if n%savesizei == 0 {
  50. log.Println("当前:", n)
  51. }
  52. tmp = make(map[string]interface{})
  53. }
  54. if i > 0 {
  55. elastic.BulkSave(index, itype, &arr, true)
  56. }
  57. log.Println(mapInfo, "create winner index...over", n)
  58. }