123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- package main
import (
	"flag"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"

	"github.com/robfig/cron"
	"go.mongodb.org/mongo-driver/bson"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)
- var (
- MgoBid *mongodb.MongodbSim
- lastId, startId string
- mail_to = "wangjianghan@topnet.net.cn,zhangjinkun@topnet.net.cn,maxiaoshan@topnet.net.cn,zhengkun@topnet.net.cn"
- mail_api = "http://172.17.145.179:19281/_send/_mail"
- skip = 0
- MgoBulkSize = 200
- )
- func init() {
- MgoBid = &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
- DbName: "qfw",
- Size: 5,
- UserName: "SJZY_RWESBid_Other",
- Password: "SJZY@O17t8herB3B",
- }
- MgoBid.InitPool()
- log.InitLog()
- }
- func main() {
- flag.StringVar(&startId, "gtid", "", "开始id")
- flag.Parse()
- if startId != "" {
- TimeTask()
- } else {
- flag.PrintDefaults()
- }
- }
- func TimeTask() {
- crn := cron.New()
- //cronstr := "0 */1 * * * ?" // 每2min执行一次
- cronstr := "*/20 * * * * ?" // 每20s执行一次
- ct := 0
- _ = crn.AddFunc(cronstr, func() {
- ct += 1
- util.Debug(fmt.Sprintf("task count: %d", ct))
- taskinfo()
- })
- crn.Start()
- c := make(chan bool, 1)
- <-c
- }
- func taskinfo() {
- //currentTime := time.Now()
- //m, _ := time.ParseDuration("5m") // 5分钟之前
- //rtime := currentTime.Add(m)
- //eid := primitive.NewObjectIDFromTimestamp(rtime)
- //qfid := bson.M{"_id": bson.M{"$lte": eid}}
- info, _ := MgoBid.Find("bidding", nil, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1)
- if len(*info) > 0 && lastId != mongodb.BsonIdToSId((*info)[0]["_id"]) {
- lastId = mongodb.BsonIdToSId((*info)[0]["_id"])
- } else {
- if skip >= 30 {
- sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", lastId))
- skip = 0
- } else {
- skip++
- }
- log.Debug(fmt.Sprintf("skip: %d, lastid: %s", skip, lastId))
- return
- }
- skip = 0
- info1, _ := MgoBid.Find("bidding_processing_ids", `{"dataprocess": 0}`, `{"_id": -1}`, nil, false, -1, 2)
- if len(*info1) > 1 {
- startId = util.ObjToString((*info1)[0]["gtid"])
- q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
- count := MgoBid.Count("bidding", q)
- if count > 10000 {
- startId = util.ObjToString((*info1)[0]["lteid"])
- q = bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
- util.Debug(q)
- count = MgoBid.Count("bidding", q)
- save := make(map[string]interface{})
- save["gtid"] = startId
- save["lteid"] = lastId
- save["count"] = count
- now := time.Now().Unix()
- save["dataprocess"] = 0
- save["createtime"] = now
- save["updatetime"] = now
- saveFuc(save, util.Int64All((*info1)[0]["last_autoid"]))
- startId = lastId
- } else {
- update := make(map[string]interface{})
- update["lteid"] = lastId
- startId = util.ObjToString((*info1)[0]["gtid"])
- update["count"] = count
- update["updatetime"] = time.Now().Unix()
- updateFuc((*info1)[0]["_id"], update, util.Int64All((*info1)[0]["last_autoid"]), util.ObjToString((*info1)[0]["lteid"]))
- startId = lastId
- }
- } else {
- q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
- count := MgoBid.Count("bidding", q)
- info2, _ := MgoBid.Find("bidding_processing_ids", nil, `{"_id": -1}`, nil, true, -1, 1)
- save := make(map[string]interface{})
- save["gtid"] = startId
- save["lteid"] = lastId
- save["count"] = count
- now := time.Now().Unix()
- save["dataprocess"] = 0
- save["createtime"] = now
- save["updatetime"] = now
- saveFuc(save, util.Int64All((*info2)[0]["last_autoid"]))
- startId = lastId
- }
- }
- func saveFuc(save map[string]interface{}, autoid int64) {
- conn := MgoBid.GetMgoConn()
- defer MgoBid.DestoryMongoConn(conn)
- gtid := util.ObjToString(save["gtid"])
- lteid := util.ObjToString(save["lteid"])
- q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}}
- result := conn.DB(MgoBid.DbName).C("bidding").Find(q).Select(bson.M{"_id": 1}).Iter()
- var bidUpdate [][]map[string]interface{}
- for tmp := make(map[string]interface{}); result.Next(tmp); {
- autoid++
- bidUpdate = append(bidUpdate, []map[string]interface{}{{
- "_id": tmp["_id"],
- },
- {"$set": bson.M{"autoid": autoid}},
- })
- if len(bidUpdate) >= MgoBulkSize {
- tmps := bidUpdate
- MgoBid.UpdateBulk("bidding", tmps...)
- bidUpdate = [][]map[string]interface{}{}
- }
- }
- if len(bidUpdate) > 0 {
- tmps := bidUpdate
- MgoBid.UpdateBulk("bidding", tmps...)
- bidUpdate = [][]map[string]interface{}{}
- }
- save["last_autoid"] = autoid
- MgoBid.Save("bidding_processing_ids", save)
- }
- func updateFuc(id interface{}, update map[string]interface{}, autoid int64, gtid string) {
- conn := MgoBid.GetMgoConn()
- defer MgoBid.DestoryMongoConn(conn)
- lteid := util.ObjToString(update["lteid"])
- q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}}
- result := conn.DB(MgoBid.DbName).C("bidding").Find(q).Select(bson.M{"_id": 1}).Iter()
- var bidUpdate [][]map[string]interface{}
- for tmp := make(map[string]interface{}); result.Next(tmp); {
- autoid++
- bidUpdate = append(bidUpdate, []map[string]interface{}{{
- "_id": tmp["_id"],
- },
- {"$set": bson.M{"autoid": autoid}},
- })
- if len(bidUpdate) >= MgoBulkSize {
- tmps := bidUpdate
- MgoBid.UpdateBulk("bidding", tmps...)
- bidUpdate = [][]map[string]interface{}{}
- }
- }
- if len(bidUpdate) > 0 {
- tmps := bidUpdate
- MgoBid.UpdateBulk("bidding", tmps...)
- bidUpdate = [][]map[string]interface{}{}
- }
- update["last_autoid"] = autoid
- MgoBid.UpdateById("bidding_processing_ids", id, map[string]interface{}{"$set": update})
- }
- func sendMail(content string) {
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", mail_api, mail_to, "processing_ids-send-fail", content))
- if err == nil {
- defer res.Body.Close()
- read, _ := io.ReadAll(res.Body)
- util.Debug("send mail ..." + string(read))
- }
- }
|