123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- package main
- import (
- "context"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/net/gclient"
- "github.com/gogf/gf/v2/os/gtime"
- "github.com/robfig/cron/v3"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/mongo/options"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "strings"
- "sync"
- "time"
- )
- var (
- MgoB *mongodb.MongodbSim
- MgoP *mongodb.MongodbSim
- columns = make([]map[string]interface{}, 0) //存储配置 栏目
- // 更新mongo
- updatePool = make(chan []map[string]interface{}, 5000)
- )
- func main() {
- go updateMethod()
- local, _ := time.LoadLocation("Asia/Shanghai")
- c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- eid, err := c.AddFunc(GF.Cron.Spec, dealData)
- if err != nil {
- log.Info("main", zap.Any("AddFunc err", err))
- }
- log.Info("main", zap.Any("eid", eid))
- c.Start()
- defer c.Stop()
- //
- select {}
- }
- func dealData() {
- go dealBidding()
- go dealProject()
- }
- // dealBidding 处理标讯数据
- func dealBidding() {
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- var q interface{}
- var startTime = GF.Cron.Start
- now := time.Now()
- ctx, _ := context.WithTimeout(context.Background(), 99999*time.Hour)
- coll := sess.M.C.Database(GF.MongoB.DB).Collection(GF.MongoB.Coll)
- find := options.Find().SetBatchSize(1000).SetSort(bson.D{bson.E{"comeintime", 1}}).SetProjection(bson.M{"comeintime": 1, "toptype": 1, "subtype": 1, "buyerclass": 1, "title": 1, "detail": 1, "package": 1, "funds": 1, "spidercode": 1})
- if startTime != 0 && GF.Cron.End != 0 {
- q = map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gt": GF.Cron.Start,
- "$lte": GF.Cron.End,
- },
- }
- } else if startTime != 0 {
- q = map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gt": startTime,
- },
- }
- } else if startTime == 0 && GF.Cron.End == 0 {
- //默认 取大于 昨天的数据
- q = map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(),
- },
- }
- }
- cur, err := coll.Find(ctx, q, find)
- if err != nil {
- log.Error("dealBidding,coll.Find ", zap.Error(err))
- }
- log.Info("dealBidding", zap.Any("q", q))
- //query := sess.DB(GF.MongoB.DB).C(GF.MongoB.Coll).Find(q).Select(map[string]interface{}{
- // "contenthtml": 0}).Iter()
- count := 0
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- //for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- for tmp := make(map[string]interface{}); cur.Next(ctx); count++ {
- if cur != nil {
- cur.Decode(&tmp)
- }
- startTime = util.IntAll(tmp["comeintime"])
- if count%1000 == 0 {
- log.Info("dealBidding", zap.Int("current", count), zap.Any("comeintime", tmp["comeintime"]))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- rea := TagBidding(tmp)
- if len(rea) > 0 {
- reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
- //update := map[string]interface{}{
- // "nav_column": reb,
- //}
- //where := map[string]interface{}{
- // "_id": tmp["_id"],
- //}
- updatePool <- []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": bson.M{
- "nav_column": reb,
- }},
- }
- //MgoB.Update(GF.MongoB.Coll, where, map[string]interface{}{"$set": update}, true, false)
- }
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- log.Info("dealBidding", zap.Int("over ", count))
- //没有数据时,发送邮件
- if count == 0 {
- SendMail("每日数据监控", "查询数据为空,请处理")
- }
- //处理热门标讯数据/热门项目
- getHot()
- log.Info("dealBidding", zap.Int("over ", count))
- }
- // getHot 获取热门数据
- func getHot() {
- var existsMap = make(map[string]bool) //bidding_hots 已经存在的ID
- //获取已有热门数据
- hots, _ := MgoB.Find("bidding_hots", nil, nil, map[string]interface{}{"bidding_id": 1}, false, -1, -1)
- if len(*hots) > 0 {
- for _, v := range *hots {
- existsMap[util.ObjToString(v["bidding_id"])] = true
- }
- }
- var hotMap = make(map[string]int)
- //1.获取昨日数据
- file := gtime.Now().AddDate(0, 0, -1).Format("Y-m-d")
- res := gclient.New().GetContent(context.Background(), "http://172.17.145.164:18880/jyartvisit/"+file+".res")
- arrs := strings.Split(res, "\n")
- for _, va := range arrs {
- vs := strings.Split(va, " ")
- if len(vs) == 2 {
- insert := map[string]interface{}{
- "bidding_id": vs[0],
- "num": g.NewVar(vs[1]).Int(),
- "date": file,
- }
- err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hot_data", insert)
- if err != nil {
- log.Error("getHot", zap.Error(err))
- }
- }
- }
- //2. 汇总15天内,符合条件热门数据
- where := map[string]interface{}{
- "date": map[string]interface{}{
- "$gte": gtime.Now().AddDate(0, 0, -GF.Cron.HotDay).Format("Y-m-d"),
- },
- }
- hotData, _ := MgoB.Find("bidding_hot_data", where, nil, nil, false, -1, -1)
- for _, data := range *hotData {
- biddingID := util.ObjToString(data["bidding_id"])
- // 将字符串转换为 ObjectId
- objectID := mongodb.StringTOBsonId(biddingID)
- // 从 ObjectId 中提取时间戳
- timestamp := objectID.Timestamp()
- // 判断时间是否在最近一年内
- oneYearAgo := time.Now().AddDate(-1, 0, 0)
- isWithinOneYear := timestamp.After(oneYearAgo)
- if !isWithinOneYear {
- continue
- }
- num := util.IntAll(data["num"])
- if da, ok := hotMap[biddingID]; ok {
- hotMap[biddingID] = num + da
- } else {
- hotMap[biddingID] = num
- }
- }
- newCount := 0
- for k, v := range hotMap {
- //之前存在过的数据,不再入 bidding_hots
- if existsMap[k] {
- continue
- }
- if v > GF.Cron.HotNum {
- bidding, _ := MgoB.FindById("bidding", k, nil)
- biddingData := *bidding
- if biddingData == nil {
- continue
- }
- newCount++
- insert := map[string]interface{}{
- "bidding_id": k,
- "num": v,
- "createtime": time.Now().Unix(),
- "date": time.Now().Format("2006-01-02"),
- }
- err := MgoB.InsertOrUpdate(GF.MongoB.DB, "bidding_hots", insert)
- if err != nil {
- log.Error("getHot", zap.Error(err))
- }
- biddingData["hot_data"] = 1
- rea := TagBidding(biddingData)
- reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
- updatePool <- []map[string]interface{}{
- {"_id": mongodb.StringTOBsonId(k)},
- {"$set": bson.M{
- "nav_column": reb,
- }},
- }
- }
- }
- log.Info("getHot", zap.Int("over", newCount))
- }
- // dealProject 处理拟建项目数据标签
- func dealProject() {
- sess := MgoP.GetMgoConn()
- defer MgoP.DestoryMongoConn(sess)
- // 指定对应的时间格式
- //layout := "2006-01-02 15:04:05"
- // 获取当前时间
- now := time.Now()
- var q interface{}
- var startTime = GF.Cron.Start
- if startTime != 0 && GF.Cron.End != 0 {
- q = map[string]interface{}{
- "pici": map[string]interface{}{
- "$gt": GF.Cron.Start,
- "$lte": GF.Cron.End,
- },
- }
- } else if startTime != 0 {
- q = map[string]interface{}{
- "pici": map[string]interface{}{
- "$gt": startTime,
- },
- }
- } else if startTime == 0 && GF.Cron.End == 0 {
- //默认 取大于 昨天的数据
- q = map[string]interface{}{
- "pici": map[string]interface{}{
- "$gt": time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()).Unix(),
- },
- }
- }
- log.Info("dealProject", zap.Any("q", q))
- query := sess.DB(GF.MongoP.DB).C(GF.MongoP.Coll).Find(q).Select(map[string]interface{}{
- "ids": 0, "list": 0}).Iter()
- count := 0
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%1000 == 0 {
- log.Info("dealProject", zap.Int("current", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- rea := TagProject(tmp)
- if len(rea) > 0 {
- reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
- update := map[string]interface{}{
- "nav_column": reb,
- }
- where := map[string]interface{}{
- "_id": tmp["_id"],
- }
- MgoP.Update(GF.MongoP.Coll, where, map[string]interface{}{"$set": update}, true, false)
- }
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- log.Info("dealProject", zap.Int("over ", count))
- //没有数据时,发送邮件
- if count == 0 {
- SendMail("网站导航 数据标签", "查询 拟建项目数据为空,请查收")
- }
- }
- // updateMethod 更新MongoDB
- func updateMethod() {
- updateSp := make(chan bool, 8)
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MgoB.UpdateBulk(GF.MongoB.Coll, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|