// standardata.go (4.6 KB)
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson"
  4. "log"
  5. util "utils"
  6. "utils/elastic"
  7. "utils/mongodb"
  8. )
  9. func standardTask(stype string, mapInfo map[string]interface{}) {
  10. defer util.Catch()
  11. q, _ := mapInfo["query"].(map[string]interface{})
  12. if q == nil {
  13. q = map[string]interface{}{
  14. "_id": bson.M{
  15. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  16. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  17. },
  18. }
  19. }
  20. switch stype {
  21. case "winnerent":
  22. winnerEnt(q)
  23. case "buyerent":
  24. buyerEnt(q)
  25. case "agencyent":
  26. agencyEnt(q)
  27. }
  28. }
  29. //winnerent
  30. func winnerEnt(q map[string]interface{}) {
  31. session := standardMgo.GetMgoConn()
  32. defer standardMgo.DestoryMongoConn(session)
  33. winnerent, _ := standard["winnerent"].(map[string]interface{})
  34. c, _ := winnerent["collect"].(string)
  35. index, _ := winnerent["index"].(string)
  36. itype, _ := winnerent["type"].(string)
  37. count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count()
  38. savepool := make(chan bool, 10)
  39. log.Println(standardMgo.DbName, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  40. query := session.DB(standardMgo.DbName).C(c).Find(q).Iter()
  41. arr := make([]map[string]interface{}, EsBulkSize)
  42. var n int
  43. i := 0
  44. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  45. //不生索引字段
  46. delete(tmp, "partners")
  47. delete(tmp, "wechat_accounts")
  48. delete(tmp, "tmp_id")
  49. tmp["company"] = tmp["company_name"]
  50. arr[i] = tmp
  51. n++
  52. if i == EsBulkSize-1 {
  53. savepool <- true
  54. tmps := arr
  55. go func(tmpn *[]map[string]interface{}) {
  56. defer func() {
  57. <-savepool
  58. }()
  59. elastic.BulkSave(index, itype, tmpn, true)
  60. }(&tmps)
  61. i = 0
  62. arr = make([]map[string]interface{}, EsBulkSize)
  63. }
  64. if n%EsBulkSize == 0 {
  65. log.Println("当前:", n)
  66. }
  67. tmp = make(map[string]interface{})
  68. }
  69. if i > 0 {
  70. elastic.BulkSave(index, itype, &arr, true)
  71. }
  72. log.Println("create winnerent index...over", n)
  73. }
  74. //buyerent
  75. func buyerEnt(q map[string]interface{}) {
  76. session := standardMgo.GetMgoConn()
  77. defer standardMgo.DestoryMongoConn(session)
  78. buyerent, _ := standard["buyerent"].(map[string]interface{})
  79. c, _ := buyerent["collect"].(string)
  80. index, _ := buyerent["index"].(string)
  81. itype, _ := buyerent["type"].(string)
  82. count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count()
  83. savepool := make(chan bool, 10)
  84. log.Println(standardMgo.DbName, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  85. query := session.DB(standardMgo.DbName).C(c).Find(q).Iter()
  86. arr := make([]map[string]interface{}, EsBulkSize)
  87. var n int
  88. i := 0
  89. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  90. //不生索引字段
  91. delete(tmp, "partners")
  92. delete(tmp, "wechat_accounts")
  93. delete(tmp, "tmp_id")
  94. tmp["buyer"] = tmp["buyer_name"]
  95. arr[i] = tmp
  96. n++
  97. if i == EsBulkSize-1 {
  98. savepool <- true
  99. tmps := arr
  100. go func(tmpn *[]map[string]interface{}) {
  101. defer func() {
  102. <-savepool
  103. }()
  104. elastic.BulkSave(index, itype, tmpn, true)
  105. }(&tmps)
  106. i = 0
  107. arr = make([]map[string]interface{}, EsBulkSize)
  108. }
  109. if n%EsBulkSize == 0 {
  110. log.Println("当前:", n)
  111. }
  112. tmp = make(map[string]interface{})
  113. }
  114. if i > 0 {
  115. elastic.BulkSave(index, itype, &arr, true)
  116. }
  117. log.Println("create buyerent index...over", n)
  118. }
  119. //agencyent
  120. func agencyEnt(q map[string]interface{}) {
  121. session := standardMgo.GetMgoConn()
  122. defer standardMgo.DestoryMongoConn(session)
  123. agencyent, _ := standard["agencyent"].(map[string]interface{})
  124. c, _ := agencyent["collect"].(string)
  125. index, _ := agencyent["index"].(string)
  126. itype, _ := agencyent["type"].(string)
  127. count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count()
  128. savepool := make(chan bool, 10)
  129. log.Println(standardMgo.DbName, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  130. query := session.DB(standardMgo.DbName).C(c).Find(q).Iter()
  131. arr := make([]map[string]interface{}, EsBulkSize)
  132. var n int
  133. i := 0
  134. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  135. //不生索引字段
  136. delete(tmp, "partners")
  137. delete(tmp, "wechat_accounts")
  138. delete(tmp, "tmp_id")
  139. tmp["agency"] = tmp["agency_name"]
  140. arr[i] = tmp
  141. n++
  142. if i == EsBulkSize-1 {
  143. savepool <- true
  144. tmps := arr
  145. go func(tmpn *[]map[string]interface{}) {
  146. defer func() {
  147. <-savepool
  148. }()
  149. elastic.BulkSave(index, itype, tmpn, true)
  150. }(&tmps)
  151. i = 0
  152. arr = make([]map[string]interface{}, EsBulkSize)
  153. }
  154. if n%EsBulkSize == 0 {
  155. log.Println("当前:", n)
  156. }
  157. tmp = make(map[string]interface{})
  158. }
  159. if i > 0 {
  160. elastic.BulkSave(index, itype, &arr, true)
  161. }
  162. log.Println("create agencyent index...over", n)
  163. }