standardata.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package main
  2. import (
  3. "log"
  4. "qfw/util"
  5. elastic "qfw/util/elastic"
  6. "gopkg.in/mgo.v2/bson"
  7. )
  8. func standardTask(stype string, mapInfo map[string]interface{}) {
  9. defer util.Catch()
  10. q, _ := mapInfo["query"].(map[string]interface{})
  11. if q == nil {
  12. q = map[string]interface{}{
  13. "_id": bson.M{
  14. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  15. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  16. },
  17. }
  18. }
  19. switch stype {
  20. case "winnerent":
  21. winnerEnt(q)
  22. case "buyerent":
  23. buyerEnt(q)
  24. case "agencyent":
  25. agencyEnt(q)
  26. }
  27. }
  28. //winnerent
  29. func winnerEnt(q map[string]interface{}) {
  30. session := mgostandard.GetMgoConn(3600)
  31. defer mgostandard.DestoryMongoConn(session)
  32. db, _ := standard["db"].(string)
  33. winnerent, _ := standard["winnerent"].(map[string]interface{})
  34. c, _ := winnerent["collect"].(string)
  35. index, _ := winnerent["index"].(string)
  36. itype, _ := winnerent["type"].(string)
  37. count, _ := session.DB(db).C(c).Find(&q).Count()
  38. savepool := make(chan bool, 10)
  39. log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  40. query := session.DB(db).C(c).Find(q).Iter()
  41. arr := make([]map[string]interface{}, savesizei)
  42. var n int
  43. i := 0
  44. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  45. //不生索引字段
  46. delete(tmp, "partners")
  47. delete(tmp, "wechat_accounts")
  48. delete(tmp, "tmp_id")
  49. tmp["company"] = tmp["company_name"]
  50. arr[i] = tmp
  51. n++
  52. if i == savesizei-1 {
  53. savepool <- true
  54. tmps := arr
  55. go func(tmpn *[]map[string]interface{}) {
  56. defer func() {
  57. <-savepool
  58. }()
  59. elastic.BulkSave(index, itype, tmpn, true)
  60. }(&tmps)
  61. i = 0
  62. arr = make([]map[string]interface{}, savesizei)
  63. }
  64. if n%savesizei == 0 {
  65. log.Println("当前:", n)
  66. }
  67. tmp = make(map[string]interface{})
  68. }
  69. if i > 0 {
  70. elastic.BulkSave(index, itype, &arr, true)
  71. }
  72. log.Println("create winnerent index...over", n)
  73. }
  74. //buyerent
  75. func buyerEnt(q map[string]interface{}) {
  76. session := mgostandard.GetMgoConn(3600)
  77. defer mgostandard.DestoryMongoConn(session)
  78. db, _ := standard["db"].(string)
  79. buyerent, _ := standard["buyerent"].(map[string]interface{})
  80. c, _ := buyerent["collect"].(string)
  81. index, _ := buyerent["index"].(string)
  82. itype, _ := buyerent["type"].(string)
  83. count, _ := session.DB(db).C(c).Find(&q).Count()
  84. savepool := make(chan bool, 10)
  85. log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  86. query := session.DB(db).C(c).Find(q).Iter()
  87. arr := make([]map[string]interface{}, savesizei)
  88. var n int
  89. i := 0
  90. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  91. //不生索引字段
  92. delete(tmp, "partners")
  93. delete(tmp, "wechat_accounts")
  94. delete(tmp, "tmp_id")
  95. tmp["buyer"] = tmp["buyer_name"]
  96. arr[i] = tmp
  97. n++
  98. if i == savesizei-1 {
  99. savepool <- true
  100. tmps := arr
  101. go func(tmpn *[]map[string]interface{}) {
  102. defer func() {
  103. <-savepool
  104. }()
  105. elastic.BulkSave(index, itype, tmpn, true)
  106. }(&tmps)
  107. i = 0
  108. arr = make([]map[string]interface{}, savesizei)
  109. }
  110. if n%savesizei == 0 {
  111. log.Println("当前:", n)
  112. }
  113. tmp = make(map[string]interface{})
  114. }
  115. if i > 0 {
  116. elastic.BulkSave(index, itype, &arr, true)
  117. }
  118. log.Println("create buyerent index...over", n)
  119. }
  120. //agencyent
  121. func agencyEnt(q map[string]interface{}) {
  122. session := mgostandard.GetMgoConn(3600)
  123. defer mgostandard.DestoryMongoConn(session)
  124. db, _ := standard["db"].(string)
  125. agencyent, _ := standard["agencyent"].(map[string]interface{})
  126. c, _ := agencyent["collect"].(string)
  127. index, _ := agencyent["index"].(string)
  128. itype, _ := agencyent["type"].(string)
  129. count, _ := session.DB(db).C(c).Find(&q).Count()
  130. savepool := make(chan bool, 10)
  131. log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
  132. query := session.DB(db).C(c).Find(q).Iter()
  133. arr := make([]map[string]interface{}, savesizei)
  134. var n int
  135. i := 0
  136. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  137. //不生索引字段
  138. delete(tmp, "partners")
  139. delete(tmp, "wechat_accounts")
  140. delete(tmp, "tmp_id")
  141. tmp["agency"] = tmp["agency_name"]
  142. arr[i] = tmp
  143. n++
  144. if i == savesizei-1 {
  145. savepool <- true
  146. tmps := arr
  147. go func(tmpn *[]map[string]interface{}) {
  148. defer func() {
  149. <-savepool
  150. }()
  151. elastic.BulkSave(index, itype, tmpn, true)
  152. }(&tmps)
  153. i = 0
  154. arr = make([]map[string]interface{}, savesizei)
  155. }
  156. if n%savesizei == 0 {
  157. log.Println("当前:", n)
  158. }
  159. tmp = make(map[string]interface{})
  160. }
  161. if i > 0 {
  162. elastic.BulkSave(index, itype, &arr, true)
  163. }
  164. log.Println("create agencyent index...over", n)
  165. }