buyertask.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson/primitive"
  4. "log"
  5. "sync"
  6. "time"
  7. util "utils"
  8. "utils/mongodb"
  9. )
  10. var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone", "province", "city"}
  11. func buyerEsTaskOnce() {
  12. defer util.Catch()
  13. arrEs := []map[string]interface{}{}
  14. buyerEsLock := &sync.Mutex{}
  15. pool := make(chan bool, 3)
  16. wg := &sync.WaitGroup{}
  17. now := time.Now()
  18. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
  19. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  20. task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
  21. task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
  22. //task_sid = "5e6611b7aec95406dccf714f"
  23. //task_eid = "625c79bf799a3acc48890f48"
  24. log.Println("buyer 区间id:", task_sid, task_eid)
  25. //区间id
  26. q := map[string]interface{}{
  27. "_id": map[string]interface{}{
  28. //"$gte": mongodb.StringTOBsonId(task_sid),
  29. "$lt": mongodb.StringTOBsonId(task_eid),
  30. },
  31. }
  32. //参数
  33. buyerent, _ := standard["buyerent"].(map[string]interface{})
  34. buyer_ent := util.ObjToString(buyerent["collect1"])
  35. //buyer_enterr := qu.ObjToString(buyerent["collect2"])
  36. index, _ := buyerent["index"].(string)
  37. itype, _ := buyerent["type"].(string)
  38. //mongo
  39. sess := standardMgo.GetMgoConn()
  40. defer standardMgo.DestoryMongoConn(sess)
  41. log.Println("q:", q, "db:", standardMgo.DbName, "coll:", buyer_ent)
  42. it_1 := sess.DB(standardMgo.DbName).C(buyer_ent).Find(&q).Select(map[string]interface{}{
  43. "buyer_name": 1,
  44. "institute_type": 1,
  45. "buyerclass": 1,
  46. "fixedphone": 1,
  47. "mobilephone": 1,
  48. "latestfixedphone": 1,
  49. "latestmobilephone": 1,
  50. "province": 1,
  51. "city": 1,
  52. }).Sort("_id").Iter()
  53. num_1 := 0
  54. for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
  55. if num_1%2000 == 0 && num_1 > 0 {
  56. log.Println("当前表:", buyer_ent, "数量:", num_1)
  57. }
  58. pool <- true
  59. wg.Add(1)
  60. go func(tmp map[string]interface{}) {
  61. defer func() {
  62. <-pool
  63. wg.Done()
  64. }()
  65. savetmp := map[string]interface{}{}
  66. _id := mongodb.BsonIdToSId(tmp["_id"])
  67. //if buyerclass, ok := tmp["buyerclass"].([]interface{}); ok && len(buyerclass) > 0 {
  68. // for _, v := range qu.ObjArrToStringArr(buyerclass) {
  69. // if len(buyerclass) >= 2 && v != "其它" {
  70. // savetmp["buyerclass"] = v
  71. // break
  72. // } else if len(buyerclass) == 1 {
  73. // savetmp["buyerclass"] = v
  74. // break
  75. // }
  76. // }
  77. //}
  78. if util.ObjToString(tmp["buyerclass"]) != "" {
  79. savetmp["buyerclass"] = tmp["buyerclass"]
  80. }
  81. savetmp["_id"] = _id
  82. savetmp["name"] = tmp["buyer_name"]
  83. savetmp["buyer_name"] = tmp["buyer_name"]
  84. for _, f := range fieldArr {
  85. if val := util.ObjToString(tmp[f]); val != "" {
  86. savetmp[f] = val
  87. }
  88. }
  89. buyerEsLock.Lock()
  90. arrEs = append(arrEs, savetmp)
  91. if len(arrEs) >= EsBulkSize {
  92. tmps := arrEs
  93. //Es1.BulkSave(index, itype, &tmps, true)
  94. Es2.BulkSave(index, itype, &tmps, true)
  95. arrEs = []map[string]interface{}{}
  96. }
  97. buyerEsLock.Unlock()
  98. }(tmp)
  99. tmp = make(map[string]interface{})
  100. }
  101. // log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_enterr)
  102. // it_2 := sess.DB(mgostandard.DbName).C(buyer_enterr).Find(&q).Sort("_id").Iter()
  103. // num_2 := 0
  104. // for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
  105. // if num_2%100 == 0 && num_2 > 0 {
  106. // log.Println("当前表:", buyer_enterr, "数量:", num_2)
  107. // }
  108. // pool <- true
  109. // wg.Add(1)
  110. // go func(tmp map[string]interface{}) {
  111. // defer func() {
  112. // <-pool
  113. // wg.Done()
  114. // }()
  115. // savetmp := map[string]interface{}{}
  116. // tmp_id := mongodb.BsonIdToSId(tmp["_id"])
  117. // savetmp["_id"] = tmp_id
  118. // savetmp["name"] = tmp["name"]
  119. // savetmp["buyer_name"] = tmp["name"]
  120. // if tmp["buyerclass"] != nil {
  121. // savetmp["buyerclass"] = tmp["buyerclass"]
  122. // }
  123. // for _, f := range fieldArr {
  124. // if val := qu.ObjToString(tmp[f]); val != "" {
  125. // savetmp[f] = val
  126. // }
  127. // }
  128. // buyerEsLock.Lock()
  129. // arrEs = append(arrEs, savetmp)
  130. // if len(arrEs) >= BulkSize {
  131. // tmps := arrEs
  132. // elastic.BulkSave(index, itype, &tmps, true)
  133. // arrEs = []map[string]interface{}{}
  134. // }
  135. // buyerEsLock.Unlock()
  136. // }(tmp)
  137. // tmp = make(map[string]interface{})
  138. // }
  139. wg.Wait()
  140. buyerEsLock.Lock()
  141. if len(arrEs) > 0 {
  142. tmps := arrEs
  143. //Es1.BulkSave(index, itype, &tmps, true)
  144. Es2.BulkSave(index, itype, &tmps, true)
  145. arrEs = []map[string]interface{}{}
  146. }
  147. buyerEsLock.Unlock()
  148. log.Println("buyeres 索引完毕! 总计:", num_1)
  149. }