main.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. "net/http"
  6. "qfw/util"
  7. "strings"
  8. "github.com/fvbock/endless"
  9. "github.com/robfig/cron"
  10. )
  // T describes one scheduled copy task as decoded from the "task" list of the
  // configuration.
  type T struct {
  	// From and To are "|"-separated locators. elt passes the first two
  	// segments of each to NewMgo and uses the third as the collection name.
  	// From may carry a fourth segment ("D", or a value starting with "U")
  	// that selects what happens to source documents after they are copied.
  	From, To string
  	// Thread is the capacity of the channel that bounds concurrent bulk
  	// saves; Bulk is the number of documents collected per batch.
  	Thread, Bulk int
  	// SelectFields and Query are converted with util.ObjToMap and used as the
  	// projection and the filter of the source query. Cron is the schedule
  	// spec passed to the cron scheduler's AddFunc. Name appears in log output.
  	SelectFields, Query, Cron, Name string
  	// Run is true while elt is executing for this task; elt returns
  	// immediately when it finds the flag already set.
  	Run bool
  }

  // Package-level state filled in by init.
  var (
  	// config is the raw configuration populated through util.ReadConfig.
  	config map[string]interface{}
  	// Ts holds every task decoded from config["task"].
  	Ts = make([]*T, 0)
  )
  25. func main() {
  26. log.Println("start..")
  27. if len(Ts) > 0 {
  28. go task()
  29. }
  30. endless.ListenAndServe(config["port"].(string), http.DefaultServeMux)
  31. }
  32. func init() {
  33. util.ReadConfig(&config)
  34. task := config["task"].([]interface{})
  35. for _, v := range task {
  36. tObj := &T{}
  37. bs, _ := json.Marshal(v)
  38. json.Unmarshal(bs, &tObj)
  39. Ts = append(Ts, tObj)
  40. }
  41. }
  42. //启动定时任务
  43. func task() {
  44. log.Println("启动定时任务")
  45. c := cron.New()
  46. for _, v := range Ts {
  47. c.AddFunc(v.Cron, v.elt)
  48. }
  49. c.Start()
  50. defer c.Stop()
  51. select {}
  52. }
  53. //对象的执行任务
  54. func (t *T) elt() {
  55. if t.Run {
  56. return
  57. }
  58. t.Run = true
  59. defer func() {
  60. t.Run = false
  61. }()
  62. log.Println("start..", t.Name)
  63. fs := strings.Split(t.From, "|")
  64. ts := strings.Split(t.To, "|")
  65. fmgo := NewMgo(fs[0], fs[1], 2)
  66. tmgo := NewMgo(ts[0], ts[1], 2)
  67. defer fmgo.Destroy()
  68. defer tmgo.Destroy()
  69. mode := 0
  70. update := "_etl"
  71. if len(fs) == 4 {
  72. //删除或更新模式
  73. du := fs[3]
  74. if du == "D" {
  75. //删除模式
  76. mode = 2
  77. } else if string(du[0]) == "U" {
  78. //更新模式
  79. mode = 1
  80. if len(du) > 2 {
  81. update = du[2:]
  82. }
  83. }
  84. }
  85. log.Println(mode, update)
  86. sess := fmgo.GetMgoConn()
  87. defer fmgo.DestoryMongoConn(sess)
  88. q := util.ObjToMap(t.Query)
  89. select1 := util.ObjToMap(t.SelectFields)
  90. log.Println(t.Name, q, select1)
  91. query := sess.DB(fmgo.DbName).C(fs[2]).Find(q).Select(select1).Sort("_id").Iter()
  92. saves := make([]map[string]interface{}, t.Bulk)
  93. index := 0
  94. pool := make(chan bool, t.Thread)
  95. count := 0
  96. for {
  97. tmp := make(map[string]interface{})
  98. if query.Next(&tmp) {
  99. saves[index] = tmp
  100. if index == t.Bulk-1 {
  101. pool <- true
  102. go func(s []map[string]interface{}) {
  103. defer func() {
  104. <-pool
  105. }()
  106. bsave := tmgo.SaveBulk(ts[2], s...)
  107. if !bsave {
  108. log.Println("error:保存出错", t.Name)
  109. } else {
  110. updateOrDelete(fmgo, fs[2], update, mode, len(s), s[0]["_id"], s[len(s)-1]["_id"])
  111. }
  112. }(saves)
  113. saves = make([]map[string]interface{}, t.Bulk)
  114. index = 0
  115. } else {
  116. index++
  117. }
  118. if count%100 == 0 {
  119. log.Println("current", count)
  120. }
  121. } else {
  122. break
  123. }
  124. count++
  125. }
  126. if index > 0 {
  127. bsave := tmgo.SaveBulk(ts[2], saves[:index]...)
  128. if !bsave {
  129. log.Println("error:保存出错", t.Name)
  130. } else {
  131. updateOrDelete(fmgo, fs[2], update, mode, index, saves[0]["_id"], saves[index-1]["_id"])
  132. }
  133. }
  134. for i := 0; i < t.Thread; i++ {
  135. pool <- true
  136. }
  137. log.Println("end task..", t.Name, count)
  138. }
  139. func updateOrDelete(mgo *MongodbSim, coll, update string, mode, count int, startid, endid interface{}) {
  140. if mode == 2 {
  141. c := mgo.Delete(coll, map[string]interface{}{
  142. "_id": map[string]interface{}{
  143. "$gte": startid,
  144. "$lte": endid,
  145. },
  146. })
  147. if int(c) != count {
  148. log.Println("删除数据量不对", count, "删除量:", c)
  149. }
  150. } else if mode == 1 {
  151. c := mgo.Update(coll, map[string]interface{}{
  152. "_id": map[string]interface{}{
  153. "$gte": startid,
  154. "$lte": endid,
  155. },
  156. }, map[string]interface{}{
  157. "$set": map[string]interface{}{
  158. update: 1,
  159. },
  160. })
  161. if int(c) != count {
  162. log.Println("更新数据量不对", count, "更新量:", c)
  163. }
  164. }
  165. }