customManager.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package mananger
  2. import (
  3. "app.yhyue.com/moapp/jybase/redis"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/robfig/cron/v3"
  7. "leadGeneration/entity/activeUsers"
  8. "leadGeneration/entity/power"
  9. "leadGeneration/entity/search"
  10. "leadGeneration/public"
  11. "leadGeneration/vars"
  12. "log"
  13. "sync"
  14. "time"
  15. )
  // Cache settings for customized-report data.
  const (
  	// customCacheDb is the redis instance code passed to the redis helpers.
  	customCacheDb = "newother"
  	// customDataCacheKey is the cache key template; %s is filled with the user id.
  	customDataCacheKey = "leadGeneration_customData_%s"
  	// customDataCacheTimeLong is the cache timeout handed to redis.PutBytes
  	// (30 days when interpreted as seconds).
  	customDataCacheTimeLong = 30 * 24 * 60 * 60
  )
  21. // CustomManager 定制化报告管理
  22. type CustomManager struct {
  23. Conf vars.CustomConfig
  24. UserGroup map[string]int // 月活用户
  25. BatchFlag string // 批次标识
  26. sync.RWMutex
  27. }
  // SearchEntity is one unit of work placed on a search queue.
  type SearchEntity struct {
  	// UserId is the user the search is run for.
  	UserId string
  	// Value is the search input passed along with the user id; it may be empty.
  	Value string
  }
  32. var (
  33. newRegisterUserQueue = make(chan *SearchEntity) //新用户队列
  34. activityUserQueue = make(chan *SearchEntity) //活跃用户队列
  35. )
  36. // InitCustomManager 初始化
  37. func InitCustomManager(conf vars.CustomConfig) *CustomManager {
  38. manager := &CustomManager{
  39. Conf: conf,
  40. UserGroup: make(map[string]int),
  41. }
  42. //圈用户
  43. go manager.ScheduledTasks()
  44. //执行查询
  45. go manager.DoSearch()
  46. return manager
  47. }
  48. // GetData 获取查询数据
  49. func (this *CustomManager) GetData(userId, keyWords string, isNew bool) map[string]interface{} {
  50. if !(isNew || this.checkActivityUser(userId)) {
  51. return nil
  52. }
  53. //查询是否是付费用户
  54. if power.HasPower(userId) {
  55. return nil
  56. }
  57. //查询数据
  58. if bytes, err := redis.GetBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, userId)); err == nil && bytes != nil {
  59. rData := map[string]interface{}{}
  60. if err := json.Unmarshal(*bytes, &rData); err != nil {
  61. log.Printf("[ERROR]CustomManager %s GetData Error %v \n", userId, err)
  62. return nil
  63. }
  64. return rData
  65. } else if isNew {
  66. //加入优先查询队列
  67. go func() {
  68. newRegisterUserQueue <- &SearchEntity{
  69. UserId: userId,
  70. Value: keyWords,
  71. }
  72. }()
  73. }
  74. return nil
  75. }
  76. // ScheduledTasks 定时任务
  77. func (this *CustomManager) ScheduledTasks() {
  78. if this.Conf.UpdateCron != "" {
  79. // 给对象增加定时任务
  80. if _, err := cron.New().AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
  81. panic(err)
  82. }
  83. }
  84. //首次运行圈选用户
  85. this.UpdateUserGroupJob()
  86. }
  87. // UpdateUserGroupJob 更新用户群组
  88. func (this *CustomManager) UpdateUserGroupJob() {
  89. //查询月活用户
  90. userArr := activeUsers.GetMonthActiveFreeUsers()
  91. newMap := map[string]int{}
  92. //测试账户
  93. if len(vars.Config.TestUid) > 0 {
  94. for _, uid := range vars.Config.TestUid {
  95. newMap[uid] = 0
  96. }
  97. }
  98. for _, uId := range userArr {
  99. newMap[uId] = 0
  100. }
  101. //更新新用户群组
  102. this.Lock()
  103. defer this.Unlock()
  104. this.UserGroup = newMap
  105. //更新批次标识
  106. batchFlag := time.Now().Format(public.Date_Full_Layout)
  107. this.BatchFlag = batchFlag
  108. log.Printf("CustomManager NewGroup %v\n", newMap)
  109. go this.activityUserQueue(batchFlag, userArr)
  110. }
  111. // activityUserQueue 活跃用户查询队列
  112. func (this *CustomManager) activityUserQueue(batchFlag string, userIds []string) {
  113. for i, userId := range userIds {
  114. if i%100 == 0 {
  115. log.Printf("[INFO]CustomManager Batch %s Now(%d/%d)\n", batchFlag, len(userIds), i)
  116. }
  117. //当批次更新时,上批次停止
  118. if this.BatchFlag != batchFlag {
  119. log.Printf("[INFO]CustomManager Batch %s Is END At (%d/%d) \n", batchFlag, i, len(userIds))
  120. break
  121. }
  122. activityUserQueue <- &SearchEntity{
  123. UserId: userId,
  124. }
  125. }
  126. log.Printf("[INFO]CustomManager Batch %s Is Finished !!!\n", batchFlag)
  127. }
  128. // DoSearch 定制化分析报告查询队列
  129. func (this *CustomManager) DoSearch() {
  130. for {
  131. //是否在可执行时间段内
  132. //???
  133. var obj *SearchEntity
  134. select { //优先级 newRegisterUserQueue > activityUserQueue
  135. case obj = <-newRegisterUserQueue:
  136. default:
  137. select {
  138. case obj = <-newRegisterUserQueue:
  139. case obj = <-activityUserQueue:
  140. }
  141. }
  142. //查询结果处理
  143. data := search.PotentialCustomizeAnalysis(obj.UserId, obj.Value)
  144. if data == nil || len(data) == 0 {
  145. log.Printf("[ERROR]CustomManager %s DoSearch %s Error %v\n", obj.UserId, obj.Value)
  146. continue
  147. }
  148. //缓存结果
  149. if bytes, err := json.Marshal(data); err == nil && bytes != nil {
  150. _ = redis.PutBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, obj.UserId), &bytes, customDataCacheTimeLong)
  151. }
  152. }
  153. }
  154. // checkActivityUser 校验用户是否是活跃用户
  155. func (this *CustomManager) checkActivityUser(userId string) (exists bool) {
  156. if len(vars.Config.TestUid) > 0 {
  157. for _, uid := range vars.Config.TestUid {
  158. if uid == userId {
  159. return true
  160. }
  161. }
  162. }
  163. this.RLock()
  164. defer this.RUnlock()
  165. _, exists = this.UserGroup[userId]
  166. return
  167. }