123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- package util
- import (
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gtime"
- "github.com/gogf/gf/v2/util/gconv"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.mongodb.org/mongo-driver/mongo/options"
- "log"
- "new-userlocation/db"
- "regexp"
- "strings"
- "sync"
- "time"
- )
// th_u is the size handed to NewThreads for the per-record aggregation
// workers. It starts at 4 and is overwritten from configuration by
// PositionLog.
var th_u = 4

// ArticleReg, SearchReg and PortraitReg are the three URL-classification
// patterns. They stay nil until PositionLog assigns them from configuration.
var ArticleReg, SearchReg, PortraitReg *regexp.Regexp
// JyMap is a map paired with the mutex that Set and Get use to serialise
// access to it.
type JyMap struct {
	Lock *sync.Mutex // taken by Set and Get around every access to Data
	Data map[any]any // underlying storage
}

// NewJM returns an empty JyMap with its mutex and map allocated.
func NewJM() *JyMap {
	return &JyMap{
		Lock: &sync.Mutex{},
		Data: map[any]any{},
	}
}

// Set stores v under k, replacing any previous value.
//
// The unlock is deferred so the mutex is released even when the map write
// panics (which happens if k is not a hashable value); otherwise every later
// Set or Get would block forever.
func (jm *JyMap) Set(k, v any) {
	jm.Lock.Lock()
	defer jm.Lock.Unlock()
	jm.Data[k] = v
}

