package main import ( "context" "github.com/robfig/cron/v3" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "sync" "time" ) var ( MgoB *mongodb.MongodbSim MgoP *mongodb.MongodbSim columns = make([]map[string]interface{}, 0) //存储配置 栏目 // 更新mongo updatePool = make(chan []map[string]interface{}, 5000) ) func main() { go updateMethod() local, _ := time.LoadLocation("Asia/Shanghai") c := cron.New(cron.WithLocation(local), cron.WithSeconds()) eid, err := c.AddFunc(GF.Cron.Spec, dealData) if err != nil { log.Info("main", zap.Any("AddFunc err", err)) } log.Info("main", zap.Any("eid", eid)) c.Start() defer c.Stop() // select {} } func dealData() { go dealBidding() go dealProject() } //dealBidding 处理标讯数据 func dealBidding() { sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) var q interface{} var startTime = GF.Cron.Start now := time.Now() ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour) coll := sess.M.C.Database(GF.MongoB.DB).Collection(GF.MongoB.Coll) find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"comeintime", 1}}).SetProjection(bson.M{"comeintime": 1, "toptype": 1, "subtype": 1, "buyerclass": 1, "title": 1, "detail": 1, "package": 1, "funds": 1}) if startTime != 0 && GF.Cron.End != 0 { q = map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": GF.Cron.Start, "$lte": GF.Cron.End, }, } } else if startTime != 0 { q = map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": startTime, }, } } else if startTime == 0 && GF.Cron.End == 0 { //默认 取大于 昨天的数据 q = map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(), }, } } cur, err := coll.Find(ctx, q, find) if err != nil { log.Error("dealBidding,coll.Find ", zap.Error(err)) } log.Info("dealBidding", zap.Any("q", q)) //query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{ // "contenthtml": 0}).Iter() count := 0 ch := make(chan bool, 15) wg := &sync.WaitGroup{} //for tmp := make(map[string]interface{}); query.Next(tmp); count++ { for tmp := make(map[string]interface{}); cur.Next(ctx); count++ { if cur != nil { cur.Decode(&tmp) } startTime = util.IntAll(tmp["comeintime"]) if count%1000 == 0 { log.Info("dealBidding", zap.Int("current", count), zap.Any("comeintime", tmp["comeintime"])) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() rea := TagBidding(tmp) if len(rea) > 0 { reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果 //update := map[string]interface{}{ // "nav_column": reb, //} //where := map[string]interface{}{ // "_id": tmp["_id"], //} updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": bson.M{ "nav_column": reb, }}, } //MgoB.Update(GF.MongoB.Coll, where, map[string]interface{}{"$set": update}, true, false) } }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("dealBidding", zap.Int("over ", count)) //没有数据时,发送邮件 if count == 0 { SendMail("每日数据监控", "查询数据为空,请处理") } } //dealProject 处理拟建项目数据标签 func dealProject() { sess := MgoP.GetMgoConn() defer MgoP.DestoryMongoConn(sess) // 指定对应的时间格式 //layout := "2006-01-02 15:04:05" // 获取当前时间 now := time.Now() var q interface{} var startTime = GF.Cron.Start if startTime != 0 && GF.Cron.End != 0 { q = map[string]interface{}{ "pici": map[string]interface{}{ "$gt": GF.Cron.Start, "$lte": GF.Cron.End, }, } } else if startTime != 0 { q = map[string]interface{}{ "pici": map[string]interface{}{ "$gt": startTime, }, } } else if startTime == 0 && GF.Cron.End == 0 { //默认 取大于 昨天的数据 q = map[string]interface{}{ "pici": map[string]interface{}{ "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(), }, } } log.Info("dealProject", zap.Any("q", q)) query := sess.DB(GF.MongoP.DB).C(GF.MongoP.Coll).Find(q).Select(map[string]interface{}{ "ids": 0, "list": 0}).Iter() count := 0 ch := make(chan bool, 15) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Info("dealProject", zap.Int("current", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() rea := TagProject(tmp) if len(rea) > 0 { reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果 update := map[string]interface{}{ "nav_column": reb, } where := map[string]interface{}{ "_id": tmp["_id"], } MgoP.Update(GF.MongoP.Coll, where, map[string]interface{}{"$set": update}, true, false) } }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("dealProject", zap.Int("over ", count)) //没有数据时,发送邮件 if count == 0 { SendMail("网站导航 数据标签", "查询 拟建项目数据为空,请查收") } } // updateMethod 更新MongoDB func updateMethod() { updateSp := make(chan bool, 8) arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 200 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk(GF.MongoB.Coll, arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk(GF.MongoB.Coll, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }