123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package main
- import (
- "encoding/json"
- "log"
- "net/http"
- "qfw/util"
- "strings"
- "github.com/fvbock/endless"
- "github.com/robfig/cron"
- )
// T describes one scheduled copy task. Its fields are filled from one entry
// of the "task" array in the configuration.
type T struct {
	// From is a "|"-separated source description. Parts 0 and 1 are handed
	// to NewMgo (presumably server address and database name; confirm
	// against NewMgo), part 2 is the source collection. An optional part 3
	// selects what happens to copied documents: "D" deletes them, a value
	// starting with "U" sets a marker field on them (default "_etl", or the
	// text after the first two characters of that part).
	From string
	// To is a "|"-separated destination description: parts 0 and 1 are
	// handed to NewMgo, part 2 is the destination collection.
	To string
	// Thread is the capacity of the worker pool, i.e. the maximum number of
	// batches saved concurrently.
	Thread int
	// Bulk is the number of documents collected into one batch.
	Bulk int
	// SelectFields is converted with util.ObjToMap and used as the field
	// selection of the source query.
	SelectFields string
	// Query is converted with util.ObjToMap and used as the filter of the
	// source query.
	Query string
	// Cron is the schedule spec passed to the cron scheduler.
	Cron string
	// Name identifies the task in log output.
	Name string
	// Run is true while the task is executing; it is used to skip a
	// trigger that fires during an unfinished run.
	Run bool
}

var (
	// config holds the settings loaded by util.ReadConfig; this file reads
	// its "port" and "task" keys.
	config map[string]interface{}

	// Ts is the list of tasks built from the configuration.
	Ts = []*T{}
)
- func main() {
- log.Println("start..")
- if len(Ts) > 0 {
- go task()
- }
- endless.ListenAndServe(config["port"].(string), http.DefaultServeMux)
- }
- func init() {
- util.ReadConfig(&config)
- task := config["task"].([]interface{})
- for _, v := range task {
- tObj := &T{}
- bs, _ := json.Marshal(v)
- json.Unmarshal(bs, &tObj)
- Ts = append(Ts, tObj)
- }
- }
- //启动定时任务
- func task() {
- log.Println("启动定时任务")
- c := cron.New()
- for _, v := range Ts {
- c.AddFunc(v.Cron, v.elt)
- }
- c.Start()
- defer c.Stop()
- select {}
- }
- //对象的执行任务
- func (t *T) elt() {
- if t.Run {
- return
- }
- t.Run = true
- defer func() {
- t.Run = false
- }()
- log.Println("start..", t.Name)
- fs := strings.Split(t.From, "|")
- ts := strings.Split(t.To, "|")
- fmgo := NewMgo(fs[0], fs[1], 2)
- tmgo := NewMgo(ts[0], ts[1], 2)
- defer fmgo.Destroy()
- defer tmgo.Destroy()
- mode := 0
- update := "_etl"
- if len(fs) == 4 {
- //删除或更新模式
- du := fs[3]
- if du == "D" {
- //删除模式
- mode = 2
- } else if string(du[0]) == "U" {
- //更新模式
- mode = 1
- if len(du) > 2 {
- update = du[2:]
- }
- }
- }
- log.Println(mode, update)
- sess := fmgo.GetMgoConn()
- defer fmgo.DestoryMongoConn(sess)
- q := util.ObjToMap(t.Query)
- select1 := util.ObjToMap(t.SelectFields)
- log.Println(t.Name, q, select1)
- query := sess.DB(fmgo.DbName).C(fs[2]).Find(q).Select(select1).Sort("_id").Iter()
- saves := make([]map[string]interface{}, t.Bulk)
- index := 0
- pool := make(chan bool, t.Thread)
- count := 0
- for {
- tmp := make(map[string]interface{})
- if query.Next(&tmp) {
- saves[index] = tmp
- if index == t.Bulk-1 {
- pool <- true
- go func(s []map[string]interface{}) {
- defer func() {
- <-pool
- }()
- bsave := tmgo.SaveBulk(ts[2], s...)
- if !bsave {
- log.Println("error:保存出错", t.Name)
- } else {
- updateOrDelete(fmgo, fs[2], update, mode, len(s), s[0]["_id"], s[len(s)-1]["_id"])
- }
- }(saves)
- saves = make([]map[string]interface{}, t.Bulk)
- index = 0
- } else {
- index++
- }
- if count%100 == 0 {
- log.Println("current", count)
- }
- } else {
- break
- }
- count++
- }
- if index > 0 {
- bsave := tmgo.SaveBulk(ts[2], saves[:index]...)
- if !bsave {
- log.Println("error:保存出错", t.Name)
- } else {
- updateOrDelete(fmgo, fs[2], update, mode, index, saves[0]["_id"], saves[index-1]["_id"])
- }
- }
- for i := 0; i < t.Thread; i++ {
- pool <- true
- }
- log.Println("end task..", t.Name, count)
- }
- func updateOrDelete(mgo *MongodbSim, coll, update string, mode, count int, startid, endid interface{}) {
- if mode == 2 {
- c := mgo.Delete(coll, map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": startid,
- "$lte": endid,
- },
- })
- if int(c) != count {
- log.Println("删除数据量不对", count, "删除量:", c)
- }
- } else if mode == 1 {
- c := mgo.Update(coll, map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": startid,
- "$lte": endid,
- },
- }, map[string]interface{}{
- "$set": map[string]interface{}{
- update: 1,
- },
- })
- if int(c) != count {
- log.Println("更新数据量不对", count, "更新量:", c)
- }
- }
- }
|