default.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson"
  4. "log"
  5. "mongodb"
  6. "qfw/util"
  7. elastic "qfw/util/elastic"
  8. )
  9. func defaultFunc(data []byte, mapInfo map[string]interface{}) {
  10. defer util.Catch()
  11. tasktype, _ := mapInfo["stype"].(string)
  12. if tasktype == "" {
  13. return
  14. }
  15. q, _ := mapInfo["query"].(map[string]interface{})
  16. if q == nil {
  17. q = map[string]interface{}{
  18. "_id": bson.M{
  19. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  20. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  21. },
  22. }
  23. }
  24. var Mgo *mongodb.MongodbSim
  25. c, _ := mapInfo["c"].(string)
  26. db, _ := mapInfo["d"].(string)
  27. index, _ := mapInfo["index"].(string)
  28. itype, _ := mapInfo["type"].(string)
  29. if itype == "" {
  30. itype = index
  31. }
  32. if mapInfo["mgoaddr"] != nil {
  33. Mgo = &mongodb.MongodbSim{
  34. MongodbAddr: mapInfo["mgoaddr"].(string),
  35. Size: 5,
  36. DbName: db,
  37. }
  38. Mgo.InitPool()
  39. } else {
  40. Mgo = mgo
  41. }
  42. session := Mgo.GetMgoConn()
  43. defer Mgo.DestoryMongoConn(session)
  44. count, _ := session.DB(db).C(c).Find(&q).Count()
  45. savepool := make(chan bool, 10)
  46. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  47. query := session.DB(db).C(c).Find(q).Iter()
  48. arr := make([]map[string]interface{}, savesizei)
  49. var n int
  50. i := 0
  51. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  52. go IS.Add(tasktype)
  53. arr[i] = tmp
  54. n++
  55. if i == savesizei-1 {
  56. savepool <- true
  57. tmps := arr
  58. go func(tmpn *[]map[string]interface{}) {
  59. defer func() {
  60. <-savepool
  61. }()
  62. elastic.BulkSave(index, itype, tmpn, true)
  63. }(&tmps)
  64. i = 0
  65. arr = make([]map[string]interface{}, savesizei)
  66. }
  67. if n%savesizei == 0 {
  68. log.Println("当前:", n)
  69. }
  70. tmp = make(map[string]interface{})
  71. }
  72. if i > 0 {
  73. elastic.BulkSave(index, itype, &arr, true)
  74. }
  75. log.Println(mapInfo, "create "+tasktype+" index...over", n)
  76. }