package main

import (
	"encoding/json"
	"log"
	"net/http"
	"qfw/util"
	"strings"

	"github.com/fvbock/endless"
	"github.com/robfig/cron"
)

// config holds the settings loaded by util.ReadConfig in init.
var config map[string]interface{}

// T describes one scheduled copy task.
type T struct {
	// From is a "|"-separated source descriptor. The first two segments are
	// handed to NewMgo, the third is the source collection name, and an
	// optional fourth segment selects the post-copy mode: "D" deletes the
	// copied documents, "U" marks them by setting a field to 1 (field "_etl"
	// by default, or the text from the third character of the segment onward
	// when the segment is longer than two characters).
	From string
	// To is a "|"-separated target descriptor: the first two segments are
	// handed to NewMgo and the third is the target collection name.
	To string
	// Thread is the maximum number of batches saved concurrently.
	Thread int
	// Bulk is the number of documents per batch.
	Bulk int
	// SelectFields is converted with util.ObjToMap and used as the projection.
	SelectFields string
	// Query is converted with util.ObjToMap and used as the find filter.
	Query string
	// Cron is the schedule spec passed to cron.AddFunc.
	Cron string
	// Name identifies the task in log output.
	Name string
	// Run is true while elt is executing, so overlapping triggers are skipped.
	Run bool
}

// Ts lists the tasks loaded from the configuration.
var Ts = []*T{}

// main starts the scheduler (when at least one task is configured) and then
// serves http.DefaultServeMux on the configured port.
func main() {
	log.Println("start..")
	if len(Ts) > 0 {
		go task()
	}
	port, ok := config["port"].(string)
	if !ok {
		log.Fatalln("config: \"port\" is missing or not a string")
	}
	// The error is only logged, not treated as fatal, so that whatever
	// ListenAndServe reports on shutdown keeps the original exit status.
	if err := endless.ListenAndServe(port, http.DefaultServeMux); err != nil {
		log.Println("server stopped:", err)
	}
}

// init loads the configuration and decodes every entry of its "task" list
// into Ts. Entries that cannot be decoded are logged and skipped.
func init() {
	util.ReadConfig(&config)
	tasks, ok := config["task"].([]interface{})
	if !ok {
		log.Println("config: no \"task\" list found, no scheduled tasks loaded")
		return
	}
	for i, v := range tasks {
		// Round-trip through JSON to map the generic config value onto T.
		bs, err := json.Marshal(v)
		if err != nil {
			log.Println("config: cannot encode task", i, err)
			continue
		}
		tObj := &T{}
		if err := json.Unmarshal(bs, tObj); err != nil {
			log.Println("config: cannot decode task", i, err)
			continue
		}
		Ts = append(Ts, tObj)
	}
}

// task registers every configured task with a cron scheduler, starts it and
// then blocks forever so the scheduler stays alive.
func task() {
	log.Println("启动定时任务")
	c := cron.New()
	for _, v := range Ts {
		c.AddFunc(v.Cron, v.elt)
	}
	c.Start()
	// Block forever; this function never returns.
	select {}
}

// elt runs the task once: it streams the documents matched by Query from the
// source collection in _id order, saves them to the target collection in
// batches of Bulk using at most Thread concurrent writers, and after each
// successful batch deletes or marks the copied documents according to the
// mode encoded in From.
func (t *T) elt() {
	// NOTE(review): this check-then-set is not synchronized; if the scheduler
	// can trigger the same task from two goroutines at once, both may pass.
	if t.Run {
		return
	}
	t.Run = true
	defer func() {
		t.Run = false
	}()
	log.Println("start..", t.Name)

	fs := strings.Split(t.From, "|")
	ts := strings.Split(t.To, "|")
	if len(fs) < 3 || len(ts) < 3 {
		log.Println("error: From/To need at least three \"|\"-separated segments", t.Name)
		return
	}

	// Non-positive sizes would panic in make or dead-lock on the pool.
	bulk := t.Bulk
	if bulk < 1 {
		bulk = 1
	}
	threads := t.Thread
	if threads < 1 {
		threads = 1
	}

	fmgo := NewMgo(fs[0], fs[1], 2)
	tmgo := NewMgo(ts[0], ts[1], 2)
	defer fmgo.Destroy()
	defer tmgo.Destroy()

	mode := 0 // 0: copy only, 1: mark as copied, 2: delete after copy
	update := "_etl"
	if len(fs) == 4 {
		du := fs[3]
		switch {
		case du == "D":
			mode = 2
		case strings.HasPrefix(du, "U"):
			mode = 1
			if len(du) > 2 {
				update = du[2:]
			}
		}
	}
	log.Println(mode, update)

	sess := fmgo.GetMgoConn()
	defer fmgo.DestoryMongoConn(sess)
	q := util.ObjToMap(t.Query)
	select1 := util.ObjToMap(t.SelectFields)
	log.Println(t.Name, q, select1)
	query := sess.DB(fmgo.DbName).C(fs[2]).Find(q).Select(select1).Sort("_id").Iter()

	// flush saves one batch and, on success, applies the post-copy action to
	// the _id range it covers.
	// NOTE(review): the range is bounded only by the first and last _id of the
	// batch, so documents inside it that Query did not select are affected too.
	flush := func(batch []map[string]interface{}) {
		if !tmgo.SaveBulk(ts[2], batch...) {
			log.Println("error:保存出错", t.Name)
			return
		}
		updateOrDelete(fmgo, fs[2], update, mode, len(batch), batch[0]["_id"], batch[len(batch)-1]["_id"])
	}

	saves := make([]map[string]interface{}, 0, bulk)
	pool := make(chan bool, threads) // one slot per in-flight batch
	count := 0
	for {
		tmp := make(map[string]interface{})
		if !query.Next(&tmp) {
			break
		}
		saves = append(saves, tmp)
		if len(saves) == bulk {
			pool <- true
			go func(s []map[string]interface{}) {
				defer func() {
					<-pool
				}()
				flush(s)
			}(saves)
			// The worker owns the old slice now; start a fresh one.
			saves = make([]map[string]interface{}, 0, bulk)
		}
		if count%100 == 0 {
			log.Println("current", count)
		}
		count++
	}
	// Next returning false means either end of data or a failure; Close
	// reports which, so a partial copy is not mistaken for a complete one.
	if err := query.Close(); err != nil {
		log.Println("error: reading source failed", t.Name, err)
	}

	if len(saves) > 0 {
		flush(saves)
	}
	// Filling every slot only succeeds once all workers have released theirs.
	for i := 0; i < cap(pool); i++ {
		pool <- true
	}
	log.Println("end task..", t.Name, count)
}

// updateOrDelete applies the post-copy action to every document of coll whose
// _id lies in [startid, endid]: mode 2 deletes them, mode 1 sets the field
// named by update to 1, any other mode does nothing. A warning is logged when
// the number of affected documents differs from count.
func updateOrDelete(mgo *MongodbSim, coll, update string, mode, count int, startid, endid interface{}) {
	if mode != 1 && mode != 2 {
		return
	}
	idRange := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gte": startid,
			"$lte": endid,
		},
	}
	switch mode {
	case 2:
		c := mgo.Delete(coll, idRange)
		if int(c) != count {
			log.Println("删除数据量不对", count, "删除量:", c)
		}
	case 1:
		c := mgo.Update(coll, idRange, map[string]interface{}{
			"$set": map[string]interface{}{
				update: 1,
			},
		})
		if int(c) != count {
			log.Println("更新数据量不对", count, "更新量:", c)
		}
	}
}