package main import ( "context" "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/gclient" "github.com/robfig/cron/v3" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "strings" "sync" "time" ) var ( MgoB *mongodb.MongodbSim MgoP *mongodb.MongodbSim columns = make([]map[string]interface{}, 0) //存储配置 栏目 // 更新mongo updatePool = make(chan []map[string]interface{}, 5000) ) func main() { go updateMethod() local, _ := time.LoadLocation("Asia/Shanghai") c := cron.New(cron.WithLocation(local), cron.WithSeconds()) eid, err := c.AddFunc(GF.Cron.Spec, dealData) if err != nil { log.Info("main", zap.Any("AddFunc err", err)) } log.Info("main", zap.Any("eid", eid)) c.Start() defer c.Stop() // select {} } func dealData() { go dealBidding() go dealProject() } // dealBidding 处理标讯数据 func dealBidding() { sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) var q interface{} var startTime = GF.Cron.Start now := time.Now() ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour) coll := sess.M.C.Database(GF.MongoB.DB).Collection(GF.MongoB.Coll) find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"comeintime", 1}}).SetProjection(bson.M{"comeintime": 1, "toptype": 1, "subtype": 1, "buyerclass": 1, "title": 1, "detail": 1, "package": 1, "funds": 1, "spidercode": 1}) if startTime != 0 && GF.Cron.End != 0 { q = map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": GF.Cron.Start, "$lte": GF.Cron.End, }, } } else if startTime != 0 { q = map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": startTime, }, } } else if startTime == 0 && GF.Cron.End == 0 { //默认 取大于 昨天的数据 q = map[string]interface{}{ "comeintime": map[string]interface{}{ "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(), }, } } cur, err := coll.Find(ctx, q, find) if err != nil { log.Error("dealBidding,coll.Find ", zap.Error(err)) } log.Info("dealBidding", zap.Any("q", q)) //query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{ // "contenthtml": 0}).Iter() count := 0 ch := make(chan bool, 15) wg := &sync.WaitGroup{} //for tmp := make(map[string]interface{}); query.Next(tmp); count++ { for tmp := make(map[string]interface{}); cur.Next(ctx); count++ { if cur != nil { cur.Decode(&tmp) } startTime = util.IntAll(tmp["comeintime"]) if count%1000 == 0 { log.Info("dealBidding", zap.Int("current", count), zap.Any("comeintime", tmp["comeintime"])) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() rea := TagBidding(tmp) if len(rea) > 0 { reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果 //update := map[string]interface{}{ // "nav_column": reb, //} //where := map[string]interface{}{ // "_id": tmp["_id"], //} updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": bson.M{ "nav_column": reb, }}, } //MgoB.Update(GF.MongoB.Coll, where, map[string]interface{}{"$set": update}, true, false) } }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("dealBidding", zap.Int("over ", count)) //没有数据时,发送邮件 if count == 0 { SendMail("每日数据监控", "查询数据为空,请处理") } //处理热门标讯数据/热门项目 getHot() log.Info("dealBidding", zap.Int("over ", count)) } // getHot 获取热门数据 func getHot() { var hotMap = make(map[string]bool) //获取已有热门数据 hots, _ := MgoB.Find("bidding_hots", nil, nil, map[string]interface{}{"_id": 1}, false, -1, -1) if len(*hots) > 0 { for _, v := range *hots { hotMap[mongodb.BsonIdToSId(v["_id"])] = true } } getRes := make([]string, 0) end := time.Date(2023, 12, 5, 0, 0, 0, 0, time.Local) now := time.Now() var recentDays []string for i := 0; i < 15; i++ { day := now.AddDate(0, 0, -i) // 如果日期在 2023-12-05 及之后,则添加到 recentDays if !day.Before(end) { recentDays = append(recentDays, day.Format("2006-01-02")) } } for _, v := range recentDays { res := gclient.New().GetContent(context.Background(), "http://172.17.145.164:18880/jyartvisit/"+v+".res") arrs := strings.Split(res, "\n") getRes = append(getRes, arrs...) } for _, v := range getRes { vs := strings.Split(v, " ") if len(vs) == 2 { insert := map[string]interface{}{ "_id": mongodb.StringTOBsonId(vs[0]), "num": g.NewVar(vs[1]).Int(), "createtime": time.Now().Unix(), } err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hot_data", insert) if err != nil { log.Error("getHot", zap.Error(err)) } if !hotMap[vs[0]] { // 将字符串转换为 ObjectId objectID, err := primitive.ObjectIDFromHex(vs[0]) if err != nil { fmt.Println("Error parsing ObjectId:", err) continue } // 从 ObjectId 中提取时间戳 timestamp := objectID.Timestamp() //// 计算时间差 //timeDifference := time.Now().Sub(timestamp) // 判断时间是否在最近一年内 oneYearAgo := time.Now().AddDate(-1, 0, 0) isWithinOneYear := timestamp.After(oneYearAgo) //最近一年数据并且 数量大于100 if isWithinOneYear && g.NewVar(vs[1]).Int() > GF.Cron.HotNum { err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hots", insert) if err != nil { log.Error("getHot", zap.Error(err)) } bidding, _ := MgoB.FindById("bidding", vs[0], nil) biddingData := *bidding if biddingData == nil { continue } biddingData["hot_data"] = 1 rea := TagBidding(biddingData) reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果 log.Info("getHot", zap.Any("reb", reb), zap.String("id", vs[0])) updatePool <- []map[string]interface{}{ {"_id": insert["_id"]}, {"$set": bson.M{ "nav_column": reb, }}, } } } } } } // dealProject 处理拟建项目数据标签 func dealProject() { sess := MgoP.GetMgoConn() defer MgoP.DestoryMongoConn(sess) // 指定对应的时间格式 //layout := "2006-01-02 15:04:05" // 获取当前时间 now := time.Now() var q interface{} var startTime = GF.Cron.Start if startTime != 0 && GF.Cron.End != 0 { q = map[string]interface{}{ "pici": map[string]interface{}{ "$gt": GF.Cron.Start, "$lte": GF.Cron.End, }, } } else if startTime != 0 { q = map[string]interface{}{ "pici": map[string]interface{}{ "$gt": startTime, }, } } else if startTime == 0 && GF.Cron.End == 0 { //默认 取大于 昨天的数据 q = map[string]interface{}{ "pici": map[string]interface{}{ "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(), }, } } log.Info("dealProject", zap.Any("q", q)) query := sess.DB(GF.MongoP.DB).C(GF.MongoP.Coll).Find(q).Select(map[string]interface{}{ "ids": 0, "list": 0}).Iter() count := 0 ch := make(chan bool, 15) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Info("dealProject", zap.Int("current", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() rea := TagProject(tmp) if len(rea) > 0 { reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果 update := map[string]interface{}{ "nav_column": reb, } where := map[string]interface{}{ "_id": tmp["_id"], } MgoP.Update(GF.MongoP.Coll, where, map[string]interface{}{"$set": update}, true, false) } }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("dealProject", zap.Int("over ", count)) //没有数据时,发送邮件 if count == 0 { SendMail("网站导航 数据标签", "查询 拟建项目数据为空,请查收") } } // updateMethod 更新MongoDB func updateMethod() { updateSp := make(chan bool, 8) arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 200 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk(GF.MongoB.Coll, arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk(GF.MongoB.Coll, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }