package mananger

import (
	"app.yhyue.com/moapp/jybase/redis"
	"encoding/json"
	"fmt"
	"github.com/robfig/cron/v3"
	"leadGeneration/entity/activeUsers"
	"leadGeneration/entity/power"
	"leadGeneration/entity/search"
	"leadGeneration/public"
	"leadGeneration/vars"
	"log"
	"sync"
	"time"
)

const (
	// customCacheDb is the first argument handed to every redis helper call in this file.
	customCacheDb = "newother"
	// customDataCacheKey is the format of the per-user cache key; %s is the user id.
	customDataCacheKey = "leadGeneration_customData_%s"
	// customDataCacheTimeLong is passed to redis.PutBytes as the cache lifetime
	// (presumably seconds, i.e. 30 days — confirm against the redis helper).
	customDataCacheTimeLong = 60 * 60 * 24 * 30
	// customNewUserQueueKey is the redis list that holds new users waiting for a search.
	customNewUserQueueKey = "leadGeneration_customData_newUserQueue"
)

// CustomManager manages customized reports.
type CustomManager struct {
	Conf       vars.CustomConfig
	UserGroup  map[string]int // monthly active users
	BatchFlag  string         // batch identifier
	StopSearch chan bool      // signal telling DoSearch to stop
	sync.RWMutex
}

// SearchEntity is one unit of work consumed by DoSearch.
type SearchEntity struct {
	UserId string
	Value  string
}

var (
	newRegisterUserQueue = make(chan *SearchEntity) // new-user queue
	activityUserQueue    = make(chan *SearchEntity) // active-user queue
)

// InitCustomManager builds a CustomManager from conf and starts its
// scheduled tasks in a separate goroutine.
func InitCustomManager(conf vars.CustomConfig) *CustomManager {
	manager := &CustomManager{
		Conf:       conf,
		UserGroup:  make(map[string]int),
		StopSearch: make(chan bool),
	}
	// scheduled tasks
	go manager.ScheduledTasks()
	return manager
}

// GetData returns the cached report data for userId, or nil when nothing is
// available.
//
// It returns nil when the user is neither new nor in the current user group,
// when power.HasPower reports true for the user, when the cached bytes cannot
// be unmarshalled (the error is logged), or when there is no cache entry. On a
// cache miss for a new user, the user id is pushed onto the new-user redis
// queue. keyWords is not used by this method.
func (this *CustomManager) GetData(userId, keyWords string, isNew bool) map[string]interface{} {
	if !(isNew || this.checkActivityUser(userId)) {
		return nil
	}
	// paying users get no data
	if power.HasPower(userId) {
		return nil
	}
	// look up cached data
	if bytes, err := redis.GetBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, userId)); err == nil && bytes != nil {
		rData := map[string]interface{}{}
		if err := json.Unmarshal(*bytes, &rData); err != nil {
			log.Printf("[MANAGER-ERR]CustomManager %s GetData Error %v \n", userId, err)
			return nil
		}
		return rData
	} else if isNew {
		// add to the priority search queue
		redis.RPUSH(customCacheDb, customNewUserQueueKey, userId)
	}
	return nil
}

// ScheduledTasks registers the cron jobs described by the configuration,
// starts one DoSearch goroutine, and then runs UpdateUserGroupJob once in the
// calling goroutine. It panics if a configured cron spec is rejected by AddFunc.
func (this *CustomManager) ScheduledTasks() {
	// periodically rebuild the user group
	if this.Conf.UpdateCron != "" {
		c := cron.New(cron.WithSeconds())
		// register the job on the scheduler
		if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
			panic(err)
		}
		c.Start()
	}
	// restrict searching to a time window
	if this.Conf.SearchLimit.Switch.Start != "" && this.Conf.SearchLimit.Switch.Stop != "" {
		// window start
		// NOTE(review): each start tick launches another DoSearch goroutine while a
		// stop tick sends a single value, which only one receiver gets — confirm this
		// is balanced with the initial DoSearch started below.
		startJob := cron.New(cron.WithSeconds())
		if _, err := startJob.AddFunc(this.Conf.SearchLimit.Switch.Start, func() {
			go this.DoSearch()
		}); err != nil {
			panic(err)
		}
		startJob.Start()
		// window end
		// NOTE(review): StopSearch is unbuffered, so this send blocks until some
		// DoSearch goroutine receives it.
		endJob := cron.New(cron.WithSeconds())
		if _, err := endJob.AddFunc(this.Conf.SearchLimit.Switch.Stop, func() {
			this.StopSearch <- true
		}); err != nil {
			panic(err)
		}
		endJob.Start()
	}
	// start searching on first run
	go this.DoSearch()
	// build the user group on first run
	this.UpdateUserGroupJob()
}

