buyertask.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package main
  2. import (
  3. "log"
  4. qu "qfw/util"
  5. elastic "qfw/util/elastic"
  6. "sync"
  7. "time"
  8. "gopkg.in/mgo.v2/bson"
  9. )
  10. var fieldArr = []string{"institute_type", "fixedphone", "mobilephone", "latestfixedphone", "latestmobilephone", "province", "city"}
  11. func buyerEsTaskOnce() {
  12. defer qu.Catch()
  13. arrEs := []map[string]interface{}{}
  14. buyerEsLock := &sync.Mutex{}
  15. pool := make(chan bool, 3)
  16. wg := &sync.WaitGroup{}
  17. now := time.Now()
  18. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
  19. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  20. task_sid := qu.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
  21. task_eid := qu.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
  22. log.Println("buyer 区间id:", task_sid, task_eid)
  23. // task_sid = "5e6611b7aec95406dccf7151"
  24. // task_eid = "5f7249164bdc0447a6c90fa5"
  25. //区间id
  26. q := map[string]interface{}{
  27. "_id": map[string]interface{}{
  28. "$gte": qu.StringTOBsonId(task_sid),
  29. "$lt": qu.StringTOBsonId(task_eid),
  30. },
  31. }
  32. //参数
  33. buyerent, _ := standard["buyerent"].(map[string]interface{})
  34. buyer_ent := qu.ObjToString(buyerent["collect1"])
  35. //buyer_enterr := qu.ObjToString(buyerent["collect2"])
  36. index, _ := buyerent["index"].(string)
  37. itype, _ := buyerent["type"].(string)
  38. //mongo
  39. sess := mgostandard.GetMgoConn()
  40. defer mgostandard.DestoryMongoConn(sess)
  41. log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_ent)
  42. it_1 := sess.DB(mgostandard.DbName).C(buyer_ent).Find(&q).Select(map[string]interface{}{
  43. "buyer_name": 1,
  44. "institute_type": 1,
  45. "buyerclass": 1,
  46. "fixedphone": 1,
  47. "mobilephone": 1,
  48. "latestfixedphone": 1,
  49. "latestmobilephone": 1,
  50. "province": 1,
  51. "city": 1,
  52. }).Sort("_id").Iter()
  53. num_1 := 0
  54. for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
  55. if num_1%100 == 0 && num_1 > 0 {
  56. log.Println("当前表:", buyer_ent, "数量:", num_1)
  57. }
  58. pool <- true
  59. wg.Add(1)
  60. go func(tmp map[string]interface{}) {
  61. defer func() {
  62. <-pool
  63. wg.Done()
  64. }()
  65. savetmp := map[string]interface{}{}
  66. _id := qu.BsonIdToSId(tmp["_id"])
  67. if buyerclass, ok := tmp["buyerclass"].([]interface{}); ok && len(buyerclass) > 0 {
  68. for _, v := range qu.ObjArrToStringArr(buyerclass) {
  69. if len(buyerclass) >= 2 && v != "其它" {
  70. savetmp["buyerclass"] = v
  71. break
  72. } else if len(buyerclass) == 1 {
  73. savetmp["buyerclass"] = v
  74. break
  75. }
  76. }
  77. }
  78. savetmp["_id"] = _id
  79. savetmp["name"] = tmp["buyer_name"]
  80. savetmp["buyer_name"] = tmp["buyer_name"]
  81. for _, f := range fieldArr {
  82. if val := qu.ObjToString(tmp[f]); val != "" {
  83. savetmp[f] = val
  84. }
  85. }
  86. buyerEsLock.Lock()
  87. arrEs = append(arrEs, savetmp)
  88. if len(arrEs) >= BulkSize {
  89. tmps := arrEs
  90. elastic.BulkSave(index, itype, &tmps, true)
  91. arrEs = []map[string]interface{}{}
  92. }
  93. buyerEsLock.Unlock()
  94. }(tmp)
  95. tmp = make(map[string]interface{})
  96. }
  97. // log.Println("q:", q, "db:", mgostandard.DbName, "coll:", buyer_enterr)
  98. // it_2 := sess.DB(mgostandard.DbName).C(buyer_enterr).Find(&q).Sort("_id").Iter()
  99. // num_2 := 0
  100. // for tmp := make(map[string]interface{}); it_2.Next(&tmp); num_2++ {
  101. // if num_2%100 == 0 && num_2 > 0 {
  102. // log.Println("当前表:", buyer_enterr, "数量:", num_2)
  103. // }
  104. // pool <- true
  105. // wg.Add(1)
  106. // go func(tmp map[string]interface{}) {
  107. // defer func() {
  108. // <-pool
  109. // wg.Done()
  110. // }()
  111. // savetmp := map[string]interface{}{}
  112. // tmp_id := qu.BsonIdToSId(tmp["_id"])
  113. // savetmp["_id"] = tmp_id
  114. // savetmp["name"] = tmp["name"]
  115. // savetmp["buyer_name"] = tmp["name"]
  116. // if tmp["buyerclass"] != nil {
  117. // savetmp["buyerclass"] = tmp["buyerclass"]
  118. // }
  119. // for _, f := range fieldArr {
  120. // if val := qu.ObjToString(tmp[f]); val != "" {
  121. // savetmp[f] = val
  122. // }
  123. // }
  124. // buyerEsLock.Lock()
  125. // arrEs = append(arrEs, savetmp)
  126. // if len(arrEs) >= BulkSize {
  127. // tmps := arrEs
  128. // elastic.BulkSave(index, itype, &tmps, true)
  129. // arrEs = []map[string]interface{}{}
  130. // }
  131. // buyerEsLock.Unlock()
  132. // }(tmp)
  133. // tmp = make(map[string]interface{})
  134. // }
  135. wg.Wait()
  136. buyerEsLock.Lock()
  137. if len(arrEs) > 0 {
  138. tmps := arrEs
  139. elastic.BulkSave(index, itype, &tmps, true)
  140. arrEs = []map[string]interface{}{}
  141. }
  142. buyerEsLock.Unlock()
  143. log.Println("buyeres 索引完毕! 总计:", num_1)
  144. }