aheadManager.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package mananger
  2. import (
  3. "app.yhyue.com/moapp/jybase/redis"
  4. "fmt"
  5. "github.com/robfig/cron/v3"
  6. "leadGeneration/entity/activeUsers"
  7. "leadGeneration/entity/power"
  8. "leadGeneration/entity/search"
  9. "leadGeneration/public"
  10. "leadGeneration/vars"
  11. "log"
  12. "sort"
  13. "sync"
  14. "time"
  15. )
  16. // AheadManager 超前项目管理
  17. type AheadManager struct {
  18. Conf vars.AheadConfig
  19. UserGroup map[string]int
  20. BatchFlag string
  21. sync.RWMutex
  22. }
  23. const (
  24. //缓存
  25. AheadCacheDb = "newother"
  26. AheadRequestFrequencyCacheKey = "leadGeneration_AheadRequest_%s_%d_%s"
  27. AheadRequestTimesLong = 60 * 60 * 24 //缓存一天
  28. )
  29. // InitAheadManager 初始化
  30. func InitAheadManager(conf vars.AheadConfig) *AheadManager {
  31. manager := &AheadManager{
  32. Conf: conf,
  33. UserGroup: make(map[string]int),
  34. }
  35. go manager.ScheduledTasks()
  36. return manager
  37. }
  38. // GetData 获取查询数据
  39. func (this *AheadManager) GetData(userId, keyWords string, isNew bool, t int64) map[string]interface{} {
  40. if !(isNew || this.checkGroupUser(userId)) {
  41. return nil
  42. }
  43. //查询是否是有超前项目权限
  44. if power.HasAheadPower(userId) {
  45. return nil
  46. }
  47. //校验每日请求次数
  48. cacheKey := fmt.Sprintf(AheadRequestFrequencyCacheKey, time.Now().Format(public.TimeFormat), t, userId)
  49. if this.Conf.DailyTimes <= redis.GetInt(AheadCacheDb, cacheKey) {
  50. return nil
  51. }
  52. //查询数据
  53. rDate := search.AdvancedProject(userId, keyWords, t)
  54. //累计请求计数
  55. if rDate != nil && len(rDate) > 0 {
  56. if num := redis.Incr(AheadCacheDb, cacheKey); num == 1 {
  57. _ = redis.SetExpire(AheadCacheDb, cacheKey, AheadRequestTimesLong)
  58. }
  59. }
  60. return rDate
  61. }
  62. // Click 用户点击
  63. func (this *AheadManager) Click(userId string) bool {
  64. this.Lock()
  65. defer this.Unlock()
  66. if v, ok := this.UserGroup[userId]; ok {
  67. this.UserGroup[userId] = v
  68. return true
  69. }
  70. return false
  71. }
  72. // CheckGroupUser 校验用户是否有资格展示
  73. func (this *AheadManager) checkGroupUser(userId string) (exists bool) {
  74. this.RLock()
  75. defer this.RUnlock()
  76. _, exists = this.UserGroup[userId]
  77. return
  78. }
  79. // ScheduledTasks 定时任务
  80. func (this *AheadManager) ScheduledTasks() {
  81. if this.Conf.UpdateCron != "" {
  82. c := cron.New(cron.WithSeconds())
  83. // 给对象增加定时任务
  84. if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
  85. panic(err)
  86. }
  87. c.Start()
  88. }
  89. //首次运行圈选用户
  90. go this.UpdateUserGroupJob()
  91. }
  92. // UpdateUserGroupJob 更新用户群组
  93. func (this *AheadManager) UpdateUserGroupJob() {
  94. if this.Conf.Prop <= 0 {
  95. return
  96. }
  97. log.Printf("[MANAGER-INFO]AheadManager UserGroup Change Start\n")
  98. this.BatchFlag = public.GetWeekBatchName(time.Now())
  99. newMap := map[string]int{}
  100. //新圈用户
  101. for _, uId := range this.getUserGroup() {
  102. newMap[uId] = 0
  103. }
  104. this.Lock()
  105. defer this.Unlock()
  106. this.UserGroup = newMap
  107. log.Printf("[MANAGER-INFO]AheadManager UserGroup Changed Finish Total is %d \n", len(this.UserGroup))
  108. }
  109. // getUserGroup 取用户
  110. func (this *AheadManager) getUserGroup() (userIds []string) {
  111. //当前批次是否已有数据
  112. rData := public.UserAnalyseDb.SelectBySql("SELECT user_mongoid AS uid FROM user_leadGeneration_group WHERE group_type = 0 AND group_name=?", this.BatchFlag)
  113. if rData != nil || len(*rData) == 0 {
  114. log.Printf("[MANAGER-INFO]AheadManager getUserGroup from Db Total is %d \n", len(*rData))
  115. userIds = make([]string, 0, len(*rData))
  116. for _, m := range *rData {
  117. if uId, _ := m["uid"].(string); uId != "" {
  118. userIds = append(userIds, uId)
  119. }
  120. }
  121. //是否有新增测试用户
  122. if len(userIds) > 0 && len(vars.Config.TestUid) > 0 {
  123. var newTest []string
  124. for _, uid := range vars.Config.TestUid {
  125. has := false
  126. for _, id := range userIds {
  127. if uid == id {
  128. has = true
  129. break
  130. }
  131. }
  132. if !has && !power.HasAheadPower(uid) {
  133. newTest = append(newTest, uid)
  134. }
  135. }
  136. if len(newTest) > 0 {
  137. userIds = append(userIds, newTest...)
  138. activeUsers.SaveBatchGroup(newTest, 0, this.BatchFlag, "testUser")
  139. }
  140. }
  141. }
  142. userIds = []string{}
  143. //无数据则重新生成用户群组数据
  144. if len(userIds) == 0 {
  145. log.Printf("[MANAGER-INFO]AheadManager getUserGroup createNewGroup start\n")
  146. var (
  147. wg sync.WaitGroup
  148. newActiveGroup, lastActiveGroup, testUserGroup []string
  149. )
  150. wg.Add(3)
  151. //新活跃用户
  152. go func() {
  153. defer wg.Done()
  154. if this.Conf.Mode == ModeMonth { // 月活
  155. newActiveGroup = this.sortUserByBatchAndGetFinal(AheadActiveFreeUser.GetMonthActiveFreeUsers())
  156. } else { // 周活
  157. newActiveGroup = this.sortUserByBatchAndGetFinal(AheadActiveFreeUser.GetWeekActiveFreeUsers())
  158. }
  159. activeUsers.SaveBatchGroup(newActiveGroup, 0, this.BatchFlag, "newActive")
  160. }()
  161. //测试用户群组
  162. go func() {
  163. defer wg.Done()
  164. if len(vars.Config.TestUid) > 0 {
  165. for _, uid := range vars.Config.TestUid {
  166. if !power.HasAheadPower(uid) {
  167. testUserGroup = append(testUserGroup, uid)
  168. }
  169. }
  170. }
  171. activeUsers.SaveBatchGroup(testUserGroup, 0, this.BatchFlag, "testUser")
  172. }()
  173. //查询上批次活跃用户(点击量满足用户)
  174. go func() {
  175. defer wg.Done()
  176. this.RLock()
  177. defer this.RUnlock()
  178. for uId, clickNum := range this.UserGroup {
  179. if clickNum > this.Conf.SaveClickTimes {
  180. lastActiveGroup = append(lastActiveGroup, uId)
  181. }
  182. }
  183. activeUsers.SaveBatchGroup(lastActiveGroup, 0, this.BatchFlag, "oldActive")
  184. }()
  185. wg.Wait()
  186. userIds = make([]string, 0, len(newActiveGroup)+len(lastActiveGroup)+len(testUserGroup))
  187. userIds = append(userIds, newActiveGroup...)
  188. userIds = append(userIds, lastActiveGroup...)
  189. userIds = append(userIds, testUserGroup...)
  190. log.Printf("[MANAGER-INFO]AheadManager getUserGroup createNewGroup Total %d \n New:%d Old:%d Test:%d \n",
  191. len(userIds), len(newActiveGroup), len(lastActiveGroup), len(testUserGroup))
  192. }
  193. return
  194. }
  195. // sortUserByBatchAndGetFinal 根据百分比取用户,优先级【没有参与>参与时间早的>参与时间晚的】
  196. func (this *AheadManager) sortUserByBatchAndGetFinal(userIds []string) (rData []string) {
  197. log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Begin\n")
  198. total := int(this.Conf.Prop * float64(len(userIds)))
  199. //查询历史分组情况
  200. gData := public.UserAnalyseDb.SelectBySql("SELECT * FROM ( SELECT user_mongoid AS uid, MAX( UNIX_TIMESTAMP(create_time) ) AS gTime FROM user_leadGeneration_group WHERE group_type = 0 GROUP BY user_mongoid ) AS groupData ORDER BY gTime ASC")
  201. log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Search Finished\n")
  202. // 无记录直接返回数组前final_count个用户id
  203. if gData == nil || len(*gData) == 0 {
  204. return userIds[:total]
  205. }
  206. sortMap := map[string]int64{}
  207. for _, m := range *gData {
  208. if uId, _ := m["uid"].(string); uId != "" {
  209. num, _ := m["gTime"].(int64)
  210. sortMap[uId] = num
  211. }
  212. }
  213. log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Sort Begin\n")
  214. // 有记录进行排序筛选
  215. sort.Slice(userIds, func(i, j int) bool {
  216. iNum, _ := sortMap[userIds[i]]
  217. jNum, _ := sortMap[userIds[j]]
  218. if iNum < jNum {
  219. return true
  220. }
  221. return false
  222. })
  223. log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Finished\n")
  224. return userIds[:total]
  225. }