winnerenterpriseindex.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. elastic "qfw/util/elastic"
  6. "sync"
  7. "gopkg.in/mgo.v2/bson"
  8. )
  9. func winnerEnterPriseTask(data []byte, mapInfo map[string]interface{}) {
  10. defer util.Catch()
  11. q, _ := mapInfo["query"].(map[string]interface{})
  12. if q == nil {
  13. q = map[string]interface{}{
  14. "_id": bson.M{
  15. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  16. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  17. },
  18. }
  19. }
  20. log.Println("++++++++++++++++++++++")
  21. session := winnerentermgo.GetMgoConn(1800)
  22. defer winnerentermgo.DestoryMongoConn(session)
  23. c, _ := winnerenterprise["collect"].(string)
  24. db, _ := winnerenterprise["db"].(string)
  25. index, _ := winnerenterprise["index"].(string)
  26. itype, _ := winnerenterprise["type"].(string)
  27. log.Println("index===", index, "itype===", itype)
  28. count, _ := session.DB(db).C(c).Find(&q).Count()
  29. savepool := make(chan bool, 10)
  30. UpdatesLock := sync.Mutex{}
  31. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  32. query := session.DB(db).C(c).Find(q).Select(bson.M{"alias": 0, "tmp_id": 0}).Iter()
  33. tmp := []map[string]interface{}{}
  34. tmp = append(tmp, map[string]interface{}{
  35. "test": "test",
  36. })
  37. elastic.BulkSave(index, itype, &tmp, true)
  38. arrEs := []map[string]interface{}{}
  39. var n int
  40. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  41. //go IS.Add("winner")
  42. log.Println("tmp=========", tmp)
  43. UpdatesLock.Lock()
  44. arrEs = append(arrEs, tmp)
  45. if len(arrEs) > savesizei {
  46. tmps := arrEs
  47. savepool <- true
  48. go func(tmpn []map[string]interface{}) {
  49. defer func() {
  50. <-savepool
  51. }()
  52. elastic.BulkSave(index, itype, &tmpn, true)
  53. }(tmps)
  54. arrEs = []map[string]interface{}{}
  55. }
  56. UpdatesLock.Unlock()
  57. if n%1000 == 0 {
  58. log.Println("current:", n, util.BsonIdToSId(tmp["_id"]))
  59. }
  60. tmp = make(map[string]interface{})
  61. }
  62. UpdatesLock.Lock()
  63. if len(arrEs) > 0 {
  64. tmpn := arrEs
  65. elastic.BulkSave(index, itype, &tmpn, true)
  66. }
  67. UpdatesLock.Unlock()
  68. log.Println(mapInfo, "create winner_enterprise index...over", n)
  69. }