task.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package main
  2. import (
  3. "github.com/cron"
  4. "go.mongodb.org/mongo-driver/bson"
  5. "log"
  6. "mongodb"
  7. "qfw/util"
  8. "reflect"
  9. "sync"
  10. "time"
  11. )
  12. // 定时任务
  13. func TimeTask() {
  14. //go SaveAdd()
  15. c := cron.New()
  16. cronstr := "0 0 2 * * ?" //每天2点执行
  17. //cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
  18. err := c.AddFunc(cronstr, func() { SaveAdd() })
  19. if err != nil {
  20. util.Debug(err)
  21. return
  22. }
  23. c.Start()
  24. }
  25. // SaveAdd 增量数据
  26. func SaveAdd() {
  27. defer util.Catch()
  28. sess := Mgo.GetMgoConn()
  29. defer Mgo.DestoryMongoConn(sess)
  30. pool := make(chan bool, 5)
  31. wg := &sync.WaitGroup{}
  32. //if UpdateId == "" {
  33. // util.Debug("update id err...")
  34. // return
  35. //}
  36. //q := bson.M{"_id": mongodb.StringTOBsonId("647e30b3eabd4b61e72b3f8f")}
  37. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(UpdateId)}}
  38. util.Debug("q ---", q)
  39. it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
  40. count := 0
  41. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  42. if count%5000 == 0 {
  43. log.Println("current:", count)
  44. }
  45. if UpdateId < mongodb.BsonIdToSId(tmp["_id"]) {
  46. UpdateId = mongodb.BsonIdToSId(tmp["_id"])
  47. }
  48. pool <- true
  49. wg.Add(1)
  50. go func(tmp map[string]interface{}) {
  51. defer func() {
  52. <-pool
  53. wg.Done()
  54. }()
  55. esMap := map[string]interface{}{}
  56. //生索引字段处理
  57. for _, field := range EsFields {
  58. if tmp[field] == nil {
  59. continue
  60. }
  61. if field == "_id" {
  62. esMap["_id"] = mongodb.BsonIdToSId(tmp["_id"])
  63. esMap["id"] = mongodb.BsonIdToSId(tmp["_id"])
  64. } else if field == "buyerclass" {
  65. if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" {
  66. esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0]
  67. } else {
  68. esMap["buyerclass"] = tmp["buyerclass"]
  69. }
  70. } else {
  71. esMap[field] = tmp[field]
  72. }
  73. }
  74. // 处理result
  75. if mp, ok := tmp["results"].([]interface{}); ok {
  76. var mpArr []map[string]interface{}
  77. for _, v := range mp {
  78. v1 := v.(map[string]interface{})
  79. tmp := make(map[string]interface{})
  80. if v1["purchasing"] != nil {
  81. tmp["purchasing"] = v1["purchasing"]
  82. }
  83. if v1["p_projects"] != nil {
  84. tmp["p_projects"] = v1["p_projects"]
  85. }
  86. mpArr = append(mpArr, tmp)
  87. }
  88. if len(mpArr) > 0 {
  89. esMap["results"] = mpArr
  90. }
  91. }
  92. EsSaveCache <- esMap
  93. }(tmp)
  94. tmp = make(map[string]interface{})
  95. }
  96. wg.Wait()
  97. log.Println("Run Over...Count:", count)
  98. }
  99. // SaveAll 存量数据生es
  100. func SaveAll() {
  101. defer util.Catch()
  102. sess := Mgo.GetMgoConn()
  103. defer Mgo.DestoryMongoConn(sess)
  104. pool := make(chan bool, 10)
  105. wg := &sync.WaitGroup{}
  106. //q := bson.M{"_id": mongodb.StringTOBsonId("6227a7b18f0c45f21aabe120")}
  107. it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
  108. count := 0
  109. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  110. if count%20000 == 0 {
  111. log.Println("current:", count, tmp["_id"])
  112. }
  113. pool <- true
  114. wg.Add(1)
  115. go func(tmp map[string]interface{}) {
  116. defer func() {
  117. <-pool
  118. wg.Done()
  119. }()
  120. esMap := map[string]interface{}{}
  121. //生索引字段处理
  122. for _, field := range EsFields {
  123. if tmp[field] == nil {
  124. continue
  125. }
  126. if field == "_id" {
  127. esMap["_id"] = mongodb.BsonIdToSId(tmp["_id"])
  128. esMap["id"] = mongodb.BsonIdToSId(tmp["_id"])
  129. } else if field == "buyerclass" {
  130. if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" {
  131. esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0]
  132. } else {
  133. esMap["buyerclass"] = tmp["buyerclass"]
  134. }
  135. } else {
  136. esMap[field] = tmp[field]
  137. }
  138. }
  139. // 处理result
  140. if mp, ok := tmp["results"].([]interface{}); ok {
  141. var mpArr []map[string]interface{}
  142. for _, v := range mp {
  143. v1 := v.(map[string]interface{})
  144. tmp := make(map[string]interface{})
  145. if v1["purchasing"] != nil {
  146. tmp["purchasing"] = v1["purchasing"]
  147. }
  148. if v1["p_projects"] != nil {
  149. tmp["p_projects"] = v1["p_projects"]
  150. }
  151. mpArr = append(mpArr, tmp)
  152. }
  153. if len(mpArr) > 0 {
  154. esMap["results"] = mpArr
  155. }
  156. }
  157. EsSaveCache <- esMap
  158. }(tmp)
  159. tmp = make(map[string]interface{})
  160. }
  161. wg.Wait()
  162. log.Println("Run Over...Count:", count)
  163. }
  164. // SaveEs 过滤后数据存库
  165. func SaveEs() {
  166. log.Println("Es Save...")
  167. arru := make([]map[string]interface{}, 100)
  168. indexu := 0
  169. for {
  170. select {
  171. case v := <-EsSaveCache:
  172. arru[indexu] = v
  173. indexu++
  174. if indexu == 100 {
  175. SP <- true
  176. go func(arru []map[string]interface{}) {
  177. defer func() {
  178. <-SP
  179. }()
  180. Es1.BulkSave(Index, arru)
  181. }(arru)
  182. arru = make([]map[string]interface{}, 100)
  183. indexu = 0
  184. }
  185. case <-time.After(1000 * time.Millisecond):
  186. if indexu > 0 {
  187. SP <- true
  188. go func(arru []map[string]interface{}) {
  189. defer func() {
  190. <-SP
  191. }()
  192. Es1.BulkSave(Index, arru)
  193. }(arru[:indexu])
  194. arru = make([]map[string]interface{}, 100)
  195. indexu = 0
  196. }
  197. }
  198. }
  199. }