session.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. package main
  2. import (
  3. qu "app.yhyue.com/moapp/jybase/common"
  4. "app.yhyue.com/moapp/jybase/date"
  5. "context"
  6. "fmt"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/util/gconv"
  9. "log"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. type SessionLog struct {
  15. User_id string
  16. Session_id string
  17. Start_time time.Time
  18. End_time time.Time
  19. Duration string
  20. In_platform string
  21. In_module string
  22. In_refer string
  23. In_refer_name string
  24. In_refer_type string
  25. In_url string
  26. In_url_name string
  27. In_url_element string
  28. Out_platform string
  29. Out_module string
  30. Out_url string
  31. out_url_name string
  32. IsActive bool
  33. Path string
  34. PathArr []string
  35. }
  36. type SessionDetaiLog struct {
  37. User_id string
  38. Session_id string
  39. Create_time time.Time
  40. Duration string
  41. Platform string
  42. Module string
  43. Count int32
  44. }
  45. // UserSessions 用户会话映射
  46. type UserSessions struct {
  47. sync.RWMutex
  48. sessions map[string]*SessionLog
  49. }
  50. // UserSessions 用户会话映射
  51. type UserSessionsDetails struct {
  52. sync.RWMutex
  53. sessionsDetails map[string]*SessionDetaiLog
  54. }
  55. // Session 获取会话
  56. func Session(st, et int64) {
  57. //初始化页码code
  58. pageCodeMap := LoadPageCode()
  59. //先加载未结束的会话
  60. userSessionMap := LoadUserSession(st, et)
  61. //加载虚拟id
  62. userTrustedMap := LoadUserTrustedId(st, et)
  63. //初始化用户会话明细映射
  64. // 初始化用户会话映射
  65. userSessionDetails := &UserSessionsDetails{
  66. sessionsDetails: make(map[string]*SessionDetaiLog),
  67. }
  68. modelMap := MatchRegexModule()
  69. starttime := time.Unix(st, 0).Format(date.Date_Full_Layout)
  70. endtime := time.Unix(et, 0).Format(date.Date_Full_Layout)
  71. //加载用户行为表
  72. q := fmt.Sprintf(`select date,user_id,trusted_id,platform,module,url,url_name,url_element,refer,refer_name,refer_type,session_id,page_name,breaker_name,data_type,action_id
  73. from userbehavior.user_behavior_log where
  74. date >='%s' and date <'%s'`, starttime, endtime)
  75. // and user_id ='67d7e09285045c492114c634'
  76. ctx := context.Background()
  77. //log.Println(q)
  78. rows, err := Ch_userbehavior.Query(ctx, q)
  79. if err != nil {
  80. log.Fatal(err)
  81. }
  82. defer rows.Close()
  83. for rows.Next() {
  84. var (
  85. date time.Time
  86. user_id, trusted_id, platform, module, url, url_name, url_element, refer, refer_name, refer_type, session_id, page_name, breaker_name, data_type string
  87. action_id int64
  88. )
  89. if err := rows.Scan(
  90. &date, &user_id, &trusted_id, &platform, &module, &url, &url_name, &url_element, &refer, &refer_name, &refer_type, &session_id, &page_name, &breaker_name, &data_type, &action_id,
  91. ); err != nil {
  92. log.Fatal(err)
  93. }
  94. //处理会话逻辑
  95. HandleSession(pageCodeMap, userSessionDetails, userTrustedMap, userSessionMap, date, user_id, trusted_id, platform, module, url, url_name, url_element, refer, refer_name, refer_type, session_id, page_name, breaker_name, data_type, action_id, modelMap)
  96. }
  97. //清理过期session
  98. CleanupSessions(userSessionMap)
  99. //存储
  100. StoreSessions(userSessionMap, userSessionDetails)
  101. }
  102. // GetSessionId 获取sessionid
  103. func GetSessionId(platform, user_id string, date time.Time) string {
  104. timestr := date.Format("20060102150405")
  105. return fmt.Sprintf("%s_%s_%s", platform, user_id, timestr)
  106. }
  107. // CalculateMinutesDifference 计算两个时间点之间的分钟差
  108. func CalculateMinutesDifference(t1, t2 time.Time) int {
  109. // 计算两个时间之间的差值
  110. duration := t2.Sub(t1)
  111. // 将时间差转换为分钟
  112. minutes := int(duration.Minutes())
  113. return minutes
  114. }
  115. func HandleSession(pageCodeMap map[string]int64, sessionDetails *UserSessionsDetails, userTrustedMap map[string]string, sessions *UserSessions, date1 time.Time,
  116. user_id, trusted_id, platform, module, url, url_name, url_element, refer, refer_name, refer_type, session_id, page_name, breaker_name, data_type string, action_id int64, modelMap *CompiledRegexMap) {
  117. if user_id == "" && trusted_id == "" {
  118. return
  119. }
  120. if user_id == "" && trusted_id != "" {
  121. user_id = userTrustedMap[trusted_id]
  122. }
  123. if user_id == "" {
  124. return
  125. }
  126. keyname := fmt.Sprintf("%s_%s", platform, user_id)
  127. sessions.RLock()
  128. session, exists := sessions.sessions[keyname]
  129. sessions.RUnlock()
  130. sessId := GetSessionId(platform, user_id, date1)
  131. id := getPageCodeId(platform, url, breaker_name, module, url_name, action_id, pageCodeMap, modelMap)
  132. // 如果用户没有活跃会话,则创建新会话
  133. if !exists || !session.IsActive {
  134. patharr := []string{}
  135. if data_type == "c_jy_open_page" {
  136. //patharr = GetPathBehavior([]string{}, gconv.String(pageCodeMap[key]))
  137. patharr = GetPathBehavior([]string{}, gconv.String(id))
  138. }
  139. newSession := &SessionLog{
  140. User_id: user_id,
  141. Session_id: sessId,
  142. Start_time: date1,
  143. End_time: date1,
  144. Duration: "0",
  145. In_platform: platform,
  146. In_module: module,
  147. In_refer: refer,
  148. In_refer_name: refer_name,
  149. In_refer_type: refer_type,
  150. In_url: url,
  151. In_url_name: url_name,
  152. In_url_element: url_element,
  153. Out_platform: platform,
  154. Out_module: module,
  155. Out_url: url,
  156. out_url_name: url_name,
  157. IsActive: true,
  158. PathArr: patharr,
  159. }
  160. sessions.Lock()
  161. sessions.sessions[keyname] = newSession
  162. sessions.Unlock()
  163. } else {
  164. // 计算时间差
  165. timeDiff := date1.Sub(session.End_time)
  166. if timeDiff > time.Duration(Sysconfig.TimeOut)*time.Minute {
  167. patharr := []string{}
  168. if data_type == "c_jy_open_page" {
  169. //patharr = GetPathBehavior([]string{}, gconv.String(pageCodeMap[key]))
  170. patharr = GetPathBehavior([]string{}, gconv.String(id))
  171. }
  172. // 如果时间差超过60分钟,创建新会话
  173. newSession := &SessionLog{
  174. User_id: user_id,
  175. Session_id: sessId,
  176. Start_time: date1,
  177. End_time: date1,
  178. Duration: "0",
  179. In_platform: platform,
  180. In_module: module,
  181. In_refer: refer,
  182. In_refer_name: refer_name,
  183. In_refer_type: refer_type,
  184. In_url: url,
  185. In_url_name: url_name,
  186. In_url_element: url_element,
  187. Out_platform: platform,
  188. Out_module: module,
  189. Out_url: url,
  190. out_url_name: url_name,
  191. IsActive: true,
  192. PathArr: patharr,
  193. }
  194. sessions.Lock()
  195. sessions.sessions[keyname] = newSession
  196. sessions.Unlock()
  197. } else {
  198. endtime := session.End_time
  199. if endtime.Before(date1) {
  200. endtime = date1
  201. }
  202. duration := endtime.Sub(session.Start_time)
  203. // 否则更新当前会话
  204. sessions.Lock()
  205. session.End_time = endtime
  206. session.Out_platform = platform
  207. session.Out_module = module
  208. session.Out_url = url
  209. session.out_url_name = url_name
  210. session.Duration = duration.String()
  211. //session.PathArr = append(session.PathArr, gconv.String(pageCodeMap[key]))
  212. if data_type == "c_jy_open_page" {
  213. //session.PathArr = GetPathBehavior(session.PathArr, gconv.String(pageCodeMap[key]))
  214. session.PathArr = GetPathBehavior(session.PathArr, gconv.String(id))
  215. }
  216. sessions.Unlock()
  217. }
  218. }
  219. //
  220. sessdetailkey := fmt.Sprintf("%s_%s", sessions.sessions[keyname].Session_id, module)
  221. sessionDetails.RLock()
  222. sessdetal, exists := sessionDetails.sessionsDetails[sessdetailkey]
  223. sessionDetails.RUnlock()
  224. if !exists {
  225. //新增
  226. newSessDetail := &SessionDetaiLog{
  227. User_id: user_id,
  228. Session_id: sessions.sessions[keyname].Session_id,
  229. Create_time: sessions.sessions[keyname].Start_time,
  230. Duration: sessions.sessions[keyname].Duration,
  231. Platform: platform,
  232. Module: module,
  233. Count: 1,
  234. }
  235. sessionDetails.Lock()
  236. sessionDetails.sessionsDetails[sessdetailkey] = newSessDetail
  237. sessionDetails.Unlock()
  238. } else {
  239. //更新
  240. sessionDetails.Lock()
  241. sessdetal.Count++
  242. sessdetal.Duration = sessions.sessions[keyname].Duration
  243. sessionDetails.Unlock()
  244. }
  245. }
  246. // LoadUserSession 加载未结束会话
  247. func LoadUserSession(st, et int64) *UserSessions {
  248. // 初始化用户会话映射
  249. userSessions := &UserSessions{
  250. sessions: make(map[string]*SessionLog),
  251. }
  252. starttime := time.Unix(st, 0).Add(-time.Duration(Sysconfig.TimeOut) * time.Minute).Format(date.Date_Full_Layout)
  253. endtime := time.Unix(et, 0).Add(-time.Duration(Sysconfig.TimeOut) * time.Minute).Format(date.Date_Full_Layout)
  254. q := fmt.Sprintf(`select user_id,session_id,start_time,end_time,duration,in_platform,in_subsystem,
  255. in_module,in_refer,in_refer_name,in_refer_type,in_url,in_url_name,in_url_element,
  256. out_platform,out_subsystem,out_module,out_url,out_url_name,path
  257. from userbehavior.user_session_log where end_time >='%s' and end_time <'%s'`, starttime, endtime)
  258. ctx := context.Background()
  259. rows, err := Ch_userbehavior.Query(ctx, q)
  260. if err != nil {
  261. log.Fatal(err)
  262. }
  263. defer rows.Close()
  264. for rows.Next() {
  265. var (
  266. start_time, end_time time.Time
  267. user_id, session_id, duration, in_platform, in_subsystem, in_module, in_refer, in_refer_name, in_refer_type, in_url, in_url_name, in_url_element, out_platform, out_subsystem, out_module, out_url, out_url_name, path string
  268. )
  269. if err := rows.Scan(
  270. &user_id, &session_id, &start_time, &end_time, &duration, &in_platform, &in_subsystem, &in_module, &in_refer, &in_refer_name, &in_refer_type, &in_url, &in_url_name, &in_url_element, &out_platform, &out_subsystem, &out_module, &out_url, &out_url_name, &path,
  271. ); err != nil {
  272. log.Fatal(err)
  273. }
  274. if path != "" {
  275. da := Ch.SelectBySql(`select path from userbehavior.user_behavior_path where id =? limit 1`, path)
  276. if da != nil && len(*da) > 0 {
  277. path = qu.ObjToString((*da)[0]["path"])
  278. }
  279. }
  280. userSessions.Lock()
  281. userSessions.sessions[fmt.Sprintf("%s_%s", in_platform, user_id)] = &SessionLog{
  282. User_id: user_id,
  283. Session_id: session_id,
  284. Start_time: start_time,
  285. End_time: end_time,
  286. Duration: duration,
  287. In_platform: in_platform,
  288. In_module: in_module,
  289. In_refer: in_refer,
  290. In_refer_name: in_refer_name,
  291. In_refer_type: in_refer_type,
  292. In_url: in_url,
  293. In_url_name: in_url_name,
  294. In_url_element: in_url_element,
  295. Out_platform: out_platform,
  296. Out_module: out_module,
  297. Out_url: out_url,
  298. out_url_name: out_url_name,
  299. IsActive: true,
  300. PathArr: strings.Split(path, "_"),
  301. }
  302. userSessions.Unlock()
  303. }
  304. return userSessions
  305. }
  306. // CleanupSessions 清理过期的会话
  307. func CleanupSessions(sessions *UserSessions) {
  308. sessions.RLock()
  309. defer sessions.RUnlock()
  310. // 当前时间
  311. now := time.Now()
  312. // 遍历并清理过期会话
  313. for userID, session := range sessions.sessions {
  314. // 如果会话不活跃,并且最后更新时间超过24小时
  315. if !session.IsActive && now.Sub(session.End_time) > time.Duration(Sysconfig.TimeOut)*time.Minute {
  316. sessions.Lock()
  317. delete(sessions.sessions, userID)
  318. sessions.Unlock()
  319. }
  320. }
  321. }
  322. // StoreSessions 存储会话数据到 ClickHouse
  323. func StoreSessions(sessions *UserSessions, sessionDetails *UserSessionsDetails) {
  324. sessions.RLock()
  325. defer sessions.RUnlock()
  326. ctx := context.Background()
  327. // 遍历所有用户会话
  328. c := 0
  329. for _, session := range sessions.sessions {
  330. if session.IsActive {
  331. c++
  332. if c%1000 == 0 {
  333. time.Sleep(time.Second * 3)
  334. }
  335. //var results
  336. //Ch_userbehavior.Select(ctx, query)
  337. count := Ch.CountBySql(`select count(1) from user_session_log where session_id =?`, session.Session_id)
  338. if count > 0 {
  339. //修改user_session_log
  340. query := fmt.Sprintf(`ALTER TABLE user_session_log
  341. UPDATE out_platform='%s' , out_module ='%s' ,
  342. out_url ='%s' , out_url_name='%s' ,
  343. end_time ='%v' , duration ='%s' ,path='%s'
  344. WHERE session_id = '%s'`, session.Out_platform, session.Out_module, session.Out_url,
  345. session.out_url_name, session.End_time.Format(date.Date_Full_Layout), session.Duration, gconv.String(SavePathBehavior(strings.Join(session.PathArr, "_"))),
  346. session.Session_id)
  347. err := Ch_userbehavior.Exec(ctx, query)
  348. if err != nil {
  349. log.Println(session.Session_id, "更新失败", err)
  350. }
  351. //修改user_session_detail_log
  352. } else {
  353. //不存在会话 创建
  354. insertData := map[string]interface{}{
  355. "user_id": session.User_id,
  356. "session_id": session.Session_id,
  357. "start_time": session.Start_time,
  358. "end_time": session.End_time,
  359. "duration": session.Duration,
  360. "in_platform": session.In_platform,
  361. "in_subsystem": "",
  362. "in_module": session.In_module,
  363. "in_refer": session.In_refer,
  364. "in_refer_name": session.In_refer_name,
  365. "in_refer_type": session.In_refer_type,
  366. "in_url": session.In_url,
  367. "in_url_name": session.In_url_name,
  368. "in_url_element": session.In_url_element,
  369. "out_platform": session.Out_platform,
  370. "out_subsystem": "",
  371. "out_module": session.Out_module,
  372. "out_url": session.Out_url,
  373. "out_url_name": session.out_url_name,
  374. "path": gconv.String(SavePathBehavior(strings.Join(session.PathArr, "_"))),
  375. }
  376. _, err := g.DB().Model("userbehavior.user_session_log").Data(insertData).Insert()
  377. if err != nil {
  378. log.Println("session_log存储失败", err)
  379. }
  380. }
  381. }
  382. //修改归集表
  383. query := fmt.Sprintf(`ALTER TABLE user_behavior_log
  384. UPDATE session_id='%s' , session_starttime ='%s'
  385. WHERE date >= '%s' and date <'%s' and user_id ='%s' and platform ='%s'`, session.Session_id, session.Start_time.Format(date.Date_Full_Layout), session.Start_time.Format(date.Date_Full_Layout), session.End_time.Format(date.Date_Full_Layout), session.User_id, session.In_platform)
  386. err := Ch_userbehavior.Exec(ctx, query)
  387. if err != nil {
  388. log.Println("详情更新失败", err)
  389. }
  390. }
  391. sessionDetails.RLock()
  392. defer sessionDetails.RUnlock()
  393. c2 := 0
  394. for _, detail := range sessionDetails.sessionsDetails {
  395. c2++
  396. if c2%1000 == 0 {
  397. time.Sleep(time.Second * 3)
  398. }
  399. ddata := Ch.SelectBySql(`select count from user_session_detail_log
  400. where session_id =? and module = ?`, detail.Session_id, detail.Module)
  401. if ddata != nil && len(*ddata) > 0 {
  402. c := qu.IntAll((*ddata)[0]["count"])
  403. //修改user_session_log
  404. query := fmt.Sprintf(`ALTER TABLE user_session_detail_log
  405. UPDATE duration='%s' , count ='%v'
  406. WHERE session_id = '%s' and module ='%s'`, detail.Duration, detail.Count+int32(c), detail.Session_id, detail.Module)
  407. err := Ch_userbehavior.Exec(ctx, query)
  408. if err != nil {
  409. log.Println(detail.Session_id, "详情更新失败", err)
  410. }
  411. } else {
  412. //不存在会话 创建
  413. insertData := map[string]interface{}{
  414. "user_id": detail.User_id,
  415. "session_id": detail.Session_id,
  416. "create_time": detail.Create_time,
  417. "duration": detail.Duration,
  418. "platform": detail.Platform,
  419. "subsystem": "",
  420. "module": detail.Module,
  421. "count": detail.Count,
  422. }
  423. _, err := g.DB().Model("userbehavior.user_session_detail_log").Data(insertData).Insert()
  424. if err != nil {
  425. log.Println("user_session_detail_log存储失败", err)
  426. }
  427. }
  428. }
  429. log.Println("处理数据结束")
  430. }