// UpdateUserGroupJob refreshes the batch flag and the user group, then starts
// the goroutines that feed the active-user and new-user queues for the new
// batch. It does nothing when Conf.Open is false.
func (this *CustomManager) UpdateUserGroupJob() {
	if !this.Conf.Open {
		return
	}
	log.Printf("[MANAGER-INFO]CustomManager UserGroup Change Start\n")
	// update the batch identifier
	// NOTE(review): BatchFlag is written here without holding the lock and is read
	// by the queue goroutines without it as well.
	this.BatchFlag = public.GetWeekBatchName(time.Now())

	newMap := map[string]int{}
	userArr := this.getUserGroup()
	for _, uId := range userArr {
		newMap[uId] = 0
	}
	// swap in the new user group
	this.Lock()
	defer this.Unlock()
	this.UserGroup = newMap
	log.Printf("[MANAGER-INFO]CustomManager UserGroup Changed Finish Total is %d \n", len(this.UserGroup))

	go this.activityUserQueue(this.BatchFlag, userArr)
	go this.newUserQueue(this.BatchFlag)
}

// getUserGroup returns the user ids belonging to the current batch.
//
// It first loads the ids stored for this.BatchFlag; configured test users that
// are missing from a non-empty stored group (and for which power.HasPower is
// false) are appended and saved. If no ids were loaded, a new group is built
// from the test users and activeUsers.GetMonthActiveFreeUsers, and saved.
func (this *CustomManager) getUserGroup() (userIds []string) {
	// does the current batch already have data?
	rData := public.UserAnalyseDb.SelectBySql("SELECT user_mongoid AS uid FROM user_leadGeneration_group WHERE group_type = 1 AND group_name=?", this.BatchFlag)
	// Guard against a nil result before dereferencing it. The previous condition
	// (rData != nil || len(*rData) == 0) dereferenced rData exactly when it was nil.
	if rData != nil {
		log.Printf("[MANAGER-INFO]CustomManager getUserGroup from Db Total is %d \n", len(*rData))
		userIds = make([]string, 0, len(*rData))
		for _, m := range *rData {
			if uId, _ := m["uid"].(string); uId != "" {
				userIds = append(userIds, uId)
			}
		}
		// are there newly added test users?
		if len(userIds) > 0 && len(vars.Config.TestUid) > 0 {
			var newTest []string
			for _, uid := range vars.Config.TestUid {
				has := false
				for _, id := range userIds {
					if uid == id {
						has = true
						break
					}
				}
				if !has && !power.HasPower(uid) {
					newTest = append(newTest, uid)
				}
			}
			if len(newTest) > 0 {
				userIds = append(userIds, newTest...)
				activeUsers.SaveBatchGroup(newTest, 1, this.BatchFlag, "testUser")
			}
		}
	}
	// no data: regenerate the user group
	if len(userIds) == 0 {
		log.Printf("[MANAGER-INFO]CustomManager getUserGroup createNewGroup start\n")
		var (
			wg                            sync.WaitGroup
			newActiveGroup, testUserGroup []string
		)
		wg.Add(2)
		// test user group
		go func() {
			defer wg.Done()
			for _, uid := range vars.Config.TestUid {
				if !power.HasPower(uid) {
					testUserGroup = append(testUserGroup, uid)
				}
			}
			activeUsers.SaveBatchGroup(testUserGroup, 1, this.BatchFlag, "testUser")
		}()
		// newly active users
		go func() {
			defer wg.Done()
			newActiveGroup = activeUsers.GetMonthActiveFreeUsers()
			activeUsers.SaveBatchGroup(newActiveGroup, 1, this.BatchFlag, "newActive")
		}()
		wg.Wait()

		userIds = make([]string, 0, len(newActiveGroup)+len(testUserGroup))
		userIds = append(userIds, newActiveGroup...)
		userIds = append(userIds, testUserGroup...)
		log.Printf("[MANAGER-INFO]CustomManager getUserGroup createNewGroup end Total %d\n", len(userIds))
	}
	return
}

