// buyerindex.go (1.7 KB)
  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. elastic "qfw/util/elastic"
  6. "gopkg.in/mgo.v2/bson"
  7. )
  8. func buyerTask(data []byte, mapInfo map[string]interface{}) {
  9. defer util.Catch()
  10. q, _ := mapInfo["query"].(map[string]interface{})
  11. if q == nil {
  12. q = map[string]interface{}{
  13. "_id": bson.M{
  14. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  15. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  16. },
  17. }
  18. }
  19. session := extractmgo.GetMgoConn()
  20. defer extractmgo.DestoryMongoConn(session)
  21. c, _ := buyer["collect"].(string)
  22. db, _ := buyer["db"].(string)
  23. index, _ := buyer["index"].(string)
  24. itype, _ := buyer["type"].(string)
  25. count, _ := session.DB(db).C(c).Find(&q).Count()
  26. savepool := make(chan bool, 10)
  27. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  28. query := session.DB(db).C(c).Find(q).Select(map[string]interface{}{
  29. "buyer_name": 1,
  30. "province": 1,
  31. "city": 1,
  32. "district": 1,
  33. }).Iter()
  34. arr := make([]map[string]interface{}, savesizei)
  35. var n int
  36. i := 0
  37. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  38. //go IS.Add("buyer")
  39. tmp["name"] = tmp["buyer_name"]
  40. delete(tmp, "buyer_name")
  41. arr[i] = tmp
  42. n++
  43. if i == savesizei-1 {
  44. savepool <- true
  45. tmps := arr
  46. go func(tmpn *[]map[string]interface{}) {
  47. defer func() {
  48. <-savepool
  49. }()
  50. elastic.BulkSave(index, itype, tmpn, true)
  51. }(&tmps)
  52. i = 0
  53. arr = make([]map[string]interface{}, savesizei)
  54. }
  55. if n%savesizei == 0 {
  56. log.Println("当前:", n)
  57. }
  58. tmp = make(map[string]interface{})
  59. }
  60. if i > 0 {
  61. elastic.BulkSave(index, itype, &arr, true)
  62. }
  63. log.Println(mapInfo, "create buyer index...over", n)
  64. }