customManager.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package mananger
  2. import (
  3. "app.yhyue.com/moapp/jybase/redis"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/robfig/cron/v3"
  7. "leadGeneration/entity/activeUsers"
  8. "leadGeneration/entity/power"
  9. "leadGeneration/entity/search"
  10. "leadGeneration/public"
  11. "leadGeneration/vars"
  12. "log"
  13. "sync"
  14. "time"
  15. )
  16. const (
  17. customCacheDb = "newother"
  18. customDataCacheKey = "leadGeneration_customData_%s"
  19. customDataCacheTimeLong = 60 * 60 * 24 * 30
  20. customNewUserQueueKey = "leadGeneration_customData_newUserQueue"
  21. )
  22. // CustomManager 定制化报告管理
  23. type CustomManager struct {
  24. Conf vars.CustomConfig
  25. UserGroup map[string]int // 月活用户
  26. BatchFlag string // 批次标识
  27. StopSearch chan bool // 停止查询信号
  28. sync.RWMutex
  29. }
  30. type SearchEntity struct {
  31. UserId string
  32. Value string
  33. }
  34. var (
  35. newRegisterUserQueue = make(chan *SearchEntity) //新用户队列
  36. activityUserQueue = make(chan *SearchEntity) //活跃用户队列
  37. )
  38. // InitCustomManager 初始化
  39. func InitCustomManager(conf vars.CustomConfig) *CustomManager {
  40. manager := &CustomManager{
  41. Conf: conf,
  42. UserGroup: make(map[string]int),
  43. StopSearch: make(chan bool),
  44. }
  45. //定时任务
  46. go manager.ScheduledTasks()
  47. return manager
  48. }
  49. // GetData 获取查询数据
  50. func (this *CustomManager) GetData(userId, keyWords string, isNew bool) map[string]interface{} {
  51. if !(isNew || this.checkActivityUser(userId)) {
  52. return nil
  53. }
  54. //查询是否是付费用户
  55. if power.HasPower(userId) {
  56. return nil
  57. }
  58. //查询数据
  59. if bytes, err := redis.GetBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, userId)); err == nil && bytes != nil {
  60. rData := map[string]interface{}{}
  61. if err := json.Unmarshal(*bytes, &rData); err != nil {
  62. log.Printf("[MANAGER-ERR]CustomManager %s GetData Error %v \n", userId, err)
  63. return nil
  64. }
  65. return rData
  66. } else if isNew {
  67. //加入优先查询队列
  68. redis.RPUSH(customCacheDb, customNewUserQueueKey, userId)
  69. }
  70. return nil
  71. }
  72. // ScheduledTasks 定时任务
  73. func (this *CustomManager) ScheduledTasks() {
  74. // 定时圈用户
  75. if this.Conf.UpdateCron != "" {
  76. c := cron.New(cron.WithSeconds())
  77. // 给对象增加定时任务
  78. if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
  79. panic(err)
  80. }
  81. c.Start()
  82. }
  83. // 查询时间段限时
  84. if this.Conf.SearchLimit.Switch.Start != "" && this.Conf.SearchLimit.Switch.Stop != "" {
  85. //开始
  86. startJob := cron.New(cron.WithSeconds())
  87. if _, err := startJob.AddFunc(this.Conf.SearchLimit.Switch.Start, func() {
  88. go this.DoSearch()
  89. }); err != nil {
  90. panic(err)
  91. }
  92. startJob.Start()
  93. //结束
  94. endJob := cron.New(cron.WithSeconds())
  95. if _, err := endJob.AddFunc(this.Conf.SearchLimit.Switch.Stop, func() {
  96. this.StopSearch <- true
  97. }); err != nil {
  98. panic(err)
  99. }
  100. endJob.Start()
  101. }
  102. // 首次运行启动查询
  103. go this.DoSearch()
  104. // 首次运行圈选用户
  105. this.UpdateUserGroupJob()
  106. }
  107. // UpdateUserGroupJob 更新用户群组
  108. func (this *CustomManager) UpdateUserGroupJob() {
  109. if !this.Conf.Open {
  110. return
  111. }
  112. log.Printf("[MANAGER-INFO]CustomManager UserGroup Change Start\n")
  113. //更新批次标识
  114. this.BatchFlag = public.GetWeekBatchName(time.Now())
  115. newMap := map[string]int{}
  116. userArr := this.getUserGroup()
  117. for _, uId := range userArr {
  118. newMap[uId] = 0
  119. }
  120. //更新新用户群组
  121. this.Lock()
  122. defer this.Unlock()
  123. this.UserGroup = newMap
  124. log.Printf("[MANAGER-INFO]CustomManager UserGroup Changed Finish Total is %d \n", len(this.UserGroup))
  125. go this.activityUserQueue(this.BatchFlag, userArr)
  126. go this.newUserQueue(this.BatchFlag)
  127. }
  128. //getUserGroup 获取用户群组
  129. func (this *CustomManager) getUserGroup() (userIds []string) {
  130. //当前批次是否已有数据
  131. rData := public.UserAnalyseDb.SelectBySql("SELECT user_mongoid AS uid FROM user_leadGeneration_group WHERE group_type = 1 AND group_name=?", this.BatchFlag)
  132. if rData != nil || len(*rData) == 0 {
  133. log.Printf("[MANAGER-INFO]CustomManager getUserGroup from Db Total is %d \n", len(*rData))
  134. userIds = make([]string, 0, len(*rData))
  135. for _, m := range *rData {
  136. if uId, _ := m["uid"].(string); uId != "" {
  137. userIds = append(userIds, uId)
  138. }
  139. }
  140. //是否有新增测试用户
  141. if len(userIds) > 0 && len(vars.Config.TestUid) > 0 {
  142. var newTest []string
  143. for _, uid := range vars.Config.TestUid {
  144. has := false
  145. for _, id := range userIds {
  146. if uid == id {
  147. has = true
  148. break
  149. }
  150. }
  151. if !has && !power.HasPower(uid) {
  152. newTest = append(newTest, uid)
  153. }
  154. }
  155. if len(newTest) > 0 {
  156. userIds = append(userIds, newTest...)
  157. activeUsers.SaveBatchGroup(newTest, 1, this.BatchFlag, "testUser")
  158. }
  159. }
  160. }
  161. //无数据则重新生成用户群组数据
  162. if len(userIds) == 0 {
  163. log.Printf("[MANAGER-INFO]CustomManager getUserGroup createNewGroup start\n")
  164. var (
  165. wg sync.WaitGroup
  166. newActiveGroup, testUserGroup []string
  167. )
  168. wg.Add(2)
  169. //测试用户群组
  170. go func() {
  171. defer wg.Done()
  172. if len(vars.Config.TestUid) > 0 {
  173. for _, uid := range vars.Config.TestUid {
  174. if !power.HasPower(uid) {
  175. testUserGroup = append(testUserGroup, uid)
  176. }
  177. }
  178. }
  179. activeUsers.SaveBatchGroup(testUserGroup, 1, this.BatchFlag, "testUser")
  180. }()
  181. //新活跃用户
  182. go func() {
  183. defer wg.Done()
  184. newActiveGroup = activeUsers.GetMonthActiveFreeUsers()
  185. activeUsers.SaveBatchGroup(newActiveGroup, 1, this.BatchFlag, "newActive")
  186. }()
  187. wg.Wait()
  188. userIds = make([]string, 0, len(newActiveGroup)+len(testUserGroup))
  189. userIds = append(userIds, newActiveGroup...)
  190. userIds = append(userIds, testUserGroup...)
  191. log.Printf("[MANAGER-INFO]CustomManager getUserGroup createNewGroup end Total %d\n", len(userIds))
  192. }
  193. return
  194. }
  195. // activityUserQueue 活跃用户查询队列
  196. func (this *CustomManager) activityUserQueue(batchFlag string, userIds []string) {
  197. for i, userId := range userIds {
  198. if i%100 == 0 {
  199. log.Printf("[MANAGER-INFO]CustomManager Batch %s Now(%d/%d)\n", batchFlag, i, len(userIds))
  200. }
  201. //当批次更新时,上批次停止
  202. if this.BatchFlag != batchFlag {
  203. log.Printf("[MANAGER-INFO]CustomManager Batch %s Is END At (%d/%d) \n", batchFlag, i, len(userIds))
  204. break
  205. }
  206. activityUserQueue <- &SearchEntity{
  207. UserId: userId,
  208. }
  209. }
  210. log.Printf("[MANAGER-INFO]CustomManager Batch %s Is Finished !!!\n", batchFlag)
  211. }
  212. func (this *CustomManager) newUserQueue(batchFlag string) {
  213. for {
  214. if this.BatchFlag != batchFlag {
  215. unFinishedNum := redis.LLEN(customCacheDb, customNewUserQueueKey)
  216. if unFinishedNum > 0 {
  217. redis.Del(customCacheDb, customNewUserQueueKey)
  218. }
  219. log.Printf("[MANAGER-INFO]CustomManager newUserQueue End unfinished %d \n", unFinishedNum)
  220. return
  221. }
  222. if uid, _ := redis.LPOP(customCacheDb, customNewUserQueueKey).(string); uid != "" {
  223. newRegisterUserQueue <- &SearchEntity{
  224. UserId: uid,
  225. }
  226. }
  227. }
  228. }
  229. // DoSearch 定制化分析报告查询队列
  230. func (this *CustomManager) DoSearch() {
  231. log.Printf("[MANAGER-INFO]CustomManager DoSearch Start\n")
  232. for {
  233. var obj *SearchEntity
  234. select { //优先级 newRegisterUserQueue > activityUserQueue
  235. case <-this.StopSearch: //不在运行时间段内退出查询
  236. log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
  237. return
  238. default:
  239. select {
  240. case obj = <-newRegisterUserQueue:
  241. default:
  242. select {
  243. case obj = <-newRegisterUserQueue:
  244. case obj = <-activityUserQueue:
  245. case <-this.StopSearch: //不在运行时间段内退出查询
  246. log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
  247. return
  248. }
  249. }
  250. }
  251. //存在缓存直接跳过
  252. key := fmt.Sprintf(customDataCacheKey, obj.UserId)
  253. if has, err := redis.Exists(customCacheDb, key); has && err == nil {
  254. continue
  255. }
  256. //查询结果处理
  257. searchStart := time.Now()
  258. data := search.PotentialCustomizeAnalysis(obj.UserId, obj.Value)
  259. //查询超时,则休息一下
  260. if time.Now().Sub(searchStart).Seconds() > float64(this.Conf.SearchLimit.TimeOver) {
  261. time.Sleep(time.Second * time.Duration(this.Conf.SearchLimit.WaitTime))
  262. }
  263. if data == nil || len(data) == 0 {
  264. log.Printf("[MANAGER-ERR]CustomManager %s DoSearch %s Is Empty\n", obj.UserId, obj.Value)
  265. continue
  266. }
  267. //缓存结果
  268. if bytes, err := json.Marshal(data); err == nil && bytes != nil {
  269. _ = redis.PutBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, obj.UserId), &bytes, customDataCacheTimeLong)
  270. }
  271. }
  272. }
  273. // checkActivityUser 校验用户是否是活跃用户
  274. func (this *CustomManager) checkActivityUser(userId string) (exists bool) {
  275. this.RLock()
  276. defer this.RUnlock()
  277. _, exists = this.UserGroup[userId]
  278. return
  279. }