package main import ( "flag" "fmt" "github.com/robfig/cron" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" "io/ioutil" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "net/http" "time" ) var ( MgoBid *mongodb.MongodbSim lastId, startId string mail_to = "zhangjinkun@topnet.net.cn,wangjianghan@topnet.net.cn,maxiaoshan@topnet.net.cn,zhengkun@topnet.net.cn" mail_api = "http://172.17.145.179:19281/_send/_mail" ) func init() { MgoBid = &mongodb.MongodbSim{ MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083", DbName: "qfw", Size: 5, UserName: "SJZY_RWESBid_Other", Password: "SJZY@O17t8herB3B", } MgoBid.InitPool() } func main() { flag.StringVar(&startId, "gtid", "", "开始id") flag.Parse() if startId != "" { TimeTask() } else { flag.PrintDefaults() } } func TimeTask() { crn := cron.New() cronstr := "0 */5 * * * ?" // 每5min执行一次 ct := 0 _ = crn.AddFunc(cronstr, func() { ct += 1 util.Debug(fmt.Sprintf("task count: %d", ct)) taskinfo() }) crn.Start() c := make(chan bool, 1) <-c } func taskinfo() { currentTime := time.Now() m, _ := time.ParseDuration("-5m") // 5分钟之前 rtime := currentTime.Add(m) eid := primitive.NewObjectIDFromTimestamp(rtime) qfid := bson.M{"_id": bson.M{"$lte": eid}} info, _ := MgoBid.Find("bidding", qfid, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1) if info != nil { lastId = mongodb.BsonIdToSId((*info)[0]["_id"]) } else { sendMail("bidding表id查询失败") return } info1, _ := MgoBid.Find("bidding_processing_ids", `{"dataprocess": 0}`, `{"_id": -1}`, nil, false, -1, 2) if len(*info1) > 1 { startId = util.ObjToString((*info1)[0]["gtid"]) q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}} count := MgoBid.Count("bidding", q) ids := fmt.Sprintf("%s-%s", startId, lastId) if count <= 0 { sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids)) return } if count > 10000 { startId = util.ObjToString((*info1)[0]["lteid"]) q = bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}} util.Debug(q) count = MgoBid.Count("bidding", q) save := make(map[string]interface{}) save["gtid"] = startId save["lteid"] = lastId save["count"] = count now := time.Now().Unix() save["dataprocess"] = 0 save["createtime"] = now save["updatetime"] = now MgoBid.Save("bidding_processing_ids", save) startId = lastId } else { update := make(map[string]interface{}) update["lteid"] = lastId startId = util.ObjToString((*info1)[0]["gtid"]) update["count"] = count update["updatetime"] = time.Now().Unix() MgoBid.UpdateById("bidding_processing_ids", (*info1)[0]["_id"], map[string]interface{}{"$set": update}) startId = lastId } } else { q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}} count := MgoBid.Count("bidding", q) ids := fmt.Sprintf("%s-%s", startId, lastId) if count <= 0 { sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids)) return } save := make(map[string]interface{}) save["gtid"] = startId save["lteid"] = lastId save["count"] = count now := time.Now().Unix() save["dataprocess"] = 0 save["createtime"] = now save["updatetime"] = now MgoBid.Save("bidding_processing_ids", save) startId = lastId } } func sendMail(content string) { res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", mail_api, mail_to, "processing_ids-send-fail", content)) if err == nil { defer res.Body.Close() read, err := ioutil.ReadAll(res.Body) log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err)) } }