aheadManager.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package mananger
  2. import (
  3. "app.yhyue.com/moapp/jybase/redis"
  4. "fmt"
  5. "github.com/robfig/cron/v3"
  6. "leadGeneration/entity/activeUsers"
  7. "leadGeneration/entity/power"
  8. "leadGeneration/entity/search"
  9. "leadGeneration/public"
  10. "leadGeneration/vars"
  11. "log"
  12. "sort"
  13. "sync"
  14. "time"
  15. )
  16. // AheadManager 超前项目管理
  17. type AheadManager struct {
  18. Conf vars.AheadConfig
  19. UserGroup map[string]int
  20. BatchFlag string
  21. sync.RWMutex
  22. }
  23. const (
  24. //缓存
  25. AheadCacheDb = "newother"
  26. AheadRequestFrequencyCacheKey = "leadGeneration_AheadRequest_%s_%d_%s"
  27. AheadRequestTimesLong = 60 * 60 * 24 //缓存一天
  28. )
  29. // InitAheadManager 初始化
  30. func InitAheadManager(conf vars.AheadConfig) *AheadManager {
  31. manager := &AheadManager{
  32. Conf: conf,
  33. UserGroup: make(map[string]int),
  34. }
  35. go manager.ScheduledTasks()
  36. return manager
  37. }
  38. // GetData 获取查询数据
  39. func (this *AheadManager) GetData(userId, keyWords string, isNew bool, t int64) map[string]interface{} {
  40. if !(isNew || this.checkGroupUser(userId)) {
  41. return nil
  42. }
  43. //校验每日请求次数
  44. cacheKey := fmt.Sprintf(AheadRequestFrequencyCacheKey, time.Now().Format(public.TimeFormat), t, userId)
  45. if this.Conf.DailyTimes <= redis.GetInt(AheadCacheDb, cacheKey) {
  46. return nil
  47. }
  48. //查询数据
  49. rDate := search.AdvancedProject(userId, keyWords)
  50. //累计请求计数
  51. if rDate != nil && len(rDate) > 0 {
  52. if num := redis.Incr(AheadCacheDb, cacheKey); num == 1 {
  53. _ = redis.SetExpire(AheadCacheDb, cacheKey, AheadRequestTimesLong)
  54. }
  55. }
  56. return rDate
  57. }
  58. // Click 用户点击
  59. func (this *AheadManager) Click(userId string) bool {
  60. this.Lock()
  61. defer this.Unlock()
  62. if v, ok := this.UserGroup[userId]; ok {
  63. this.UserGroup[userId] = v
  64. return true
  65. }
  66. return false
  67. }
  68. // CheckGroupUser 校验用户是否有资格展示
  69. func (this *AheadManager) checkGroupUser(userId string) (exists bool) {
  70. this.RLock()
  71. defer this.RUnlock()
  72. _, exists = this.UserGroup[userId]
  73. return
  74. }
  75. // ScheduledTasks 定时任务
  76. func (this *AheadManager) ScheduledTasks() {
  77. if this.Conf.UpdateCron != "" {
  78. c := cron.New(cron.WithSeconds())
  79. // 给对象增加定时任务
  80. if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
  81. panic(err)
  82. }
  83. c.Start()
  84. }
  85. //首次运行圈选用户
  86. go this.UpdateUserGroupJob()
  87. }
  88. // UpdateUserGroupJob 更新用户群组
  89. func (this *AheadManager) UpdateUserGroupJob() {
  90. if this.Conf.Prop <= 0 {
  91. return
  92. }
  93. log.Printf("[MANAGER-INFO]AheadManager UserGroup Change Start\n")
  94. this.BatchFlag = public.GetWeekBatchName(time.Now())
  95. newMap := map[string]int{}
  96. //新圈用户
  97. for _, uId := range this.getUserGroup() {
  98. newMap[uId] = 0
  99. }
  100. this.Lock()
  101. defer this.Unlock()
  102. this.UserGroup = newMap
  103. log.Printf("[MANAGER-INFO]AheadManager UserGroup Changed Finish Total is %d \n", len(this.UserGroup))
  104. }
  105. // getUserGroup 取用户
  106. func (this *AheadManager) getUserGroup() (userIds []string) {
  107. //当前批次是否已有数据
  108. rData := public.UserAnalyseDb.SelectBySql("SELECT user_mongoid AS uid FROM user_leadGeneration_group WHERE group_type = 0 AND group_name=?", this.BatchFlag)
  109. if rData != nil || len(*rData) == 0 {
  110. log.Printf("[MANAGER-INFO]AheadManager getUserGroup from Db Total is %d \n", len(*rData))
  111. userIds = make([]string, 0, len(*rData))
  112. for _, m := range *rData {
  113. if uId, _ := m["uid"].(string); uId != "" {
  114. userIds = append(userIds, uId)
  115. }
  116. }
  117. //是否有新增测试用户
  118. if len(userIds) > 0 && len(vars.Config.TestUid) > 0 {
  119. var newTest []string
  120. for _, uid := range vars.Config.TestUid {
  121. has := false
  122. for _, id := range userIds {
  123. if uid == id {
  124. has = true
  125. break
  126. }
  127. }
  128. if !has && !power.HasAheadPower(uid) {
  129. newTest = append(newTest, uid)
  130. }
  131. }
  132. if len(newTest) > 0 {
  133. userIds = append(userIds, newTest...)
  134. activeUsers.SaveBatchGroup(newTest, 0, this.BatchFlag, "testUser")
  135. }
  136. }
  137. }
  138. //无数据则重新生成用户群组数据
  139. if len(userIds) == 0 {
  140. log.Printf("[MANAGER-INFO]AheadManager getUserGroup createNewGroup start\n")
  141. var (
  142. wg sync.WaitGroup
  143. newActiveGroup, lastActiveGroup, testUserGroup []string
  144. )
  145. wg.Add(3)
  146. //新活跃用户
  147. go func() {
  148. defer wg.Done()
  149. newActiveGroup = this.sortUserByBatchAndGetFinal(activeUsers.GetWeekActiveFreeUsers())
  150. activeUsers.SaveBatchGroup(newActiveGroup, 0, this.BatchFlag, "newActive")
  151. }()
  152. //测试用户群组
  153. go func() {
  154. defer wg.Done()
  155. if len(vars.Config.TestUid) > 0 {
  156. for _, uid := range vars.Config.TestUid {
  157. if !power.HasAheadPower(uid) {
  158. testUserGroup = append(testUserGroup, uid)
  159. }
  160. }
  161. }
  162. activeUsers.SaveBatchGroup(testUserGroup, 0, this.BatchFlag, "testUser")
  163. }()
  164. //查询上批次活跃用户(点击量满足用户)
  165. go func() {
  166. defer wg.Done()
  167. this.RLock()
  168. defer this.RUnlock()
  169. for uId, clickNum := range this.UserGroup {
  170. if clickNum > this.Conf.SaveClickTimes {
  171. lastActiveGroup = append(lastActiveGroup, uId)
  172. }
  173. }
  174. activeUsers.SaveBatchGroup(lastActiveGroup, 0, this.BatchFlag, "oldActive")
  175. }()
  176. wg.Wait()
  177. userIds = make([]string, 0, len(newActiveGroup)+len(lastActiveGroup)+len(testUserGroup))
  178. userIds = append(userIds, newActiveGroup...)
  179. userIds = append(userIds, lastActiveGroup...)
  180. userIds = append(userIds, testUserGroup...)
  181. log.Printf("[MANAGER-INFO]AheadManager getUserGroup createNewGroup Total %d \n New:%d Old:%d Test:%d \n",
  182. len(userIds), len(newActiveGroup), len(lastActiveGroup), len(testUserGroup))
  183. }
  184. return
  185. }
  186. // sortUserByBatchAndGetFinal 根据百分比取用户,优先级【没有参与>参与时间早的>参与时间晚的】
  187. func (this *AheadManager) sortUserByBatchAndGetFinal(userIds []string) (rData []string) {
  188. log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Begin\n")
  189. total := int(this.Conf.Prop * float64(len(userIds)))
  190. //查询历史分组情况
  191. gData := public.UserAnalyseDb.SelectBySql("SELECT * FROM ( SELECT user_mongoid AS uid, MAX( UNIX_TIMESTAMP(create_time) ) AS gTime FROM user_leadGeneration_group WHERE group_type = 0 GROUP BY user_mongoid ) AS groupData ORDER BY gTime ASC")
  192. log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Search Finished\n")
  193. // 无记录直接返回数组前final_count个用户id
  194. if gData == nil || len(*gData) == 0 {
  195. return userIds[:total]
  196. }
  197. sortMap := map[string]int64{}
  198. for _, m := range *gData {
  199. if uId, _ := m["uid"].(string); uId != "" {
  200. num, _ := m["gTime"].(int64)
  201. sortMap[uId] = num
  202. }
  203. }
  204. log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Sort Begin\n")
  205. // 有记录进行排序筛选
  206. sort.Slice(userIds, func(i, j int) bool {
  207. iNum, _ := sortMap[userIds[i]]
  208. jNum, _ := sortMap[userIds[j]]
  209. if iNum < jNum {
  210. return true
  211. }
  212. return false
  213. })
  214. log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Finished\n")
  215. return userIds[:total]
  216. }