123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- package main
- import (
- "flag"
- "fmt"
- "github.com/robfig/cron"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.uber.org/zap"
- "io/ioutil"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "net/http"
- "time"
- )
- var (
- MgoBid *mongodb.MongodbSim
- lastId, startId string
- mail_to = "zhangjinkun@topnet.net.cn,wangjianghan@topnet.net.cn,maxiaoshan@topnet.net.cn,zhengkun@topnet.net.cn"
- mail_api = "http://172.17.145.179:19281/_send/_mail"
- )
- func init() {
- MgoBid = &mongodb.MongodbSim{
- MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",
- DbName: "qfw",
- Size: 5,
- UserName: "SJZY_RWESBid_Other",
- Password: "SJZY@O17t8herB3B",
- }
- MgoBid.InitPool()
- }
- func main() {
- flag.StringVar(&startId, "gtid", "", "开始id")
- flag.Parse()
- if startId != "" {
- TimeTask()
- } else {
- flag.PrintDefaults()
- }
- }
- func TimeTask() {
- crn := cron.New()
- cronstr := "0 */5 * * * ?" // 每5min执行一次
- ct := 0
- _ = crn.AddFunc(cronstr, func() {
- ct += 1
- util.Debug(fmt.Sprintf("task count: %d", ct))
- taskinfo()
- })
- crn.Start()
- c := make(chan bool, 1)
- <-c
- }
- func taskinfo() {
- currentTime := time.Now()
- m, _ := time.ParseDuration("-5m") // 5分钟之前
- rtime := currentTime.Add(m)
- eid := primitive.NewObjectIDFromTimestamp(rtime)
- qfid := bson.M{"_id": bson.M{"$lte": eid}}
- info, _ := MgoBid.Find("bidding", qfid, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1)
- if info != nil {
- lastId = mongodb.BsonIdToSId((*info)[0]["_id"])
- } else {
- sendMail("bidding表id查询失败")
- return
- }
- info1, _ := MgoBid.Find("bidding_processing_ids", `{"dataprocess": 0}`, `{"_id": -1}`, nil, false, -1, 2)
- if len(*info1) > 1 {
- startId = util.ObjToString((*info1)[0]["gtid"])
- q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
- count := MgoBid.Count("bidding", q)
- ids := fmt.Sprintf("%s-%s", startId, lastId)
- if count <= 0 {
- sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids))
- return
- }
- if count > 10000 {
- startId = util.ObjToString((*info1)[0]["lteid"])
- q = bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
- util.Debug(q)
- count = MgoBid.Count("bidding", q)
- save := make(map[string]interface{})
- save["gtid"] = startId
- save["lteid"] = lastId
- save["count"] = count
- now := time.Now().Unix()
- save["dataprocess"] = 0
- save["createtime"] = now
- save["updatetime"] = now
- MgoBid.Save("bidding_processing_ids", save)
- startId = lastId
- } else {
- update := make(map[string]interface{})
- update["lteid"] = lastId
- startId = util.ObjToString((*info1)[0]["gtid"])
- update["count"] = count
- update["updatetime"] = time.Now().Unix()
- MgoBid.UpdateById("bidding_processing_ids", (*info1)[0]["_id"], map[string]interface{}{"$set": update})
- startId = lastId
- }
- } else {
- q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
- count := MgoBid.Count("bidding", q)
- ids := fmt.Sprintf("%s-%s", startId, lastId)
- if count <= 0 {
- sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids))
- return
- }
- save := make(map[string]interface{})
- save["gtid"] = startId
- save["lteid"] = lastId
- save["count"] = count
- now := time.Now().Unix()
- save["dataprocess"] = 0
- save["createtime"] = now
- save["updatetime"] = now
- MgoBid.Save("bidding_processing_ids", save)
- startId = lastId
- }
- }
- func sendMail(content string) {
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", mail_api, mail_to, "processing_ids-send-fail", content))
- if err == nil {
- defer res.Body.Close()
- read, err := ioutil.ReadAll(res.Body)
- log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
- }
- }
|