task.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package main
  2. import (
  3. "github.com/cron"
  4. "go.mongodb.org/mongo-driver/bson"
  5. "log"
  6. "mongodb"
  7. "qfw/util"
  8. "reflect"
  9. "sync"
  10. "time"
  11. )
  12. //定时任务
  13. func TimeTask() {
  14. //go SaveAdd()
  15. c := cron.New()
  16. cronstr := "0 0 2 * * ?" //每天2点执行
  17. //cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
  18. err := c.AddFunc(cronstr, func() { SaveAdd() })
  19. if err != nil {
  20. util.Debug(err)
  21. return
  22. }
  23. c.Start()
  24. }
  25. // SaveAdd 增量数据
  26. func SaveAdd() {
  27. defer util.Catch()
  28. sess := Mgo.GetMgoConn()
  29. defer Mgo.DestoryMongoConn(sess)
  30. pool := make(chan bool, 5)
  31. wg := &sync.WaitGroup{}
  32. if UpdateId == "" {
  33. util.Debug("update id err...")
  34. return
  35. }
  36. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(UpdateId)}}
  37. util.Debug("q ---", q)
  38. it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
  39. count := 0
  40. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  41. if count%5000 == 0 {
  42. log.Println("current:", count)
  43. }
  44. if UpdateId < mongodb.BsonIdToSId(tmp["_id"]) {
  45. UpdateId = mongodb.BsonIdToSId(tmp["_id"])
  46. }
  47. pool <- true
  48. wg.Add(1)
  49. go func(tmp map[string]interface{}) {
  50. defer func() {
  51. <-pool
  52. wg.Done()
  53. }()
  54. esMap := map[string]interface{}{}
  55. //生索引字段处理
  56. for _, field := range EsFields {
  57. if tmp[field] == nil {
  58. continue
  59. }
  60. if field == "buyerclass" {
  61. if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" {
  62. esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0]
  63. } else {
  64. esMap["buyerclass"] = tmp["buyerclass"]
  65. }
  66. } else {
  67. esMap[field] = tmp[field]
  68. }
  69. }
  70. // 处理result
  71. if mp, ok := tmp["results"].([]interface{}); ok {
  72. var mpArr []map[string]interface{}
  73. for _, v := range mp {
  74. v1 := v.(map[string]interface{})
  75. tmp := make(map[string]interface{})
  76. if v1["purchasing"] != nil {
  77. tmp["purchasing"] = v1["purchasing"]
  78. }
  79. if v1["p_projects"] != nil {
  80. tmp["p_projects"] = v1["p_projects"]
  81. }
  82. mpArr = append(mpArr, tmp)
  83. }
  84. if len(mpArr) > 0 {
  85. esMap["results"] = mpArr
  86. }
  87. }
  88. EsSaveCache <- esMap
  89. }(tmp)
  90. tmp = make(map[string]interface{})
  91. }
  92. wg.Wait()
  93. log.Println("Run Over...Count:", count)
  94. }
  95. // SaveAll 存量数据生es
  96. func SaveAll() {
  97. defer util.Catch()
  98. sess := Mgo.GetMgoConn()
  99. defer Mgo.DestoryMongoConn(sess)
  100. pool := make(chan bool, 10)
  101. wg := &sync.WaitGroup{}
  102. //q := bson.M{"_id": mongodb.StringTOBsonId("6227a7b18f0c45f21aabe120")}
  103. it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
  104. count := 0
  105. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  106. if count%20000 == 0 {
  107. log.Println("current:", count, tmp["_id"])
  108. }
  109. pool <- true
  110. wg.Add(1)
  111. go func(tmp map[string]interface{}) {
  112. defer func() {
  113. <-pool
  114. wg.Done()
  115. }()
  116. esMap := map[string]interface{}{}
  117. //生索引字段处理
  118. for _, field := range EsFields {
  119. if tmp[field] == nil {
  120. continue
  121. }
  122. if field == "buyerclass" {
  123. if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" {
  124. esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0]
  125. } else {
  126. esMap["buyerclass"] = tmp["buyerclass"]
  127. }
  128. } else {
  129. esMap[field] = tmp[field]
  130. }
  131. }
  132. // 处理result
  133. if mp, ok := tmp["results"].([]interface{}); ok {
  134. var mpArr []map[string]interface{}
  135. for _, v := range mp {
  136. v1 := v.(map[string]interface{})
  137. tmp := make(map[string]interface{})
  138. if v1["purchasing"] != nil {
  139. tmp["purchasing"] = v1["purchasing"]
  140. }
  141. if v1["p_projects"] != nil {
  142. tmp["p_projects"] = v1["p_projects"]
  143. }
  144. mpArr = append(mpArr, tmp)
  145. }
  146. if len(mpArr) > 0 {
  147. esMap["results"] = mpArr
  148. }
  149. }
  150. EsSaveCache <- esMap
  151. }(tmp)
  152. tmp = make(map[string]interface{})
  153. }
  154. wg.Wait()
  155. log.Println("Run Over...Count:", count)
  156. }
  157. // SaveEs 过滤后数据存库
  158. func SaveEs() {
  159. log.Println("Es Save...")
  160. arru := make([]map[string]interface{}, 100)
  161. indexu := 0
  162. for {
  163. select {
  164. case v := <-EsSaveCache:
  165. arru[indexu] = v
  166. indexu++
  167. if indexu == 100 {
  168. SP <- true
  169. go func(arru []map[string]interface{}) {
  170. defer func() {
  171. <-SP
  172. }()
  173. Es.BulkSave(Index, Itype, &arru, true)
  174. }(arru)
  175. arru = make([]map[string]interface{}, 100)
  176. indexu = 0
  177. }
  178. case <-time.After(1000 * time.Millisecond):
  179. if indexu > 0 {
  180. SP <- true
  181. go func(arru []map[string]interface{}) {
  182. defer func() {
  183. <-SP
  184. }()
  185. Es.BulkSave(Index, Itype, &arru, true)
  186. }(arru[:indexu])
  187. arru = make([]map[string]interface{}, 100)
  188. indexu = 0
  189. }
  190. }
  191. }
  192. }