123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
- package main
- import (
- qu "app.yhyue.com/moapp/jybase/common"
- "app.yhyue.com/moapp/jybase/date"
- "context"
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/util/gconv"
- "log"
- "strings"
- "sync"
- "time"
- )
// SessionLog is the in-memory record of one user session on one platform.
// The In_* fields describe the event that opened the session and the Out_*
// fields describe the most recent event folded into it.
type SessionLog struct {
	// User_id owns the session; Session_id is its identifier.
	User_id, Session_id string
	// Start_time is the time of the first event, End_time of the latest one.
	Start_time, End_time time.Time
	// Duration is kept as text: "0" for a fresh session, otherwise the
	// String() form of End_time minus Start_time.
	Duration string
	// Entry point of the session.
	In_platform, In_module                                        string
	In_refer, In_refer_name, In_refer_type                        string
	In_url, In_url_name, In_url_element                           string
	// Exit point of the session. out_url_name is unexported, unlike its
	// siblings; the other code in this file refers to it by this exact name.
	Out_platform, Out_module, Out_url, out_url_name string
	// IsActive marks a session that is still eligible for updates and storage.
	IsActive bool
	// Path is not populated by the code in this file.
	// NOTE(review): possibly a leftover; confirm before relying on it.
	Path string
	// PathArr is the ordered list of page-code ids visited in the session;
	// it is joined with "_" when the session is stored.
	PathArr []string
}
// SessionDetaiLog is the per-module breakdown of a session: one record per
// (session id, module) pair.
type SessionDetaiLog struct {
	// User_id owns the parent session, which is identified by Session_id.
	User_id, Session_id string
	// Create_time is the start time of the parent session.
	Create_time time.Time
	// Duration mirrors the parent session's textual duration.
	Duration string
	// Platform and Module locate the events counted by this record.
	Platform, Module string
	// Count is the number of events seen for this session/module pair.
	Count int32
}
- // UserSessions 用户会话映射
- type UserSessions struct {
- sync.RWMutex
- sessions map[string]*SessionLog
- }
- // UserSessions 用户会话映射
- type UserSessionsDetails struct {
- sync.RWMutex
- sessionsDetails map[string]*SessionDetaiLog
- }
- // Session 获取会话
- func Session(st, et int64) {
- //初始化页码code
- pageCodeMap := LoadPageCode()
- //先加载未结束的会话
- userSessionMap := LoadUserSession(st, et)
- //加载虚拟id
- userTrustedMap := LoadUserTrustedId(st, et)
- //初始化用户会话明细映射
- // 初始化用户会话映射
- userSessionDetails := &UserSessionsDetails{
- sessionsDetails: make(map[string]*SessionDetaiLog),
- }
- modelMap := MatchRegexModule()
- starttime := time.Unix(st, 0).Format(date.Date_Full_Layout)
- endtime := time.Unix(et, 0).Format(date.Date_Full_Layout)
- //加载用户行为表
- q := fmt.Sprintf(`select date,user_id,trusted_id,platform,module,url,url_name,url_element,refer,refer_name,refer_type,session_id,page_name,breaker_name,data_type,action_id
- from userbehavior.user_behavior_log where
- date >='%s' and date <'%s'`, starttime, endtime)
- // and user_id ='67d7e09285045c492114c634'
- ctx := context.Background()
- //log.Println(q)
- rows, err := Ch_userbehavior.Query(ctx, q)
- if err != nil {
- log.Fatal(err)
- }
- defer rows.Close()
- for rows.Next() {
- var (
- date time.Time
- user_id, trusted_id, platform, module, url, url_name, url_element, refer, refer_name, refer_type, session_id, page_name, breaker_name, data_type string
- action_id int64
- )
- if err := rows.Scan(
- &date, &user_id, &trusted_id, &platform, &module, &url, &url_name, &url_element, &refer, &refer_name, &refer_type, &session_id, &page_name, &breaker_name, &data_type, &action_id,
- ); err != nil {
- log.Fatal(err)
- }
- //处理会话逻辑
- HandleSession(pageCodeMap, userSessionDetails, userTrustedMap, userSessionMap, date, user_id, trusted_id, platform, module, url, url_name, url_element, refer, refer_name, refer_type, session_id, page_name, breaker_name, data_type, action_id, modelMap)
- }
- //清理过期session
- CleanupSessions(userSessionMap)
- //存储
- StoreSessions(userSessionMap, userSessionDetails)
- }
// GetSessionId builds a session identifier of the form
// "<platform>_<user_id>_<yyyyMMddHHmmss>", where the timestamp is date
// rendered in its own location.
func GetSessionId(platform, user_id string, date time.Time) string {
	const stampLayout = "20060102150405"
	return platform + "_" + user_id + "_" + date.Format(stampLayout)
}
// CalculateMinutesDifference returns the number of whole minutes from t1 to
// t2, truncated toward zero. The result is negative when t2 precedes t1.
func CalculateMinutesDifference(t1, t2 time.Time) int {
	return int(t2.Sub(t1).Minutes())
}
- func HandleSession(pageCodeMap map[string]int64, sessionDetails *UserSessionsDetails, userTrustedMap map[string]string, sessions *UserSessions, date1 time.Time,
- user_id, trusted_id, platform, module, url, url_name, url_element, refer, refer_name, refer_type, session_id, page_name, breaker_name, data_type string, action_id int64, modelMap *CompiledRegexMap) {
- if user_id == "" && trusted_id == "" {
- return
- }
- if user_id == "" && trusted_id != "" {
- user_id = userTrustedMap[trusted_id]
- }
- if user_id == "" {
- return
- }
- keyname := fmt.Sprintf("%s_%s", platform, user_id)
- sessions.RLock()
- session, exists := sessions.sessions[keyname]
- sessions.RUnlock()
- sessId := GetSessionId(platform, user_id, date1)
- id := getPageCodeId(platform, url, breaker_name, module, url_name, action_id, pageCodeMap, modelMap)
- // 如果用户没有活跃会话,则创建新会话
- if !exists || !session.IsActive {
- patharr := []string{}
- if data_type == "c_jy_open_page" {
- //patharr = GetPathBehavior([]string{}, gconv.String(pageCodeMap[key]))
- patharr = GetPathBehavior([]string{}, gconv.String(id))
- }
- newSession := &SessionLog{
- User_id: user_id,
- Session_id: sessId,
- Start_time: date1,
- End_time: date1,
- Duration: "0",
- In_platform: platform,
- In_module: module,
- In_refer: refer,
- In_refer_name: refer_name,
- In_refer_type: refer_type,
- In_url: url,
- In_url_name: url_name,
- In_url_element: url_element,
- Out_platform: platform,
- Out_module: module,
- Out_url: url,
- out_url_name: url_name,
- IsActive: true,
- PathArr: patharr,
- }
- sessions.Lock()
- sessions.sessions[keyname] = newSession
- sessions.Unlock()
- } else {
- // 计算时间差
- timeDiff := date1.Sub(session.End_time)
- if timeDiff > time.Duration(Sysconfig.TimeOut)*time.Minute {
- patharr := []string{}
- if data_type == "c_jy_open_page" {
- //patharr = GetPathBehavior([]string{}, gconv.String(pageCodeMap[key]))
- patharr = GetPathBehavior([]string{}, gconv.String(id))
- }
- // 如果时间差超过60分钟,创建新会话
- newSession := &SessionLog{
- User_id: user_id,
- Session_id: sessId,
- Start_time: date1,
- End_time: date1,
- Duration: "0",
- In_platform: platform,
- In_module: module,
- In_refer: refer,
- In_refer_name: refer_name,
- In_refer_type: refer_type,
- In_url: url,
- In_url_name: url_name,
- In_url_element: url_element,
- Out_platform: platform,
- Out_module: module,
- Out_url: url,
- out_url_name: url_name,
- IsActive: true,
- PathArr: patharr,
- }
- sessions.Lock()
- sessions.sessions[keyname] = newSession
- sessions.Unlock()
- } else {
- endtime := session.End_time
- if endtime.Before(date1) {
- endtime = date1
- }
- duration := endtime.Sub(session.Start_time)
- // 否则更新当前会话
- sessions.Lock()
- session.End_time = endtime
- session.Out_platform = platform
- session.Out_module = module
- session.Out_url = url
- session.out_url_name = url_name
- session.Duration = duration.String()
- //session.PathArr = append(session.PathArr, gconv.String(pageCodeMap[key]))
- if data_type == "c_jy_open_page" {
- //session.PathArr = GetPathBehavior(session.PathArr, gconv.String(pageCodeMap[key]))
- session.PathArr = GetPathBehavior(session.PathArr, gconv.String(id))
- }
- sessions.Unlock()
- }
- }
- //
- sessdetailkey := fmt.Sprintf("%s_%s", sessions.sessions[keyname].Session_id, module)
- sessionDetails.RLock()
- sessdetal, exists := sessionDetails.sessionsDetails[sessdetailkey]
- sessionDetails.RUnlock()
- if !exists {
- //新增
- newSessDetail := &SessionDetaiLog{
- User_id: user_id,
- Session_id: sessions.sessions[keyname].Session_id,
- Create_time: sessions.sessions[keyname].Start_time,
- Duration: sessions.sessions[keyname].Duration,
- Platform: platform,
- Module: module,
- Count: 1,
- }
- sessionDetails.Lock()
- sessionDetails.sessionsDetails[sessdetailkey] = newSessDetail
- sessionDetails.Unlock()
- } else {
- //更新
- sessionDetails.Lock()
- sessdetal.Count++
- sessdetal.Duration = sessions.sessions[keyname].Duration
- sessionDetails.Unlock()
- }
- }
- // LoadUserSession 加载未结束会话
- func LoadUserSession(st, et int64) *UserSessions {
- // 初始化用户会话映射
- userSessions := &UserSessions{
- sessions: make(map[string]*SessionLog),
- }
- starttime := time.Unix(st, 0).Add(-time.Duration(Sysconfig.TimeOut) * time.Minute).Format(date.Date_Full_Layout)
- endtime := time.Unix(et, 0).Add(-time.Duration(Sysconfig.TimeOut) * time.Minute).Format(date.Date_Full_Layout)
- q := fmt.Sprintf(`select user_id,session_id,start_time,end_time,duration,in_platform,in_subsystem,
- in_module,in_refer,in_refer_name,in_refer_type,in_url,in_url_name,in_url_element,
- out_platform,out_subsystem,out_module,out_url,out_url_name,path
- from userbehavior.user_session_log where end_time >='%s' and end_time <'%s'`, starttime, endtime)
- ctx := context.Background()
- rows, err := Ch_userbehavior.Query(ctx, q)
- if err != nil {
- log.Fatal(err)
- }
- defer rows.Close()
- for rows.Next() {
- var (
- start_time, end_time time.Time
- user_id, session_id, duration, in_platform, in_subsystem, in_module, in_refer, in_refer_name, in_refer_type, in_url, in_url_name, in_url_element, out_platform, out_subsystem, out_module, out_url, out_url_name, path string
- )
- if err := rows.Scan(
- &user_id, &session_id, &start_time, &end_time, &duration, &in_platform, &in_subsystem, &in_module, &in_refer, &in_refer_name, &in_refer_type, &in_url, &in_url_name, &in_url_element, &out_platform, &out_subsystem, &out_module, &out_url, &out_url_name, &path,
- ); err != nil {
- log.Fatal(err)
- }
- if path != "" {
- da := Ch.SelectBySql(`select path from userbehavior.user_behavior_path where id =? limit 1`, path)
- if da != nil && len(*da) > 0 {
- path = qu.ObjToString((*da)[0]["path"])
- }
- }
- userSessions.Lock()
- userSessions.sessions[fmt.Sprintf("%s_%s", in_platform, user_id)] = &SessionLog{
- User_id: user_id,
- Session_id: session_id,
- Start_time: start_time,
- End_time: end_time,
- Duration: duration,
- In_platform: in_platform,
- In_module: in_module,
- In_refer: in_refer,
- In_refer_name: in_refer_name,
- In_refer_type: in_refer_type,
- In_url: in_url,
- In_url_name: in_url_name,
- In_url_element: in_url_element,
- Out_platform: out_platform,
- Out_module: out_module,
- Out_url: out_url,
- out_url_name: out_url_name,
- IsActive: true,
- PathArr: strings.Split(path, "_"),
- }
- userSessions.Unlock()
- }
- return userSessions
- }
- // CleanupSessions 清理过期的会话
- func CleanupSessions(sessions *UserSessions) {
- sessions.RLock()
- defer sessions.RUnlock()
- // 当前时间
- now := time.Now()
- // 遍历并清理过期会话
- for userID, session := range sessions.sessions {
- // 如果会话不活跃,并且最后更新时间超过24小时
- if !session.IsActive && now.Sub(session.End_time) > time.Duration(Sysconfig.TimeOut)*time.Minute {
- sessions.Lock()
- delete(sessions.sessions, userID)
- sessions.Unlock()
- }
- }
- }
- // StoreSessions 存储会话数据到 ClickHouse
- func StoreSessions(sessions *UserSessions, sessionDetails *UserSessionsDetails) {
- sessions.RLock()
- defer sessions.RUnlock()
- ctx := context.Background()
- // 遍历所有用户会话
- c := 0
- for _, session := range sessions.sessions {
- if session.IsActive {
- c++
- if c%1000 == 0 {
- time.Sleep(time.Second * 3)
- }
- //var results
- //Ch_userbehavior.Select(ctx, query)
- count := Ch.CountBySql(`select count(1) from user_session_log where session_id =?`, session.Session_id)
- if count > 0 {
- //修改user_session_log
- query := fmt.Sprintf(`ALTER TABLE user_session_log
- UPDATE out_platform='%s' , out_module ='%s' ,
- out_url ='%s' , out_url_name='%s' ,
- end_time ='%v' , duration ='%s' ,path='%s'
- WHERE session_id = '%s'`, session.Out_platform, session.Out_module, session.Out_url,
- session.out_url_name, session.End_time.Format(date.Date_Full_Layout), session.Duration, gconv.String(SavePathBehavior(strings.Join(session.PathArr, "_"))),
- session.Session_id)
- err := Ch_userbehavior.Exec(ctx, query)
- if err != nil {
- log.Println(session.Session_id, "更新失败", err)
- }
- //修改user_session_detail_log
- } else {
- //不存在会话 创建
- insertData := map[string]interface{}{
- "user_id": session.User_id,
- "session_id": session.Session_id,
- "start_time": session.Start_time,
- "end_time": session.End_time,
- "duration": session.Duration,
- "in_platform": session.In_platform,
- "in_subsystem": "",
- "in_module": session.In_module,
- "in_refer": session.In_refer,
- "in_refer_name": session.In_refer_name,
- "in_refer_type": session.In_refer_type,
- "in_url": session.In_url,
- "in_url_name": session.In_url_name,
- "in_url_element": session.In_url_element,
- "out_platform": session.Out_platform,
- "out_subsystem": "",
- "out_module": session.Out_module,
- "out_url": session.Out_url,
- "out_url_name": session.out_url_name,
- "path": gconv.String(SavePathBehavior(strings.Join(session.PathArr, "_"))),
- }
- _, err := g.DB().Model("userbehavior.user_session_log").Data(insertData).Insert()
- if err != nil {
- log.Println("session_log存储失败", err)
- }
- }
- }
- //修改归集表
- query := fmt.Sprintf(`ALTER TABLE user_behavior_log
- UPDATE session_id='%s' , session_starttime ='%s'
- WHERE date >= '%s' and date <'%s' and user_id ='%s' and platform ='%s'`, session.Session_id, session.Start_time.Format(date.Date_Full_Layout), session.Start_time.Format(date.Date_Full_Layout), session.End_time.Format(date.Date_Full_Layout), session.User_id, session.In_platform)
- err := Ch_userbehavior.Exec(ctx, query)
- if err != nil {
- log.Println("详情更新失败", err)
- }
- }
- sessionDetails.RLock()
- defer sessionDetails.RUnlock()
- c2 := 0
- for _, detail := range sessionDetails.sessionsDetails {
- c2++
- if c2%1000 == 0 {
- time.Sleep(time.Second * 3)
- }
- ddata := Ch.SelectBySql(`select count from user_session_detail_log
- where session_id =? and module = ?`, detail.Session_id, detail.Module)
- if ddata != nil && len(*ddata) > 0 {
- c := qu.IntAll((*ddata)[0]["count"])
- //修改user_session_log
- query := fmt.Sprintf(`ALTER TABLE user_session_detail_log
- UPDATE duration='%s' , count ='%v'
- WHERE session_id = '%s' and module ='%s'`, detail.Duration, detail.Count+int32(c), detail.Session_id, detail.Module)
- err := Ch_userbehavior.Exec(ctx, query)
- if err != nil {
- log.Println(detail.Session_id, "详情更新失败", err)
- }
- } else {
- //不存在会话 创建
- insertData := map[string]interface{}{
- "user_id": detail.User_id,
- "session_id": detail.Session_id,
- "create_time": detail.Create_time,
- "duration": detail.Duration,
- "platform": detail.Platform,
- "subsystem": "",
- "module": detail.Module,
- "count": detail.Count,
- }
- _, err := g.DB().Model("userbehavior.user_session_detail_log").Data(insertData).Insert()
- if err != nil {
- log.Println("user_session_detail_log存储失败", err)
- }
- }
- }
- log.Println("处理数据结束")
- }
|