// Command main periodically inspects the "bidding" collection, assigns a
// monotonically increasing "autoid" to newly arrived documents and records the
// processed _id ranges in the "bidding_processing_ids" collection.
package main

import (
	"flag"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"

	"github.com/robfig/cron"
	"go.mongodb.org/mongo-driver/bson"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// MgoBid is the shared MongoDB handle used by every task in this file.
	MgoBid *mongodb.MongodbSim

	// lastId is the newest bidding _id seen by the previous run of taskinfo;
	// startId is the exclusive lower bound of the next segment to record.
	lastId, startId string

	// mail_to and mail_api configure the alert mail sent by sendMail.
	mail_to  = "wangjianghan@topnet.net.cn,zhangjinkun@topnet.net.cn,maxiaoshan@topnet.net.cn,zhengkun@topnet.net.cn"
	mail_api = "http://172.17.145.179:19281/_send/_mail"

	// skip counts consecutive runs in which no new bidding document was found.
	skip = 0

	// MgoBulkSize is the number of updates buffered before a bulk write.
	MgoBulkSize = 200

	// mailClient bounds the time spent on the alert request so a hung mail
	// service cannot stall a scheduled run indefinitely.
	mailClient = &http.Client{Timeout: 10 * time.Second}
)

// init connects to MongoDB and initialises logging.
//
// NOTE(review): the database credentials are hard-coded below; consider moving
// them to configuration or the environment.
func init() {
	MgoBid = &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
		DbName:      "qfw",
		Size:        5,
		UserName:    "SJZY_RWESBid_Other",
		Password:    "SJZY@O17t8herB3B",
	}
	MgoBid.InitPool()
	log.InitLog()
}

// main requires the -gtid flag (the starting _id); without it only the usage
// text is printed.
func main() {
	flag.StringVar(&startId, "gtid", "", "开始id")
	flag.Parse()
	if startId == "" {
		flag.PrintDefaults()
		return
	}
	TimeTask()
}

// TimeTask schedules taskinfo every 20 seconds and then blocks forever.
// If the job cannot be registered the error is logged and TimeTask returns.
//
// NOTE(review): taskinfo mutates package-level state without synchronisation;
// confirm that the scheduler never runs two invocations concurrently.
func TimeTask() {
	crn := cron.New()
	// Alternative schedule: "0 */1 * * * ?" (once per minute).
	cronstr := "*/20 * * * * ?" // every 20 seconds
	ct := 0
	err := crn.AddFunc(cronstr, func() {
		ct++
		util.Debug(fmt.Sprintf("task count: %d", ct))
		taskinfo()
	})
	if err != nil {
		log.Debug(fmt.Sprintf("add cron task failed: %v", err))
		return
	}
	crn.Start()
	select {} // keep the process alive for the scheduler
}

// idRange builds the filter {_id: {$gt: gtid, $lte: lteid}}.
func idRange(gtid, lteid string) bson.M {
	return bson.M{"_id": bson.M{
		"$gt":  mongodb.StringTOBsonId(gtid),
		"$lte": mongodb.StringTOBsonId(lteid),
	}}
}

// newSegment builds an unprocessed (dataprocess = 0) segment record covering
// (gtid, lteid] with the given document count.
func newSegment(gtid, lteid string, count interface{}) map[string]interface{} {
	now := time.Now().Unix()
	return map[string]interface{}{
		"gtid":        gtid,
		"lteid":       lteid,
		"count":       count,
		"dataprocess": 0,
		"createtime":  now,
		"updatetime":  now,
	}
}

