old.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package entity
  2. import (
  3. "log"
  4. "regexp"
  5. "telemarketingEtl/config"
  6. "telemarketingEtl/util"
  7. "time"
  8. "app.yhyue.com/moapp/jybase/date"
  9. "app.yhyue.com/moapp/jybase/mongodb"
  10. "github.com/gogf/gf/v2/frame/g"
  11. "github.com/gogf/gf/v2/os/gtime"
  12. "github.com/gogf/gf/v2/util/gconv"
  13. )
  14. // 查看事件
  15. func VisitInfoAddOld(start, end int64) {
  16. // 获取时间戳 1 和时间戳 2 所在的日期
  17. t1 := time.Unix(start, 0)
  18. t2 := time.Unix(end, 0)
  19. index := g.Cfg().MustGet(ctx, "index").Int()
  20. RegWx, _ := regexp.Compile("MicroMessenger")
  21. // 获取时间戳 1 和时间戳 2 之间的天数
  22. days := int(t2.Sub(t1).Hours() / 24)
  23. for i := 0; i <= days; i++ {
  24. // 获取当前日期
  25. t := t1.AddDate(0, 0, i)
  26. // 获取当天的开始时间
  27. startOfDay := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix()
  28. // 获取当天的结束时间
  29. endOfDay := time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, t.Location()).Unix()
  30. s_id := util.GetObjectId(startOfDay)
  31. e_id := util.GetObjectId(endOfDay)
  32. query := map[string]interface{}{
  33. "_id": map[string]interface{}{
  34. "$gte": s_id,
  35. "$lte": e_id,
  36. },
  37. }
  38. log.Println("task run", time.Unix(startOfDay, 0).Format(date.Date_Full_Layout), time.Unix(endOfDay, 0).Format(date.Date_Full_Layout), gtime.NewFromTimeStamp(start), query)
  39. tables := g.Cfg().MustGet(ctx, "logTables").Strings()
  40. log.Println("analy tables:", tables)
  41. userVisit := map[string]*visitStruct{}
  42. for _, table := range tables {
  43. func(table string) {
  44. session := config.MgoLog.GetMgoConn()
  45. iter := session.DB("qfw").C(table).Find(query).Iter()
  46. count := 0
  47. for thisData := map[string]interface{}{}; iter.Next(&thisData); {
  48. func(thisData map[string]interface{}) {
  49. userid := gconv.String(thisData["userid"])
  50. if userid == "" {
  51. return
  52. }
  53. if !mongodb.IsObjectIdHex(userid) {
  54. userid, _ = GetUserIdByPositionId(userid)
  55. }
  56. if userid == "" {
  57. return
  58. }
  59. url_ := gconv.String(thisData["url"])
  60. reg := regexp.MustCompile(".*article/content/(.*)\\.html")
  61. contentnum := 0
  62. if reg.MatchString(url_) {
  63. contentnum = 1
  64. }
  65. createtime := gconv.Int64(thisData["date"])
  66. craetetimeStr := time.Unix(createtime, 0).Format(date.Date_Full_Layout)
  67. platform := APP
  68. if table == "jy_logs" {
  69. platform = PC
  70. if client := gconv.String(thisData["client"]); client != "" {
  71. if RegWx.MatchString(client) {
  72. platform = WX
  73. }
  74. }
  75. }
  76. userVisit[userid] = updateUser(userVisit[userid], 1, contentnum, platform, userid, craetetimeStr, craetetimeStr)
  77. //
  78. }(thisData)
  79. count++
  80. if count%5000 == 0 {
  81. log.Printf("%s已完成%d条数据\n", table, count)
  82. }
  83. thisData = map[string]interface{}{}
  84. }
  85. log.Println("end!")
  86. // sWait.Wait()
  87. }(table)
  88. }
  89. fieids := []string{"userid", "DATE", "number", "platform", "createtime", "contentnum"}
  90. args := []interface{}{}
  91. ii := 0
  92. if len(userVisit) > 0 {
  93. log.Println("用户量:", len(userVisit))
  94. for _, v := range userVisit {
  95. userid := (*v).userid
  96. contentnum := (*v).contentnum
  97. number := (*v).number
  98. createtime := (*v).createtime
  99. datetime := (*v).datetime
  100. platform := (*v).platform
  101. args = append(args, userid, datetime, number, platform, createtime, contentnum)
  102. ii++
  103. if ii%index == 0 {
  104. if len(args) > 0 {
  105. config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args)
  106. args = []interface{}{}
  107. }
  108. }
  109. }
  110. if len(args) > 0 {
  111. config.JianyuSubjectdb.InsertBatch("dwd_f_userbase_visit_info", fieids, args)
  112. }
  113. }
  114. }
  115. }
  // visitStruct holds the running visit statistics for one user, as built and
// merged by updateUser.
type visitStruct struct {
	// userid identifies the user these statistics belong to.
	userid string
	// createtime is the creation time: the timestamp string supplied when the entry was first created.
	createtime string
	// datetime is the visit time: the timestamp string of the most recently merged visit.
	datetime string
	// platform is the platform code of the most recently merged visit.
	platform int
	// number is the accumulated visit count.
	number int
	// contentnum is the accumulated content-view count.
	contentnum int
}
  124. func updateUser(visit *visitStruct, num, contmun, platform int, userid, createtime, datetime string) *visitStruct {
  125. if visit == nil {
  126. return &visitStruct{
  127. userid: userid,
  128. createtime: createtime,
  129. datetime: datetime,
  130. number: num,
  131. contentnum: contmun,
  132. platform: platform,
  133. }
  134. } else {
  135. if num > 0 {
  136. visit.number = visit.number + num
  137. }
  138. if contmun > 0 {
  139. visit.contentnum = visit.contentnum + contmun
  140. }
  141. visit.platform = platform
  142. visit.datetime = datetime
  143. }
  144. return visit
  145. }