task.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package util
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/frame/g"
  5. "github.com/gogf/gf/v2/os/gtime"
  6. "github.com/gogf/gf/v2/util/gconv"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. "log"
  10. "new-userlocation/db"
  11. "regexp"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  // th_u is the value passed to NewThreads for per-record processing in
// AnalyLogByHour. It defaults to 4; PositionLog overwrites it from the "th_u"
// configuration setting.
var th_u = 4

// ArticleReg, SearchReg and PortraitReg are the three URL classifiers. They
// are compiled by PositionLog from the "regex" setting and stay nil until then.
var ArticleReg, SearchReg, PortraitReg *regexp.Regexp
  // JyMap is a map with arbitrary keys and values whose Get and Set calls are
// serialised by Lock. Only the map access itself is guarded: a value returned
// by Get (for example a pointer) is shared and is not protected afterwards.
// The zero value is not usable because Lock is nil; create instances with NewJM.
type JyMap struct {
	Lock *sync.Mutex // guards Data
	Data map[any]any
}

// NewJM returns an empty JyMap with its mutex allocated.
func NewJM() *JyMap {
	return &JyMap{
		Lock: &sync.Mutex{},
		Data: map[any]any{},
	}
}

// Set stores v under k, replacing any previous value.
func (jm *JyMap) Set(k, v any) {
	jm.Lock.Lock()
	// Deferred so that a panicking map write (e.g. an unhashable key such as a
	// slice) cannot leave the mutex locked and deadlock every later caller.
	defer jm.Lock.Unlock()
	jm.Data[k] = v
}

// Get returns the value stored under k, or nil when k is absent.
func (jm *JyMap) Get(k any) (v any) {
	jm.Lock.Lock()
	// Deferred for the same reason as in Set: lookups with unhashable keys panic.
	defer jm.Lock.Unlock()
	return jm.Data[k]
}
  41. func PositionLog() {
  42. //从redis中加载时间进行任务
  43. //开启定时任务,开启日志同步
  44. th_u = g.Cfg().MustGet(ctx, "th_u", 4).Int()
  45. regex := g.Cfg().MustGet(ctx, "regex", "").String()
  46. arr := strings.Split(regex, ",")
  47. for _, v := range arr {
  48. av := strings.Split(v, ":")
  49. if len(av) == 2 {
  50. req := regexp.MustCompile(av[0])
  51. switch av[1] {
  52. case "search":
  53. SearchReg = req
  54. case "article":
  55. ArticleReg = req
  56. case "portrait":
  57. PortraitReg = req
  58. }
  59. }
  60. }
  61. start := GetLastId()
  62. for {
  63. now := time.Now().Unix()
  64. end := start + 3600
  65. if start >= 1500000000 && now-start >= 7200 {
  66. AnalyLogByHour(start, end)
  67. log.Println("end")
  68. if err := SetLastId(end); err == nil {
  69. start = end
  70. }
  71. } else {
  72. log.Println("时间有问题", start, now)
  73. }
  74. now = time.Now().Unix()
  75. if (now - end) > 7300 {
  76. time.Sleep(3 * time.Second)
  77. } else {
  78. time.Sleep(2 * time.Minute)
  79. }
  80. }
  81. }
  // UserLogByHour holds one user's request counters for a single hour and
