123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- package mananger
- import (
- "app.yhyue.com/moapp/jybase/redis"
- "encoding/json"
- "fmt"
- "github.com/robfig/cron/v3"
- "leadGeneration/entity/activeUsers"
- "leadGeneration/entity/power"
- "leadGeneration/entity/search"
- "leadGeneration/public"
- "leadGeneration/vars"
- "log"
- "sync"
- "time"
- )
- const (
- customCacheDb = "newother"
- customDataCacheKey = "leadGeneration_customData_%s"
- customDataCacheTimeLong = 60 * 60 * 24 * 30
- customNewUserQueueKey = "leadGeneration_customData_newUserQueue"
- )
- // CustomManager 定制化报告管理
- type CustomManager struct {
- Conf vars.CustomConfig
- UserGroup map[string]int // 月活用户
- BatchFlag string // 批次标识
- StopSearch chan bool // 停止查询信号
- sync.RWMutex
- }
- type SearchEntity struct {
- UserId string
- Value string
- }
- var (
- newRegisterUserQueue = make(chan *SearchEntity) //新用户队列
- activityUserQueue = make(chan *SearchEntity) //活跃用户队列
- )
- // InitCustomManager 初始化
- func InitCustomManager(conf vars.CustomConfig) *CustomManager {
- manager := &CustomManager{
- Conf: conf,
- UserGroup: make(map[string]int),
- StopSearch: make(chan bool),
- }
- //定时任务
- go manager.ScheduledTasks()
- return manager
- }
- // GetData 获取查询数据
- func (this *CustomManager) GetData(userId, keyWords string, isNew bool) map[string]interface{} {
- if !(isNew || this.checkActivityUser(userId)) {
- return nil
- }
- //查询是否是付费用户
- if power.HasPower(userId) {
- return nil
- }
- //查询数据
- if bytes, err := redis.GetBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, userId)); err == nil && bytes != nil {
- rData := map[string]interface{}{}
- if err := json.Unmarshal(*bytes, &rData); err != nil {
- log.Printf("[MANAGER-ERR]CustomManager %s GetData Error %v \n", userId, err)
- return nil
- }
- return rData
- } else if isNew {
- //加入优先查询队列
- redis.RPUSH(customCacheDb, customNewUserQueueKey, userId)
- }
- return nil
- }
- // ScheduledTasks 定时任务
- func (this *CustomManager) ScheduledTasks() {
- // 定时圈用户
- if this.Conf.UpdateCron != "" {
- c := cron.New(cron.WithSeconds())
- // 给对象增加定时任务
- if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
- panic(err)
- }
- c.Start()
- }
- // 查询时间段限时
- if this.Conf.SearchLimit.Switch.Start != "" && this.Conf.SearchLimit.Switch.Stop != "" {
- //开始
- startJob := cron.New(cron.WithSeconds())
- if _, err := startJob.AddFunc(this.Conf.SearchLimit.Switch.Start, func() {
- go this.DoSearch()
- }); err != nil {
- panic(err)
- }
- startJob.Start()
- //结束
- endJob := cron.New(cron.WithSeconds())
- if _, err := endJob.AddFunc(this.Conf.SearchLimit.Switch.Stop, func() {
- this.StopSearch <- true
- }); err != nil {
- panic(err)
- }
- endJob.Start()
- }
- // 首次运行启动查询
- go this.DoSearch()
- // 首次运行圈选用户
- this.UpdateUserGroupJob()
- }
- // UpdateUserGroupJob 更新用户群组
- func (this *CustomManager) UpdateUserGroupJob() {
- if !this.Conf.Open {
- return
- }
- log.Printf("[MANAGER-INFO]CustomManager UserGroup Change Start\n")
- //更新批次标识
- this.BatchFlag = public.GetWeekBatchName(time.Now())
- newMap := map[string]int{}
- userArr := this.getUserGroup()
- for _, uId := range userArr {
- newMap[uId] = 0
- }
- //更新新用户群组
- this.Lock()
- defer this.Unlock()
- this.UserGroup = newMap
- log.Printf("[MANAGER-INFO]CustomManager UserGroup Changed Finish Total is %d \n", len(this.UserGroup))
- go this.activityUserQueue(this.BatchFlag, userArr)
- go this.newUserQueue(this.BatchFlag)
- }
- //getUserGroup 获取用户群组
- func (this *CustomManager) getUserGroup() (userIds []string) {
- //当前批次是否已有数据
- rData := public.UserAnalyseDb.SelectBySql("SELECT user_mongoid AS uid FROM user_leadGeneration_group WHERE group_type = 1 AND group_name=?", this.BatchFlag)
- if rData != nil || len(*rData) == 0 {
- log.Printf("[MANAGER-INFO]CustomManager getUserGroup from Db Total is %d \n", len(*rData))
- userIds = make([]string, 0, len(*rData))
- for _, m := range *rData {
- if uId, _ := m["uid"].(string); uId != "" {
- userIds = append(userIds, uId)
- }
- }
- //是否有新增测试用户
- if len(userIds) > 0 && len(vars.Config.TestUid) > 0 {
- var newTest []string
- for _, uid := range vars.Config.TestUid {
- has := false
- for _, id := range userIds {
- if uid == id {
- has = true
- break
- }
- }
- if !has && !power.HasPower(uid) {
- newTest = append(newTest, uid)
- }
- }
- if len(newTest) > 0 {
- userIds = append(userIds, newTest...)
- activeUsers.SaveBatchGroup(newTest, 1, this.BatchFlag, "testUser")
- }
- }
- }
- //无数据则重新生成用户群组数据
- if len(userIds) == 0 {
- log.Printf("[MANAGER-INFO]CustomManager getUserGroup createNewGroup start\n")
- var (
- wg sync.WaitGroup
- newActiveGroup, testUserGroup []string
- )
- wg.Add(2)
- //测试用户群组
- go func() {
- defer wg.Done()
- if len(vars.Config.TestUid) > 0 {
- for _, uid := range vars.Config.TestUid {
- if !power.HasPower(uid) {
- testUserGroup = append(testUserGroup, uid)
- }
- }
- }
- activeUsers.SaveBatchGroup(testUserGroup, 1, this.BatchFlag, "testUser")
- }()
- //新活跃用户
- go func() {
- defer wg.Done()
- newActiveGroup = activeUsers.GetMonthActiveFreeUsers()
- activeUsers.SaveBatchGroup(newActiveGroup, 1, this.BatchFlag, "newActive")
- }()
- wg.Wait()
- userIds = make([]string, 0, len(newActiveGroup)+len(testUserGroup))
- userIds = append(userIds, newActiveGroup...)
- userIds = append(userIds, testUserGroup...)
- log.Printf("[MANAGER-INFO]CustomManager getUserGroup createNewGroup end Total %d\n", len(userIds))
- }
- return
- }
- // activityUserQueue 活跃用户查询队列
- func (this *CustomManager) activityUserQueue(batchFlag string, userIds []string) {
- for i, userId := range userIds {
- if i%100 == 0 {
- log.Printf("[MANAGER-INFO]CustomManager Batch %s Now(%d/%d)\n", batchFlag, i, len(userIds))
- }
- //当批次更新时,上批次停止
- if this.BatchFlag != batchFlag {
- log.Printf("[MANAGER-INFO]CustomManager Batch %s Is END At (%d/%d) \n", batchFlag, i, len(userIds))
- break
- }
- activityUserQueue <- &SearchEntity{
- UserId: userId,
- }
- }
- log.Printf("[MANAGER-INFO]CustomManager Batch %s Is Finished !!!\n", batchFlag)
- }
- func (this *CustomManager) newUserQueue(batchFlag string) {
- for {
- if this.BatchFlag != batchFlag {
- unFinishedNum := redis.LLEN(customCacheDb, customNewUserQueueKey)
- if unFinishedNum > 0 {
- redis.Del(customCacheDb, customNewUserQueueKey)
- }
- log.Printf("[MANAGER-INFO]CustomManager newUserQueue End unfinished %d \n", unFinishedNum)
- return
- }
- if uid, _ := redis.LPOP(customCacheDb, customNewUserQueueKey).(string); uid != "" {
- newRegisterUserQueue <- &SearchEntity{
- UserId: uid,
- }
- }
- }
- }
- // DoSearch 定制化分析报告查询队列
- func (this *CustomManager) DoSearch() {
- log.Printf("[MANAGER-INFO]CustomManager DoSearch Start\n")
- for {
- var obj *SearchEntity
- select { //优先级 newRegisterUserQueue > activityUserQueue
- case <-this.StopSearch: //不在运行时间段内退出查询
- log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
- return
- default:
- select {
- case obj = <-newRegisterUserQueue:
- default:
- select {
- case obj = <-newRegisterUserQueue:
- case obj = <-activityUserQueue:
- case <-this.StopSearch: //不在运行时间段内退出查询
- log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
- return
- }
- }
- }
- //存在缓存直接跳过
- key := fmt.Sprintf(customDataCacheKey, obj.UserId)
- if has, err := redis.Exists(customCacheDb, key); has && err == nil {
- continue
- }
- //查询结果处理
- searchStart := time.Now()
- data := search.PotentialCustomizeAnalysis(obj.UserId, obj.Value)
- //查询超时,则休息一下
- if time.Now().Sub(searchStart).Seconds() > float64(this.Conf.SearchLimit.TimeOver) {
- time.Sleep(time.Second * time.Duration(this.Conf.SearchLimit.WaitTime))
- }
- if data == nil || len(data) == 0 {
- log.Printf("[MANAGER-ERR]CustomManager %s DoSearch %s Is Empty\n", obj.UserId, obj.Value)
- continue
- }
- //缓存结果
- if bytes, err := json.Marshal(data); err == nil && bytes != nil {
- _ = redis.PutBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, obj.UserId), &bytes, customDataCacheTimeLong)
- }
- }
- }
- // checkActivityUser 校验用户是否是活跃用户
- func (this *CustomManager) checkActivityUser(userId string) (exists bool) {
- this.RLock()
- defer this.RUnlock()
- _, exists = this.UserGroup[userId]
- return
- }
|