// taskinfo performs one scheduled run: it looks up the newest bidding _id and,
// when it has advanced since the previous run, either extends the latest
// unprocessed segment or records a new one.
func taskinfo() {
	// Previously considered (disabled): restrict the lookup to ids created at
	// least five minutes ago.
	//   currentTime := time.Now()
	//   m, _ := time.ParseDuration("5m")
	//   rtime := currentTime.Add(m)
	//   eid := primitive.NewObjectIDFromTimestamp(rtime)
	//   qfid := bson.M{"_id": bson.M{"$lte": eid}}
	info, _ := MgoBid.Find("bidding", nil, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1)

	found, newest := false, ""
	if info != nil && len(*info) > 0 {
		found = true
		newest = mongodb.BsonIdToSId((*info)[0]["_id"])
	}
	if !found || newest == lastId {
		// Nothing new: after 30 consecutive idle runs send an alert mail and
		// restart the counter.
		if skip >= 30 {
			sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", lastId))
			skip = 0
		} else {
			skip++
		}
		log.Debug(fmt.Sprintf("skip: %d, lastid: %s", skip, lastId))
		return
	}
	lastId = newest
	skip = 0

	info1, _ := MgoBid.Find("bidding_processing_ids", `{"dataprocess": 0}`, `{"_id": -1}`, nil, false, -1, 2)
	if info1 != nil && len(*info1) > 1 {
		// At least two unprocessed segments exist; work on the latest one.
		latest := (*info1)[0]
		baseAutoID := util.Int64All(latest["last_autoid"])
		prevLteid := util.ObjToString(latest["lteid"])

		startId = util.ObjToString(latest["gtid"])
		count := MgoBid.Count("bidding", idRange(startId, lastId))
		if count > 10000 {
			// Extending would make the segment too large: open a new segment
			// that starts where the latest one ended.
			startId = prevLteid
			q := idRange(startId, lastId)
			util.Debug(q)
			count = MgoBid.Count("bidding", q)
			saveFuc(newSegment(startId, lastId, count), baseAutoID)
		} else {
			// Extend the latest segment up to the newest id.
			update := map[string]interface{}{
				"lteid":      lastId,
				"count":      count,
				"updatetime": time.Now().Unix(),
			}
			updateFuc(latest["_id"], update, baseAutoID, prevLteid)
		}
		startId = lastId
		return
	}

	// Fewer than two unprocessed segments: record a new one from startId.
	count := MgoBid.Count("bidding", idRange(startId, lastId))
	info2, _ := MgoBid.Find("bidding_processing_ids", nil, `{"_id": -1}`, nil, true, -1, 1)
	// When no segment has been recorded yet there is no previous autoid to
	// continue from, so numbering starts from 0.
	var baseAutoID int64
	if info2 != nil && len(*info2) > 0 {
		baseAutoID = util.Int64All((*info2)[0]["last_autoid"])
	}
	saveFuc(newSegment(startId, lastId, count), baseAutoID)
	startId = lastId
}

// assignAutoIDs walks the bidding documents whose _id lies in (gtid, lteid],
// sets "autoid" on each to consecutive values starting at autoid+1 (flushing
// in batches of MgoBulkSize) and returns the last value assigned.
func assignAutoIDs(gtid, lteid string, autoid int64) int64 {
	conn := MgoBid.GetMgoConn()
	defer MgoBid.DestoryMongoConn(conn)

	result := conn.DB(MgoBid.DbName).C("bidding").Find(idRange(gtid, lteid)).Select(bson.M{"_id": 1}).Iter()

	var bidUpdate [][]map[string]interface{}
	for tmp := make(map[string]interface{}); result.Next(tmp); {
		autoid++
		bidUpdate = append(bidUpdate, []map[string]interface{}{
			{"_id": tmp["_id"]},
			{"$set": bson.M{"autoid": autoid}},
		})
		if len(bidUpdate) >= MgoBulkSize {
			MgoBid.UpdateBulk("bidding", bidUpdate...)
			// Start a fresh slice rather than reusing the one just handed to
			// UpdateBulk.
			bidUpdate = nil
		}
	}
	if len(bidUpdate) > 0 {
		MgoBid.UpdateBulk("bidding", bidUpdate...)
	}
	return autoid
}

// saveFuc assigns autoids to the bidding documents in (save["gtid"],
// save["lteid"]], continuing after autoid, then stores save as a new
// bidding_processing_ids record with "last_autoid" set to the last value used.
func saveFuc(save map[string]interface{}, autoid int64) {
	gtid := util.ObjToString(save["gtid"])
	lteid := util.ObjToString(save["lteid"])
	save["last_autoid"] = assignAutoIDs(gtid, lteid, autoid)
	MgoBid.Save("bidding_processing_ids", save)
}

// updateFuc assigns autoids to the bidding documents in (gtid,
// update["lteid"]], continuing after autoid, then applies update (plus the new
// "last_autoid") to the bidding_processing_ids record identified by id.
func updateFuc(id interface{}, update map[string]interface{}, autoid int64, gtid string) {
	lteid := util.ObjToString(update["lteid"])
	update["last_autoid"] = assignAutoIDs(gtid, lteid, autoid)
	MgoBid.UpdateById("bidding_processing_ids", id, map[string]interface{}{"$set": update})
}

// sendMail sends an alert with the given body through the mail HTTP API.
// Query values are escaped so that non-ASCII text and reserved characters in
// the recipients or body cannot corrupt the request. Failures are logged and
// otherwise ignored: alerting is best-effort.
func sendMail(content string) {
	endpoint := fmt.Sprintf("%s?to=%s&title=%s&body=%s",
		mail_api,
		url.QueryEscape(mail_to),
		url.QueryEscape("processing_ids-send-fail"),
		url.QueryEscape(content),
	)
	res, err := mailClient.Get(endpoint)
	if err != nil {
		log.Debug(fmt.Sprintf("send mail failed: %v", err))
		return
	}
	defer res.Body.Close()

	read, err := io.ReadAll(res.Body)
	if err != nil {
		log.Debug(fmt.Sprintf("read mail response failed: %v", err))
		return
	}
	util.Debug("send mail ..." + string(read))
}