// Get returns the value stored under k, or nil when k is absent.
//
// As in Set, the unlock is deferred so a panicking lookup cannot leave the
// mutex held.
func (jm *JyMap) Get(k any) (v any) {
	jm.Lock.Lock()
	defer jm.Lock.Unlock()
	return jm.Data[k]
}
- func PositionLog() {
- //从redis中加载时间进行任务
- //开启定时任务,开启日志同步
- th_u = g.Cfg().MustGet(ctx, "th_u", 4).Int()
- regex := g.Cfg().MustGet(ctx, "regex", "").String()
- arr := strings.Split(regex, ",")
- for _, v := range arr {
- av := strings.Split(v, ":")
- if len(av) == 2 {
- req := regexp.MustCompile(av[0])
- switch av[1] {
- case "search":
- SearchReg = req
- case "article":
- ArticleReg = req
- case "portrait":
- PortraitReg = req
- }
- }
- }
- start := GetLastId()
- for {
- now := time.Now().Unix()
- end := start + 3600
- if start >= 1500000000 && now-start >= 7200 {
- AnalyLogByHour(start, end)
- log.Println("end")
- if err := SetLastId(end); err == nil {
- start = end
- }
- } else {
- log.Println("时间有问题", start, now)
- }
- now = time.Now().Unix()
- if (now - end) > 7300 {
- time.Sleep(3 * time.Second)
- } else {
- time.Sleep(2 * time.Minute)
- }
- }
- }
// UserLogByHour is one aggregated row of access counts for a single position
// id and client source within one hour. AnalyLogByHour fills the counters and
// SaveUv inserts the rows into "user_log_byHour".
type UserLogByHour struct {
	PositionId  int64     `json:"positionId"`  // position id resolved by GetPositionId
	Year        int32     `json:"year"`        // calendar year of the analysed hour
	Month       int32     `json:"month"`       // calendar month of the analysed hour
	Day         int32     `json:"day"`         // day of month of the analysed hour
	Hour        int32     `json:"hour"`        // hour of day of the analysed hour
	Create_time time.Time `json:"create_Time"` // start of the analysed hour; SaveUv stores it in UTC
	Article     int64     `json:"article"`     // requests whose URL matched ArticleReg
	Search      int64     `json:"search"`      // requests whose URL matched SearchReg
	Portrait    int64     `json:"portrait"`    // requests whose URL matched PortraitReg
	Other       int64     `json:"other"`       // requests matching none of the patterns
	S_source    string    `json:"s_source"`    // client source as returned by GetSource
	BaseUserId  int64     `json:"baseUserId"`  // base user id resolved by GetPositionId
}
- func AnalyLogByHour(start, end int64) {
- log.Println("task run", start, end, gtime.NewFromTimeStamp(start))
- logCh := make(chan map[string]any, 3000)
- //int 转换成id
- th_Uv := NewThreads(th_u)
- go func() {
- ticker := time.NewTicker(30 * time.Minute)
- defer ticker.Stop()
- HourMap := NewJM()
- tn := time.Unix(start, 0)
- year, month, day, hour, createTime := tn.Year(), int(tn.Month()), tn.Day(), tn.Hour(), tn
- L:
- for {
- select {
- case v, ok := <-logCh:
- if !ok {
- log.Println("chan exit")
- break L
- }
- ip := gconv.String(v["ip"])
- ips := strings.Split(ip, ",")
- if len(ips) > 0 && len(ips[0]) > 6 {
- if _, pid, bid := GetPositionId(gconv.String(v["userid"])); pid != 0 {
- //------------------------------
- //按小时处理访问请求,使用正则匹配
- th_Uv.Open()
- go func(v map[string]any, userid, baId int64) {
- defer th_Uv.Close()
- source := GetSource(gconv.String(v["client"]))
- key := fmt.Sprintf("%d_%s", userid, source)
- UV := HourMap.Get(key)
- if UV == nil {
- UV = &UserLogByHour{PositionId: userid}
- HourMap.Set(key, UV)
- }
- url := gconv.String(v["url"])
- uv1 := UV.(*UserLogByHour)
- uv1.S_source = source
- uv1.BaseUserId = baId
- if ArticleReg.MatchString(url) {
- uv1.Article++
- } else if SearchReg.MatchString(url) {
- uv1.Search++
- } else if PortraitReg.MatchString(url) {
- uv1.Portrait++
- } else {
- uv1.Other++
- }
- }(v, pid, bid)
- }
- }
- // case <-time.After(3 * time.Minute):
- case <-ticker.C:
- log.Println("chan exit1")
- break L
- }
- }
- //等待处理完成,保存完成,退出数据
- th_Uv.Wait()
- SaveUv(HourMap, year, month, day, hour, createTime)
- log.Println("task over,exit", gtime.NewFromTimeStamp(start), gtime.NewFromTimeStamp(end))
- }()
- //jy_logs,jyapp_logs,jy_gateway_logs,bigmember_logs,jybxbase_logs
- //按日分析
- //分析不同规则
- //从mongo读取]
- tables := g.Cfg().MustGet(ctx, "log_tables").Strings()
- log.Println("analy tables:", tables)
- th_mgo := NewThreads(len(tables))
- for _, table := range tables {
- th_mgo.Open()
- go func(table string) {
- defer th_mgo.Close()
- of := options.Find()
- of.SetProjection(g.Map{"ip": 1, "url": 1, "userid": 1, "hour": 1, "month": 1, "day": 1, "date": 1, "client": 1})
- coll := db.MG.DB("log").C.Database(db.MG.DB("log").DbName).Collection(table)
- cur, err := coll.Find(ctx, g.Map{"_id": g.Map{"$gte": MongoId(start), "$lt": MongoId(end)}}, of)
- if err == nil && cur.Err() == nil {
- n := 0
- for ; cur.Next(ctx); n++ {
- v := g.Map{}
- e1 := cur.Decode(&v)
- if e1 == nil {
- logCh <- v
- }
- if n%200 == 0 {
- log.Println("current", table, n)
- }
- }
- } else {
- log.Println(err)
- }
- }(table)
- }
- th_mgo.Wait()
- close(logCh)
- }
// GetSource maps a client string to a source label, ignoring case:
// "wx" when it contains "wechat", otherwise "app" when it contains "mobile",
// otherwise "pc".
func GetSource(client string) string {
	lowered := strings.ToLower(client)
	switch {
	case strings.Contains(lowered, "wechat"):
		return "wx"
	case strings.Contains(lowered, "mobile"):
		return "app"
	default:
		return "pc"
	}
}
- func MongoId(t int64) primitive.ObjectID {
- id, _ := primitive.ObjectIDFromHex(fmt.Sprintf("%x0000000000000000", t))
- return id
- }
- // 保存用户每小时的访问信息
- func SaveUv(dataAll *JyMap, year, month, day, hour int, createTime time.Time) {
- fmt.Println("SaveUv===", len(dataAll.Data))
- var data []UserLogByHour
- for _, UV := range dataAll.Data {
- uv := UV.(*UserLogByHour)
- data = append(data, UserLogByHour{
- PositionId: uv.PositionId,
- Year: int32(year),
- Month: int32(month),
- Day: int32(day),
- Hour: int32(hour),
- Create_time: createTime.UTC(),
- Article: uv.Article,
- Search: uv.Search,
- Portrait: uv.Portrait,
- Other: uv.Other,
- S_source: uv.S_source,
- BaseUserId: uv.BaseUserId,
- })
- if len(data) == 200 {
- if _, err := g.DB().Insert(ctx, "user_log_byHour", data); err != nil {
- log.Println("Insert err", err.Error())
- }
- data = []UserLogByHour{}
- }
- }
- if len(data) > 0 {
- if _, err := g.DB().Insert(ctx, "user_log_byHour", data); err != nil {
- log.Println("Insert err", err.Error())
- }
- }
- }
|