package mananger import ( "app.yhyue.com/moapp/jybase/redis" "fmt" "github.com/robfig/cron/v3" "leadGeneration/entity/activeUsers" "leadGeneration/entity/power" "leadGeneration/entity/search" "leadGeneration/public" "leadGeneration/vars" "log" "sort" "sync" "time" ) // AheadManager 超前项目管理 type AheadManager struct { Conf vars.AheadConfig UserGroup map[string]int BatchFlag string sync.RWMutex } const ( //缓存 AheadCacheDb = "newother" AheadRequestFrequencyCacheKey = "leadGeneration_AheadRequest_%s_%d_%s" AheadRequestTimesLong = 60 * 60 * 24 //缓存一天 ) // InitAheadManager 初始化 func InitAheadManager(conf vars.AheadConfig) *AheadManager { manager := &AheadManager{ Conf: conf, UserGroup: make(map[string]int), } go manager.ScheduledTasks() return manager } // GetData 获取查询数据 func (this *AheadManager) GetData(userId, keyWords string, isNew bool, t int64) map[string]interface{} { if !(isNew || this.checkGroupUser(userId)) { return nil } //校验每日请求次数 cacheKey := fmt.Sprintf(AheadRequestFrequencyCacheKey, time.Now().Format(public.TimeFormat), t, userId) if this.Conf.DailyTimes <= redis.GetInt(AheadCacheDb, cacheKey) { return nil } //查询数据 rDate := search.AdvancedProject(userId, keyWords) //累计请求计数 if rDate != nil && len(rDate) > 0 { if num := redis.Incr(AheadCacheDb, cacheKey); num == 1 { _ = redis.SetExpire(AheadCacheDb, cacheKey, AheadRequestTimesLong) } } return rDate } // Click 用户点击 func (this *AheadManager) Click(userId string) bool { this.Lock() defer this.Unlock() if v, ok := this.UserGroup[userId]; ok { this.UserGroup[userId] = v return true } return false } // CheckGroupUser 校验用户是否有资格展示 func (this *AheadManager) checkGroupUser(userId string) (exists bool) { this.RLock() defer this.RUnlock() _, exists = this.UserGroup[userId] return } // ScheduledTasks 定时任务 func (this *AheadManager) ScheduledTasks() { if this.Conf.UpdateCron != "" { c := cron.New(cron.WithSeconds()) // 给对象增加定时任务 if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil { panic(err) } c.Start() } //首次运行圈选用户 go this.UpdateUserGroupJob() } // UpdateUserGroupJob 更新用户群组 func (this *AheadManager) UpdateUserGroupJob() { if this.Conf.Prop <= 0 { return } log.Printf("[MANAGER-INFO]AheadManager UserGroup Change Start\n") this.BatchFlag = public.GetWeekBatchName(time.Now()) newMap := map[string]int{} //新圈用户 for _, uId := range this.getUserGroup() { newMap[uId] = 0 } this.Lock() defer this.Unlock() this.UserGroup = newMap log.Printf("[MANAGER-INFO]AheadManager UserGroup Changed Finish Total is %d \n", len(this.UserGroup)) } // getUserGroup 取用户 func (this *AheadManager) getUserGroup() (userIds []string) { //当前批次是否已有数据 rData := public.UserAnalyseDb.SelectBySql("SELECT user_mongoid AS uid FROM user_leadGeneration_group WHERE group_type = 0 AND group_name=?", this.BatchFlag) if rData != nil || len(*rData) == 0 { log.Printf("[MANAGER-INFO]AheadManager getUserGroup from Db Total is %d \n", len(*rData)) userIds = make([]string, 0, len(*rData)) for _, m := range *rData { if uId, _ := m["uid"].(string); uId != "" { userIds = append(userIds, uId) } } //是否有新增测试用户 if len(userIds) > 0 && len(vars.Config.TestUid) > 0 { var newTest []string for _, uid := range vars.Config.TestUid { has := false for _, id := range userIds { if uid == id { has = true break } } if !has && !power.HasAheadPower(uid) { newTest = append(newTest, uid) } } if len(newTest) > 0 { userIds = append(userIds, newTest...) activeUsers.SaveBatchGroup(newTest, 0, this.BatchFlag, "testUser") } } } //无数据则重新生成用户群组数据 if len(userIds) == 0 { log.Printf("[MANAGER-INFO]AheadManager getUserGroup createNewGroup start\n") var ( wg sync.WaitGroup newActiveGroup, lastActiveGroup, testUserGroup []string ) wg.Add(3) //新活跃用户 go func() { defer wg.Done() newActiveGroup = this.sortUserByBatchAndGetFinal(activeUsers.GetWeekActiveFreeUsers()) activeUsers.SaveBatchGroup(newActiveGroup, 0, this.BatchFlag, "newActive") }() //测试用户群组 go func() { defer wg.Done() if len(vars.Config.TestUid) > 0 { for _, uid := range vars.Config.TestUid { if !power.HasAheadPower(uid) { testUserGroup = append(testUserGroup, uid) } } } activeUsers.SaveBatchGroup(testUserGroup, 0, this.BatchFlag, "testUser") }() //查询上批次活跃用户(点击量满足用户) go func() { defer wg.Done() this.RLock() defer this.RUnlock() for uId, clickNum := range this.UserGroup { if clickNum > this.Conf.SaveClickTimes { lastActiveGroup = append(lastActiveGroup, uId) } } activeUsers.SaveBatchGroup(lastActiveGroup, 0, this.BatchFlag, "oldActive") }() wg.Wait() userIds = make([]string, 0, len(newActiveGroup)+len(lastActiveGroup)+len(testUserGroup)) userIds = append(userIds, newActiveGroup...) userIds = append(userIds, lastActiveGroup...) userIds = append(userIds, testUserGroup...) log.Printf("[MANAGER-INFO]AheadManager getUserGroup createNewGroup Total %d \n New:%d Old:%d Test:%d \n", len(userIds), len(newActiveGroup), len(lastActiveGroup), len(testUserGroup)) } return } // sortUserByBatchAndGetFinal 根据百分比取用户,优先级【没有参与>参与时间早的>参与时间晚的】 func (this *AheadManager) sortUserByBatchAndGetFinal(userIds []string) (rData []string) { log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Begin\n") total := int(this.Conf.Prop * float64(len(userIds))) //查询历史分组情况 gData := public.UserAnalyseDb.SelectBySql("SELECT * FROM ( SELECT user_mongoid AS uid, MAX( UNIX_TIMESTAMP(create_time) ) AS gTime FROM user_leadGeneration_group WHERE group_type = 0 GROUP BY user_mongoid ) AS groupData ORDER BY gTime ASC") log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Search Finished\n") // 无记录直接返回数组前final_count个用户id if gData == nil || len(*gData) == 0 { return userIds[:total] } sortMap := map[string]int64{} for _, m := range *gData { if uId, _ := m["uid"].(string); uId != "" { num, _ := m["gTime"].(int64) sortMap[uId] = num } } log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Sort Begin\n") // 有记录进行排序筛选 sort.Slice(userIds, func(i, j int) bool { iNum, _ := sortMap[userIds[i]] jNum, _ := sortMap[userIds[j]] if iNum < jNum { return true } return false }) log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Finished\n") return userIds[:total] }