package main import ( "log" "strings" "time" "mongodb" common "qfw/util" "github.com/robfig/cron" ) var ( Mgo *mongodb.MongodbSim BzMgo *mongodb.MongodbSim cfg = new(Config) ) func init() { common.ReadConfig(&cfg) Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize) BzMgo = mongodb.NewMgo(cfg.Bz.Address, cfg.Bz.DbName, cfg.Bz.DbSize) } func runJob() { log.Println("中国联通数据迁移任务开始------") log.Println("Cfg: ", cfg) query, count, session := map[string]interface{}{"appid": "jyGQ1XQQsEAwNeSENOFR9D"}, 0, Mgo.GetMgoConn() defer func() { Mgo.DestoryMongoConn(session) }() iter := session.DB(cfg.Db.DbName).C(cfg.Db.ColName).Find(&query).Sort("_id").Iter() thisData := map[string]interface{}{} for { if !iter.Next(&thisData) { break } count++ log.Println("第", count, "条") id := mongodb.BsonIdToSId(thisData["_id"]) area := common.ObjToString(thisData["area"]) buyer := common.ObjToString(thisData["buyer"]) // s_winner := common.ObjToString(thisData["s_winner"]) toptype := common.ObjToString(thisData["toptype"]) thisData["createtime"] = time.Now().Unix() if toptype == "招标" || toptype == "预告" { if area != "全国" && area != "" && buyer != "" && !strings.Contains(buyer, "本级") && !strings.Contains(buyer, "本部") && !strings.Contains(buyer, "机关") { saveId := Mgo.Save("usermail", thisData) if saveId != "" { log.Println("数据保存usermail成功", id, saveId) delC := Mgo.Delete(cfg.Db.ColName, map[string]interface{}{"_id": mongodb.StringTOBsonId(id)}) if delC > 0 { log.Println("数据从临时表删除成功", delC, id, saveId) } else { log.Println("数据从临时表删除失败!!!!", id, saveId) } } else { log.Println("数据保存usermail失败!!!!", id) } } else { baseInfoMap := map[string]interface{}{} baseInfoMap["id"] = id baseInfoMap["v_baseinfo"] = thisData baseInfoMap["b_isprchasing"] = true baseInfoMap["b_istagging"] = true baseInfoMap["i_createtime"] = time.Now().Unix() baseInfoMap["b_isgivegroup"] = false //是否分配给用户组 baseInfoMap["b_istag"] = false //是否已标注 baseInfoMap["b_isgiveuser"] = false //是否分配给用户 baseInfoMap["b_check"] = false // 质检标记 baseInfoMap["b_isEff"] = false // 标的物有效性 saveId := BzMgo.Save(cfg.Bz.ColName, baseInfoMap) if saveId != "" { log.Println("数据保存数据标注表成功", id, saveId) //清洗同时也先存usermail推送 Mgo.Save("usermail", thisData) // delC := Mgo.Delete(cfg.Db.ColName, map[string]interface{}{"_id": mongodb.StringTOBsonId(id)}) if delC > 0 { log.Println("数据从临时表删除成功", delC, id, saveId) } else { log.Println("数据从临时表删除失败!!!!", id, saveId) } } else { log.Println("数据保存数据标注表失败!!!!", id) } } } else { saveId := Mgo.Save("usermail", thisData) if saveId != "" { log.Println("数据保存usermail成功", id, saveId) delC := Mgo.Delete(cfg.Db.ColName, map[string]interface{}{"_id": mongodb.StringTOBsonId(id)}) if delC > 0 { log.Println("数据从临时表删除成功", delC, id, saveId) } else { log.Println("数据从临时表删除失败!!!!", id, saveId) } } else { log.Println("数据保存usermail失败!!!!", id) } } thisData = map[string]interface{}{} } log.Println("中国联通数据迁移任务结束------") } func main() { runJob() c := cron.New() c.AddFunc(cfg.CornExp, func() { runJob() }) c.Start() select {} }