// activityUserQueue feeds userIds one by one into the active-user channel. It
// logs progress every 100 users and stops early once this.BatchFlag no longer
// equals batchFlag.
func (this *CustomManager) activityUserQueue(batchFlag string, userIds []string) {
	for i, userId := range userIds {
		if i%100 == 0 {
			log.Printf("[MANAGER-INFO]CustomManager Batch %s Now(%d/%d)\n", batchFlag, i, len(userIds))
		}
		// when the batch changes, the previous batch stops
		if this.BatchFlag != batchFlag {
			log.Printf("[MANAGER-INFO]CustomManager Batch %s Is END At (%d/%d) \n", batchFlag, i, len(userIds))
			break
		}
		// the channel is unbuffered, so this blocks until DoSearch takes the item
		activityUserQueue <- &SearchEntity{
			UserId: userId,
		}
	}
	log.Printf("[MANAGER-INFO]CustomManager Batch %s Is Finished !!!\n", batchFlag)
}

// newUserQueue moves user ids from the new-user redis list into the new-user
// channel until this.BatchFlag no longer equals batchFlag; at that point it
// deletes whatever remains in the redis list and returns.
func (this *CustomManager) newUserQueue(batchFlag string) {
	for {
		if this.BatchFlag != batchFlag {
			unFinishedNum := redis.LLEN(customCacheDb, customNewUserQueueKey)
			if unFinishedNum > 0 {
				redis.Del(customCacheDb, customNewUserQueueKey)
			}
			log.Printf("[MANAGER-INFO]CustomManager newUserQueue End unfinished %d \n", unFinishedNum)
			return
		}
		// NOTE(review): when LPOP yields no id the loop retries immediately with no
		// delay, so an empty list is polled in a tight loop.
		if uid, _ := redis.LPOP(customCacheDb, customNewUserQueueKey).(string); uid != "" {
			newRegisterUserQueue <- &SearchEntity{
				UserId: uid,
			}
		}
	}
}

// DoSearch consumes the search queues until a value arrives on StopSearch.
//
// For each entity it skips users that already have a cache entry, runs
// search.PotentialCustomizeAnalysis, sleeps Conf.SearchLimit.WaitTime seconds
// when the search took longer than Conf.SearchLimit.TimeOver seconds, and
// caches non-empty results as JSON.
func (this *CustomManager) DoSearch() {
	log.Printf("[MANAGER-INFO]CustomManager DoSearch Start\n")
	for {
		var obj *SearchEntity
		// priority: newRegisterUserQueue > activityUserQueue
		select {
		case <-this.StopSearch: // outside the running window: stop searching
			log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
			return
		default:
			select {
			case obj = <-newRegisterUserQueue:
			default:
				// nothing immediately ready on the new-user queue: wait on everything
				select {
				case obj = <-newRegisterUserQueue:
				case obj = <-activityUserQueue:
				case <-this.StopSearch: // outside the running window: stop searching
					log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
					return
				}
			}
		}
		// skip users that already have cached data
		key := fmt.Sprintf(customDataCacheKey, obj.UserId)
		if has, err := redis.Exists(customCacheDb, key); has && err == nil {
			continue
		}
		// run the search
		searchStart := time.Now()
		data := search.PotentialCustomizeAnalysis(obj.UserId, obj.Value)
		// the search was slow: take a break
		if time.Since(searchStart).Seconds() > float64(this.Conf.SearchLimit.TimeOver) {
			time.Sleep(time.Second * time.Duration(this.Conf.SearchLimit.WaitTime))
		}
		if data == nil || len(data) == 0 {
			log.Printf("[MANAGER-ERR]CustomManager %s DoSearch %s Is Empty\n", obj.UserId, obj.Value)
			continue
		}
		// cache the result; a marshal failure is logged rather than silently dropped
		bytes, err := json.Marshal(data)
		if err != nil {
			log.Printf("[MANAGER-ERR]CustomManager %s DoSearch Marshal Error %v \n", obj.UserId, err)
			continue
		}
		if bytes != nil {
			_ = redis.PutBytes(customCacheDb, key, &bytes, customDataCacheTimeLong)
		}
	}
}

// checkActivityUser reports whether userId is present in the current
// UserGroup map; the lookup is done under the read lock.
func (this *CustomManager) checkActivityUser(userId string) (exists bool) {
	this.RLock()
	defer this.RUnlock()
	_, exists = this.UserGroup[userId]
	return
}