customManager.go 8.8 KB


  1. package mananger
  2. import (
  3. "app.yhyue.com/moapp/jybase/redis"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/robfig/cron/v3"
  7. "leadGeneration/entity/activeUsers"
  8. "leadGeneration/entity/power"
  9. "leadGeneration/entity/search"
  10. "leadGeneration/public"
  11. "leadGeneration/vars"
  12. "log"
  13. "sync"
  14. "time"
  15. )
// Redis settings used by the customized-report cache in this file.
const (
	// customCacheDb is the redis instance code passed as the first argument
	// to every redis call in this file.
	customCacheDb = "newother"
	// customDataCacheKey is the per-user cache key template; %s is filled
	// with the user id.
	customDataCacheKey = "leadGeneration_customData_%s"
	// customDataCacheTimeLong is the timeout handed to redis.PutBytes when a
	// report is cached (60*60*24*30 — presumably 30 days in seconds; confirm
	// the unit against redis.PutBytes).
	customDataCacheTimeLong = 60 * 60 * 24 * 30
	// customNewUserQueueKey is the redis list that GetData pushes new users
	// onto and newUserQueue pops them from.
	customNewUserQueueKey = "leadGeneration_customData_newUserQueue"
)
// CustomManager manages customized analysis reports: it keeps the set of
// selected users, schedules the jobs that refresh that set, and runs the
// search loop that pre-computes and caches report data.
//
// The embedded RWMutex guards UserGroup (see UpdateUserGroupJob and
// checkActivityUser).
type CustomManager struct {
	Conf       vars.CustomConfig
	UserGroup  map[string]int // selected active users keyed by user id (original note: monthly active users); values are always stored as 0
	BatchFlag  string         // identifier of the current batch
	StopSearch chan bool      // signal telling DoSearch to stop
	IsDoSearch bool           // whether DoSearch is currently running
	sync.RWMutex
}
// SearchEntity is one unit of work for DoSearch. Both fields are passed to
// search.PotentialCustomizeAnalysis; the producers in this file only set
// UserId and leave Value empty.
type SearchEntity struct {
	UserId string
	Value  string
}
// Unbuffered work queues consumed by DoSearch. A send blocks until DoSearch
// is ready to receive.
var (
	newRegisterUserQueue = make(chan *SearchEntity) // queue of new users (served with priority by DoSearch)
	activityUserQueue    = make(chan *SearchEntity) // queue of active users
)
  39. // InitCustomManager 初始化
  40. func InitCustomManager(conf vars.CustomConfig) *CustomManager {
  41. manager := &CustomManager{
  42. Conf: conf,
  43. UserGroup: make(map[string]int),
  44. StopSearch: make(chan bool),
  45. }
  46. //定时任务
  47. go manager.ScheduledTasks()
  48. return manager
  49. }
  50. // GetData 获取查询数据
  51. func (this *CustomManager) GetData(userId, keyWords string, isNew bool) map[string]interface{} {
  52. if !(isNew || this.checkActivityUser(userId)) {
  53. return nil
  54. }
  55. //查询是否是有定制化分析报告权限
  56. if power.HasPower(userId) {
  57. return nil
  58. }
  59. //查询数据
  60. if bytes, err := redis.GetBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, userId)); err == nil && bytes != nil {
  61. rData := map[string]interface{}{}
  62. if err := json.Unmarshal(*bytes, &rData); err != nil {
  63. log.Printf("[MANAGER-ERR]CustomManager %s GetData Error %v \n", userId, err)
  64. return nil
  65. }
  66. return rData
  67. } else if isNew {
  68. //加入优先查询队列
  69. redis.RPUSH(customCacheDb, customNewUserQueueKey, userId)
  70. }
  71. return nil
  72. }
  73. // ScheduledTasks 定时任务
  74. func (this *CustomManager) ScheduledTasks() {
  75. // 定时圈用户
  76. if this.Conf.UpdateCron != "" {
  77. c := cron.New(cron.WithSeconds())
  78. // 给对象增加定时任务
  79. if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
  80. panic(err)
  81. }
  82. c.Start()
  83. }
  84. // 查询时间段限时
  85. if this.Conf.SearchLimit.Switch.Start != "" && this.Conf.SearchLimit.Switch.Stop != "" {
  86. //开始
  87. startJob := cron.New(cron.WithSeconds())
  88. if _, err := startJob.AddFunc(this.Conf.SearchLimit.Switch.Start, func() {
  89. if !this.IsDoSearch {
  90. go this.DoSearch()
  91. }
  92. }); err != nil {
  93. panic(err)
  94. }
  95. startJob.Start()
  96. //结束
  97. endJob := cron.New(cron.WithSeconds())
  98. if _, err := endJob.AddFunc(this.Conf.SearchLimit.Switch.Stop, func() {
  99. if this.IsDoSearch {
  100. this.StopSearch <- true
  101. }
  102. }); err != nil {
  103. panic(err)
  104. }
  105. endJob.Start()
  106. }
  107. // 首次运行启动查询
  108. go this.DoSearch()
  109. // 首次运行圈选用户
  110. this.UpdateUserGroupJob()
  111. }
  112. // UpdateUserGroupJob 更新用户群组
  113. func (this *CustomManager) UpdateUserGroupJob() {
  114. if !this.Conf.Open {
  115. return
  116. }
  117. log.Printf("[MANAGER-INFO]CustomManager UserGroup Change Start\n")
  118. //更新批次标识
  119. this.BatchFlag = public.GetWeekBatchName(time.Now())
  120. newMap := map[string]int{}
  121. userArr := this.getUserGroup()
  122. for _, uId := range userArr {
  123. newMap[uId] = 0
  124. }
  125. //更新新用户群组
  126. this.Lock()
  127. defer this.Unlock()
  128. this.UserGroup = newMap
  129. log.Printf("[MANAGER-INFO]CustomManager UserGroup Changed Finish Total is %d \n", len(this.UserGroup))
  130. go this.activityUserQueue(this.BatchFlag, userArr)
  131. go this.newUserQueue(this.BatchFlag)
  132. }
  133. // getUserGroup 获取用户群组
  134. func (this *CustomManager) getUserGroup() (userIds []string) {
  135. //当前批次是否已有数据
  136. rData := public.UserAnalyseDb.SelectBySql("SELECT user_mongoid AS uid FROM user_leadGeneration_group WHERE group_type = 1 AND group_name=?", this.BatchFlag)
  137. if rData != nil || len(*rData) == 0 {
  138. log.Printf("[MANAGER-INFO]CustomManager getUserGroup from Db Total is %d \n", len(*rData))
  139. userIds = make([]string, 0, len(*rData))
  140. for _, m := range *rData {
  141. if uId, _ := m["uid"].(string); uId != "" {
  142. userIds = append(userIds, uId)
  143. }
  144. }
  145. //是否有新增测试用户
  146. if len(userIds) > 0 && len(vars.Config.TestUid) > 0 {
  147. var newTest []string
  148. for _, uid := range vars.Config.TestUid {
  149. has := false
  150. for _, id := range userIds {
  151. if uid == id {
  152. has = true
  153. break
  154. }
  155. }
  156. if !has && !power.HasPower(uid) {
  157. newTest = append(newTest, uid)
  158. }
  159. }
  160. if len(newTest) > 0 {
  161. userIds = append(userIds, newTest...)
  162. activeUsers.SaveBatchGroup(newTest, 1, this.BatchFlag, "testUser")
  163. }
  164. }
  165. }
  166. //无数据则重新生成用户群组数据
  167. if len(userIds) == 0 {
  168. log.Printf("[MANAGER-INFO]CustomManager getUserGroup createNewGroup start\n")
  169. var (
  170. wg sync.WaitGroup
  171. newActiveGroup, testUserGroup []string
  172. )
  173. wg.Add(2)
  174. //测试用户群组
  175. go func() {
  176. defer wg.Done()
  177. if len(vars.Config.TestUid) > 0 {
  178. for _, uid := range vars.Config.TestUid {
  179. if !power.HasPower(uid) {
  180. testUserGroup = append(testUserGroup, uid)
  181. }
  182. }
  183. }
  184. activeUsers.SaveBatchGroup(testUserGroup, 1, this.BatchFlag, "testUser")
  185. }()
  186. //新活跃用户
  187. go func() {
  188. defer wg.Done()
  189. if this.Conf.Mode == ModeMonth { // 月活
  190. newActiveGroup = CustomerActiveFreeUser.GetMonthActiveFreeUsers()
  191. } else { // 周活
  192. newActiveGroup = CustomerActiveFreeUser.GetWeekActiveFreeUsers()
  193. }
  194. activeUsers.SaveBatchGroup(newActiveGroup, 1, this.BatchFlag, "newActive")
  195. }()
  196. wg.Wait()
  197. userIds = make([]string, 0, len(newActiveGroup)+len(testUserGroup))
  198. userIds = append(userIds, newActiveGroup...)
  199. userIds = append(userIds, testUserGroup...)
  200. log.Printf("[MANAGER-INFO]CustomManager getUserGroup createNewGroup end Total %d\n", len(userIds))
  201. }
  202. return
  203. }
  204. // activityUserQueue 活跃用户查询队列
  205. func (this *CustomManager) activityUserQueue(batchFlag string, userIds []string) {
  206. for i, userId := range userIds {
  207. if i%100 == 0 {
  208. log.Printf("[MANAGER-INFO]CustomManager Batch %s Now(%d/%d)\n", batchFlag, i, len(userIds))
  209. }
  210. //当批次更新时,上批次停止
  211. if this.BatchFlag != batchFlag {
  212. log.Printf("[MANAGER-INFO]CustomManager Batch %s Is END At (%d/%d) \n", batchFlag, i, len(userIds))
  213. break
  214. }
  215. activityUserQueue <- &SearchEntity{
  216. UserId: userId,
  217. }
  218. }
  219. log.Printf("[MANAGER-INFO]CustomManager Batch %s Is Finished !!!\n", batchFlag)
  220. }
  221. func (this *CustomManager) newUserQueue(batchFlag string) {
  222. for {
  223. if this.BatchFlag != batchFlag {
  224. unFinishedNum := redis.LLEN(customCacheDb, customNewUserQueueKey)
  225. if unFinishedNum > 0 {
  226. redis.Del(customCacheDb, customNewUserQueueKey)
  227. }
  228. log.Printf("[MANAGER-INFO]CustomManager newUserQueue End unfinished %d \n", unFinishedNum)
  229. return
  230. }
  231. if uid, _ := redis.LPOP(customCacheDb, customNewUserQueueKey).(string); uid != "" {
  232. newRegisterUserQueue <- &SearchEntity{
  233. UserId: uid,
  234. }
  235. }
  236. }
  237. }
  238. // DoSearch 定制化分析报告查询队列
  239. func (this *CustomManager) DoSearch() {
  240. this.IsDoSearch = true
  241. defer func() {
  242. this.IsDoSearch = false
  243. }()
  244. log.Printf("[MANAGER-INFO]CustomManager DoSearch Start\n")
  245. for {
  246. var obj *SearchEntity
  247. select { //优先级 newRegisterUserQueue > activityUserQueue
  248. case <-this.StopSearch: //不在运行时间段内退出查询
  249. log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
  250. return
  251. default:
  252. select {
  253. case obj = <-newRegisterUserQueue:
  254. default:
  255. select {
  256. case obj = <-newRegisterUserQueue:
  257. case obj = <-activityUserQueue:
  258. case <-this.StopSearch: //不在运行时间段内退出查询
  259. log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
  260. return
  261. }
  262. }
  263. }
  264. //存在缓存直接跳过
  265. key := fmt.Sprintf(customDataCacheKey, obj.UserId)
  266. if has, err := redis.Exists(customCacheDb, key); has && err == nil {
  267. continue
  268. }
  269. //查询结果处理
  270. searchStart := time.Now()
  271. data := search.PotentialCustomizeAnalysis(obj.UserId, obj.Value)
  272. //查询超时,则休息一下
  273. if time.Now().Sub(searchStart).Seconds() > float64(this.Conf.SearchLimit.TimeOver) {
  274. time.Sleep(time.Second * time.Duration(this.Conf.SearchLimit.WaitTime))
  275. }
  276. if data == nil || len(data) == 0 {
  277. log.Printf("[MANAGER-ERR]CustomManager %s DoSearch %s Is Empty\n", obj.UserId, obj.Value)
  278. continue
  279. }
  280. //缓存结果
  281. if bytes, err := json.Marshal(data); err == nil && bytes != nil {
  282. _ = redis.PutBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, obj.UserId), &bytes, customDataCacheTimeLong)
  283. }
  284. }
  285. }
  286. // checkActivityUser 校验用户是否是活跃用户
  287. func (this *CustomManager) checkActivityUser(userId string) (exists bool) {
  288. this.RLock()
  289. defer this.RUnlock()
  290. _, exists = this.UserGroup[userId]
  291. return
  292. }