package util import ( "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo/options" "log" "new-userlocation/db" "regexp" "strings" "sync" "time" ) var ( th_u = 4 ArticleReg, SearchReg, PortraitReg *regexp.Regexp //三类正则 ) type JyMap struct { Lock *sync.Mutex Data map[any]any } func NewJM() *JyMap { jm := &JyMap{} jm.Lock = &sync.Mutex{} jm.Data = map[any]any{} return jm } func (jm *JyMap) Set(k, v any) { jm.Lock.Lock() jm.Data[k] = v jm.Lock.Unlock() } func (jm *JyMap) Get(k any) (v any) { jm.Lock.Lock() v = jm.Data[k] jm.Lock.Unlock() return } func PositionLog() { //从redis中加载时间进行任务 //开启定时任务,开启日志同步 th_u = g.Cfg().MustGet(ctx, "th_u", 4).Int() regex := g.Cfg().MustGet(ctx, "regex", "").String() arr := strings.Split(regex, ",") for _, v := range arr { av := strings.Split(v, ":") if len(av) == 2 { req := regexp.MustCompile(av[0]) switch av[1] { case "search": SearchReg = req case "article": ArticleReg = req case "portrait": PortraitReg = req } } } start := GetLastId() for { now := time.Now().Unix() end := start + 3600 if start >= 1500000000 && now-start >= 7200 { AnalyLogByHour(start, end) log.Println("end") if err := SetLastId(end); err == nil { start = end } } else { log.Println("时间有问题", start, now) } now = time.Now().Unix() if (now - end) > 7300 { time.Sleep(3 * time.Second) } else { time.Sleep(2 * time.Minute) } } } type ( UserLogByHour struct { PositionId int64 `json:"positionId"` Year int32 `json:"year"` Month int32 `json:"month"` Day int32 `json:"day"` Hour int32 `json:"hour"` Create_time time.Time `json:"create_Time"` Article int64 `json:"article"` Search int64 `json:"search"` Portrait int64 `json:"portrait"` Other int64 `json:"other"` S_source string `json:"s_source"` BaseUserId int64 `json:"baseUserId"` } ) func AnalyLogByHour(start, end int64) { log.Println("task run", start, end, gtime.NewFromTimeStamp(start)) logCh := make(chan map[string]any, 3000) //int 转换成id th_Uv := NewThreads(th_u) go func() { ticker := time.NewTicker(30 * time.Minute) defer ticker.Stop() HourMap := NewJM() tn := time.Unix(start, 0) year, month, day, hour, createTime := tn.Year(), int(tn.Month()), tn.Day(), tn.Hour(), tn L: for { select { case v, ok := <-logCh: if !ok { log.Println("chan exit") break L } ip := gconv.String(v["ip"]) ips := strings.Split(ip, ",") if len(ips) > 0 && len(ips[0]) > 6 { if _, pid, bid := GetPositionId(gconv.String(v["userid"])); pid != 0 { //------------------------------ //按小时处理访问请求,使用正则匹配 th_Uv.Open() go func(v map[string]any, userid, baId int64) { defer th_Uv.Close() source := GetSource(gconv.String(v["client"])) key := fmt.Sprintf("%d_%s", userid, source) UV := HourMap.Get(key) if UV == nil { UV = &UserLogByHour{PositionId: userid} HourMap.Set(key, UV) } url := gconv.String(v["url"]) uv1 := UV.(*UserLogByHour) uv1.S_source = source uv1.BaseUserId = baId if ArticleReg.MatchString(url) { uv1.Article++ } else if SearchReg.MatchString(url) { uv1.Search++ } else if PortraitReg.MatchString(url) { uv1.Portrait++ } else { uv1.Other++ } }(v, pid, bid) } } // case <-time.After(3 * time.Minute): case <-ticker.C: log.Println("chan exit1") break L } } //等待处理完成,保存完成,退出数据 th_Uv.Wait() SaveUv(HourMap, year, month, day, hour, createTime) log.Println("task over,exit", gtime.NewFromTimeStamp(start), gtime.NewFromTimeStamp(end)) }() //jy_logs,jyapp_logs,jy_gateway_logs,bigmember_logs,jybxbase_logs //按日分析 //分析不同规则 //从mongo读取] tables := g.Cfg().MustGet(ctx, "log_tables").Strings() log.Println("analy tables:", tables) th_mgo := NewThreads(len(tables)) for _, table := range tables { th_mgo.Open() go func(table string) { defer th_mgo.Close() of := options.Find() of.SetProjection(g.Map{"ip": 1, "url": 1, "userid": 1, "hour": 1, "month": 1, "day": 1, "date": 1, "client": 1}) coll := db.MG.DB("log").C.Database(db.MG.DB("log").DbName).Collection(table) cur, err := coll.Find(ctx, g.Map{"_id": g.Map{"$gte": MongoId(start), "$lt": MongoId(end)}}, of) if err == nil && cur.Err() == nil { n := 0 for ; cur.Next(ctx); n++ { v := g.Map{} e1 := cur.Decode(&v) if e1 == nil { logCh <- v } if n%200 == 0 { log.Println("current", table, n) } } } else { log.Println(err) } }(table) } th_mgo.Wait() close(logCh) } func GetSource(client string) string { if strings.Contains(strings.ToLower(client), "wechat") { return "wx" } else if strings.Contains(strings.ToLower(client), "mobile") { return "app" } return "pc" } func MongoId(t int64) primitive.ObjectID { id, _ := primitive.ObjectIDFromHex(fmt.Sprintf("%x0000000000000000", t)) return id } // 保存用户每小时的访问信息 func SaveUv(dataAll *JyMap, year, month, day, hour int, createTime time.Time) { fmt.Println("SaveUv===", len(dataAll.Data)) var data []UserLogByHour for _, UV := range dataAll.Data { uv := UV.(*UserLogByHour) data = append(data, UserLogByHour{ PositionId: uv.PositionId, Year: int32(year), Month: int32(month), Day: int32(day), Hour: int32(hour), Create_time: createTime.UTC(), Article: uv.Article, Search: uv.Search, Portrait: uv.Portrait, Other: uv.Other, S_source: uv.S_source, BaseUserId: uv.BaseUserId, }) if len(data) == 200 { if _, err := g.DB().Insert(ctx, "user_log_byHour", data); err != nil { log.Println("Insert err", err.Error()) } data = []UserLogByHour{} } } if len(data) > 0 { if _, err := g.DB().Insert(ctx, "user_log_byHour", data); err != nil { log.Println("Insert err", err.Error()) } } }