buyerindex.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson"
  4. "log"
  5. "sync"
  6. util "utils"
  7. "utils/elastic"
  8. "utils/mongodb"
  9. )
  10. /*
  11. func buyerTask(data []byte, mapInfo map[string]interface{}) {
  12. defer util.Catch()
  13. q, _ := mapInfo["query"].(map[string]interface{})
  14. if q == nil {
  15. q = map[string]interface{}{
  16. "_id": bson.M{
  17. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  18. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  19. },
  20. }
  21. }
  22. session := extractmgo.GetMgoConn()
  23. defer extractmgo.DestoryMongoConn(session)
  24. c, _ := buyer["collect"].(string)
  25. db, _ := buyer["db"].(string)
  26. index, _ := buyer["index"].(string)
  27. itype, _ := buyer["type"].(string)
  28. count, _ := session.DB(db).C(c).Find(&q).Count()
  29. savepool := make(chan bool, 10)
  30. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  31. query := session.DB(db).C(c).Find(q).Select(map[string]interface{}{
  32. "buyer_name": 1,
  33. "province": 1,
  34. "city": 1,
  35. "district": 1,
  36. }).Iter()
  37. arr := make([]map[string]interface{}, savesizei)
  38. var n int
  39. i := 0
  40. for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
  41. //go IS.Add("buyer")
  42. tmp["name"] = tmp["buyer_name"]
  43. delete(tmp, "buyer_name")
  44. arr[i] = tmp
  45. n++
  46. if i == savesizei-1 {
  47. savepool <- true
  48. tmps := arr
  49. go func(tmpn *[]map[string]interface{}) {
  50. defer func() {
  51. <-savepool
  52. }()
  53. elastic.BulkSave(index, itype, tmpn, true)
  54. }(&tmps)
  55. i = 0
  56. arr = make([]map[string]interface{}, savesizei)
  57. }
  58. if n%savesizei == 0 {
  59. log.Println("当前:", n)
  60. }
  61. tmp = make(map[string]interface{})
  62. }
  63. if i > 0 {
  64. elastic.BulkSave(index, itype, &arr, true)
  65. }
  66. log.Println(mapInfo, "create buyer index...over", n)
  67. }
  68. */
  69. //buyer_err
  70. func buyerTask_err(data []byte, mapInfo map[string]interface{}) {
  71. defer util.Catch()
  72. q, _ := mapInfo["query"].(map[string]interface{})
  73. if q == nil {
  74. q = map[string]interface{}{
  75. "_id": bson.M{
  76. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  77. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  78. },
  79. }
  80. }
  81. //mongo
  82. sess := standardMgo.GetMgoConn()
  83. defer standardMgo.DestoryMongoConn(sess)
  84. c, _ := buyer["collect"].(string)
  85. index, _ := buyer["index"].(string)
  86. itype, _ := buyer["type"].(string)
  87. count, _ := sess.DB(standardMgo.DbName).C(c).Find(&q).Count()
  88. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  89. it := sess.DB(standardMgo.DbName).C(c).Find(&q).Select(map[string]interface{}{
  90. "name": 1,
  91. "buyer_name": 1,
  92. "institute_type": 1,
  93. "buyerclass": 1,
  94. "fixedphone": 1,
  95. "mobilephone": 1,
  96. "latestfixedphone": 1,
  97. "latestmobilephone": 1,
  98. "province": 1,
  99. "city": 1,
  100. }).Sort("_id").Iter()
  101. arrEs := []map[string]interface{}{}
  102. buyerEsLock := &sync.Mutex{}
  103. pool := make(chan bool, 3)
  104. wg := &sync.WaitGroup{}
  105. i := 0
  106. for tmp := make(map[string]interface{}); it.Next(tmp); i = i + 1 {
  107. if i%1000 == 0 {
  108. log.Println("current:", i)
  109. }
  110. pool <- true
  111. wg.Add(1)
  112. go func(tmp map[string]interface{}) {
  113. defer func() {
  114. <-pool
  115. wg.Done()
  116. }()
  117. util.Debug(tmp)
  118. savetmp := map[string]interface{}{}
  119. tmp_id := util.BsonIdToSId(tmp["_id"])
  120. savetmp["_id"] = tmp_id
  121. savetmp["name"] = tmp["name"]
  122. savetmp["buyer_name"] = tmp["name"]
  123. util.Debug(tmp["buyerclass"])
  124. if tmp["buyerclass"] != nil {
  125. savetmp["buyerclass"] = tmp["buyerclass"]
  126. }
  127. for _, f := range fieldArr {
  128. if val := util.ObjToString(tmp[f]); val != "" {
  129. savetmp[f] = val
  130. }
  131. }
  132. util.Debug(savetmp)
  133. buyerEsLock.Lock()
  134. arrEs = append(arrEs, savetmp)
  135. if len(arrEs) >= MgoBulkSize {
  136. tmps := arrEs
  137. elastic.BulkSave(index, itype, &tmps, true)
  138. arrEs = []map[string]interface{}{}
  139. }
  140. buyerEsLock.Unlock()
  141. }(tmp)
  142. tmp = make(map[string]interface{})
  143. }
  144. wg.Wait()
  145. buyerEsLock.Lock()
  146. if len(arrEs) > 0 {
  147. tmps := arrEs
  148. elastic.BulkSave(index, itype, &tmps, true)
  149. arrEs = []map[string]interface{}{}
  150. }
  151. buyerEsLock.Unlock()
  152. log.Println(mapInfo, "create buyer index...over", i)
  153. }
  154. //buyer_enterprise
  155. func buyerTask(data []byte, mapInfo map[string]interface{}) {
  156. defer util.Catch()
  157. q, _ := mapInfo["query"].(map[string]interface{})
  158. if q == nil {
  159. q = map[string]interface{}{
  160. "_id": bson.M{
  161. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  162. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  163. },
  164. }
  165. }
  166. //mongo
  167. sess := standardMgo.GetMgoConn()
  168. defer standardMgo.DestoryMongoConn(sess)
  169. c, _ := buyer["collect"].(string)
  170. index, _ := buyer["index"].(string)
  171. itype, _ := buyer["type"].(string)
  172. count, _ := sess.DB(standardMgo.DbName).C(c).Find(&q).Count()
  173. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  174. it := sess.DB(standardMgo.DbName).C(c).Find(&q).Select(map[string]interface{}{
  175. "buyer_name": 1,
  176. "institute_type": 1,
  177. "buyerclass": 1,
  178. "fixedphone": 1,
  179. "mobilephone": 1,
  180. "latestfixedphone": 1,
  181. "latestmobilephone": 1,
  182. "province": 1,
  183. "city": 1,
  184. }).Sort("_id").Iter()
  185. arrEs := []map[string]interface{}{}
  186. buyerEsLock := &sync.Mutex{}
  187. pool := make(chan bool, 3)
  188. wg := &sync.WaitGroup{}
  189. i := 0
  190. for tmp := make(map[string]interface{}); it.Next(tmp); i = i + 1 {
  191. if i%1000 == 0 {
  192. log.Println("current:", i)
  193. }
  194. pool <- true
  195. wg.Add(1)
  196. go func(tmp map[string]interface{}) {
  197. defer func() {
  198. <-pool
  199. wg.Done()
  200. }()
  201. savetmp := map[string]interface{}{}
  202. _id := util.BsonIdToSId(tmp["_id"])
  203. //if buyerclass, ok := tmp["buyerclass"].([]interface{}); ok && len(buyerclass) > 0 {
  204. // for _, v := range util.ObjArrToStringArr(buyerclass) {
  205. // if len(buyerclass) >= 2 && v != "其它" {
  206. // savetmp["buyerclass"] = v
  207. // break
  208. // } else if len(buyerclass) == 1 {
  209. // savetmp["buyerclass"] = v
  210. // break
  211. // }
  212. // }
  213. //}
  214. if util.ObjToString(tmp["buyerclass"]) != "" {
  215. savetmp["buyerclass"] = tmp["buyerclass"]
  216. }
  217. savetmp["_id"] = _id
  218. savetmp["name"] = tmp["buyer_name"]
  219. savetmp["buyer_name"] = tmp["buyer_name"]
  220. for _, f := range fieldArr {
  221. if val := util.ObjToString(tmp[f]); val != "" {
  222. savetmp[f] = val
  223. }
  224. }
  225. buyerEsLock.Lock()
  226. arrEs = append(arrEs, savetmp)
  227. if len(arrEs) >= EsBulkSize {
  228. tmps := arrEs
  229. elastic.BulkSave(index, itype, &tmps, true)
  230. arrEs = []map[string]interface{}{}
  231. }
  232. buyerEsLock.Unlock()
  233. }(tmp)
  234. tmp = make(map[string]interface{})
  235. }
  236. wg.Wait()
  237. buyerEsLock.Lock()
  238. if len(arrEs) > 0 {
  239. tmps := arrEs
  240. elastic.BulkSave(index, itype, &tmps, true)
  241. arrEs = []map[string]interface{}{}
  242. }
  243. buyerEsLock.Unlock()
  244. log.Println(mapInfo, "create buyer index...over", i)
  245. }