// traffic source; SaveUv inserts these rows into the user_log_byHour table.
type UserLogByHour struct {
	// Position id returned by GetPositionId for the log record's userid.
	PositionId int64 `json:"positionId"`

	// Hour bucket, taken from the local calendar fields of the hour's start.
	Year  int32 `json:"year"`
	Month int32 `json:"month"`
	Day   int32 `json:"day"`
	Hour  int32 `json:"hour"`

	// Start of the hour; SaveUv stores it converted to UTC.
	Create_time time.Time `json:"create_Time"`

	// Request counts per URL category: article, search, portrait, anything else.
	Article  int64 `json:"article"`
	Search   int64 `json:"search"`
	Portrait int64 `json:"portrait"`
	Other    int64 `json:"other"`

	// Traffic source as returned by GetSource: "wx", "app" or "pc".
	S_source string `json:"s_source"`

	// Base user id returned alongside the position id by GetPositionId.
	BaseUserId int64 `json:"baseUserId"`
}
  98. func AnalyLogByHour(start, end int64) {
  99. log.Println("task run", start, end, gtime.NewFromTimeStamp(start))
  100. logCh := make(chan map[string]any, 3000)
  101. //int 转换成id
  102. th_Uv := NewThreads(th_u)
  103. go func() {
  104. ticker := time.NewTicker(30 * time.Minute)
  105. defer ticker.Stop()
  106. HourMap := NewJM()
  107. tn := time.Unix(start, 0)
  108. year, month, day, hour, createTime := tn.Year(), int(tn.Month()), tn.Day(), tn.Hour(), tn
  109. L:
  110. for {
  111. select {
  112. case v, ok := <-logCh:
  113. if !ok {
  114. log.Println("chan exit")
  115. break L
  116. }
  117. ip := gconv.String(v["ip"])
  118. ips := strings.Split(ip, ",")
  119. if len(ips) > 0 && len(ips[0]) > 6 {
  120. if _, pid, bid := GetPositionId(gconv.String(v["userid"])); pid != 0 {
  121. //------------------------------
  122. //按小时处理访问请求,使用正则匹配
  123. th_Uv.Open()
  124. go func(v map[string]any, userid, baId int64) {
  125. defer th_Uv.Close()
  126. source := GetSource(gconv.String(v["client"]))
  127. key := fmt.Sprintf("%d_%s", userid, source)
  128. UV := HourMap.Get(key)
  129. if UV == nil {
  130. UV = &UserLogByHour{PositionId: userid}
  131. HourMap.Set(key, UV)
  132. }
  133. url := gconv.String(v["url"])
  134. uv1 := UV.(*UserLogByHour)
  135. uv1.S_source = source
  136. uv1.BaseUserId = baId
  137. if ArticleReg.MatchString(url) {
  138. uv1.Article++
  139. } else if SearchReg.MatchString(url) {
  140. uv1.Search++
  141. } else if PortraitReg.MatchString(url) {
  142. uv1.Portrait++
  143. } else {
  144. uv1.Other++
  145. }
  146. }(v, pid, bid)
  147. }
  148. }
  149. // case <-time.After(3 * time.Minute):
  150. case <-ticker.C:
  151. log.Println("chan exit1")
  152. break L
  153. }
  154. }
  155. //等待处理完成,保存完成,退出数据
  156. th_Uv.Wait()
  157. SaveUv(HourMap, year, month, day, hour, createTime)
  158. log.Println("task over,exit", gtime.NewFromTimeStamp(start), gtime.NewFromTimeStamp(end))
  159. }()
  160. //jy_logs,jyapp_logs,jy_gateway_logs,bigmember_logs,jybxbase_logs
  161. //按日分析
  162. //分析不同规则
  163. //从mongo读取]
  164. tables := g.Cfg().MustGet(ctx, "log_tables").Strings()
  165. log.Println("analy tables:", tables)
  166. th_mgo := NewThreads(len(tables))
  167. for _, table := range tables {
  168. th_mgo.Open()
  169. go func(table string) {
  170. defer th_mgo.Close()
  171. of := options.Find()
  172. of.SetProjection(g.Map{"ip": 1, "url": 1, "userid": 1, "hour": 1, "month": 1, "day": 1, "date": 1, "client": 1})
  173. coll := db.MG.DB("log").C.Database(db.MG.DB("log").DbName).Collection(table)
  174. cur, err := coll.Find(ctx, g.Map{"_id": g.Map{"$gte": MongoId(start), "$lt": MongoId(end)}}, of)
  175. if err == nil && cur.Err() == nil {
  176. n := 0
  177. for ; cur.Next(ctx); n++ {
  178. v := g.Map{}
  179. e1 := cur.Decode(&v)
  180. if e1 == nil {
  181. logCh <- v
  182. }
  183. if n%200 == 0 {
  184. log.Println("current", table, n)
  185. }
  186. }
  187. } else {
  188. log.Println(err)
  189. }
  190. }(table)
  191. }
  192. th_mgo.Wait()
  193. close(logCh)
  194. }
  // GetSource classifies a request's client string into a traffic source.
// Matching is case-insensitive: "wx" when the string mentions "wechat",
// otherwise "app" when it mentions "mobile", otherwise "pc". When both
// words appear, "wx" wins.
func GetSource(client string) string {
	lowered := strings.ToLower(client)
	switch {
	case strings.Contains(lowered, "wechat"):
		return "wx"
	case strings.Contains(lowered, "mobile"):
		return "app"
	default:
		return "pc"
	}
}
  203. func MongoId(t int64) primitive.ObjectID {
  204. id, _ := primitive.ObjectIDFromHex(fmt.Sprintf("%x0000000000000000", t))
  205. return id
  206. }
  207. // 保存用户每小时的访问信息
  208. func SaveUv(dataAll *JyMap, year, month, day, hour int, createTime time.Time) {
  209. fmt.Println("SaveUv===", len(dataAll.Data))
  210. var data []UserLogByHour
  211. for _, UV := range dataAll.Data {
  212. uv := UV.(*UserLogByHour)
  213. data = append(data, UserLogByHour{
  214. PositionId: uv.PositionId,
  215. Year: int32(year),
  216. Month: int32(month),
  217. Day: int32(day),
  218. Hour: int32(hour),
  219. Create_time: createTime.UTC(),
  220. Article: uv.Article,
  221. Search: uv.Search,
  222. Portrait: uv.Portrait,
  223. Other: uv.Other,
  224. S_source: uv.S_source,
  225. BaseUserId: uv.BaseUserId,
  226. })
  227. if len(data) == 200 {
  228. if _, err := g.DB().Insert(ctx, "user_log_byHour", data); err != nil {
  229. log.Println("Insert err", err.Error())
  230. }
  231. data = []UserLogByHour{}
  232. }
  233. }
  234. if len(data) > 0 {
  235. if _, err := g.DB().Insert(ctx, "user_log_byHour", data); err != nil {
  236. log.Println("Insert err", err.Error())
  237. }
  238. }
  239. }