package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/robfig/cron/v3"
	"go.uber.org/zap"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// MongoDB handles. dealBidding reads through MgoB; MgoP is not used in this
// part of the file.
var (
	MgoB *mongodb.MongodbSim
	MgoP *mongodb.MongodbSim
)

// main registers the scheduled job under the GF.Cron.Spec expression (parsed
// with a leading seconds field), starts the scheduler and then blocks forever.
func main() {
	local, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// LoadLocation returns a nil *Location on failure (for example when no
		// tz database is available), and the scheduler would panic on it once
		// started. Asia/Shanghai is a constant UTC+8 offset with no DST in
		// current use, so a fixed zone is an equivalent fallback.
		log.Info("main", zap.Any("LoadLocation err", err))
		local = time.FixedZone("CST", 8*60*60)
	}

	c := cron.New(cron.WithLocation(local), cron.WithSeconds())
	eid, err := c.AddFunc(GF.Cron.Spec, test)
	if err != nil {
		// Without a registered job the scheduler would idle forever and do
		// nothing, so stop here instead of starting it.
		log.Info("main", zap.Any("AddFunc err", err))
		return
	}
	log.Info("main", zap.Any("eid", eid))

	c.Start()
	defer c.Stop()

	// Block the main goroutine so the scheduler keeps running.
	select {}
}

// test is the job currently registered with the scheduler; it only prints a
// marker line.
func test() {
	fmt.Println("aaa")
}

// dealBidding processes bidding records whose "comeintime" falls inside the
// configured day window, and sends an alert mail when none are found.
//
// The window is (midnight of today + GF.Cron.Start days, midnight of today +
// GF.Cron.End days], compared as Unix seconds.
func dealBidding() {
	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	// Current time; both window bounds are derived from today's date.
	now := time.Now()
	targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 0, 0, 0, 0, now.Location())
	todayTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.End, 0, 0, 0, 0, now.Location())

	q := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gt":  targetTime.Unix(),
			"$lte": todayTime.Unix(),
		},
	}
	log.Info("dealBidding", zap.Any("q", q))

	// The projection leaves "contenthtml" out of the returned documents.
	query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{
		"contenthtml": 0,
	}).Iter()

	count := 0
	ch := make(chan bool, 10) // caps the number of in-flight workers at 10
	wg := &sync.WaitGroup{}
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%1000 == 0 {
			log.Info("dealBidding", zap.Int("current", count))
		}
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			//saveBidding(tmp)
		}(tmp)
		// Hand the next document a fresh map so the worker keeps its own copy.
		tmp = map[string]interface{}{}
	}
	wg.Wait()

	// Close the iterator and surface any error met while iterating; otherwise
	// a failed query cannot be told apart from a genuinely empty result.
	if err := query.Close(); err != nil {
		log.Info("dealBidding", zap.Any("iter err", err))
	}
	log.Info("dealBidding", zap.Int("over ", count))

	// Send an alert mail when no data was found.
	if count == 0 {
		SendMail("每日数据监控", "查询数据为空,请处理")
	}
}