123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- package mananger
- import (
- "app.yhyue.com/moapp/jybase/redis"
- "fmt"
- "github.com/robfig/cron/v3"
- "leadGeneration/entity/activeUsers"
- "leadGeneration/entity/power"
- "leadGeneration/entity/search"
- "leadGeneration/public"
- "leadGeneration/vars"
- "log"
- "sort"
- "sync"
- "time"
- )
- // AheadManager 超前项目管理
- type AheadManager struct {
- Conf vars.AheadConfig
- UserGroup map[string]int
- BatchFlag string
- sync.RWMutex
- }
- const (
- //缓存
- AheadCacheDb = "newother"
- AheadRequestFrequencyCacheKey = "leadGeneration_AheadRequest_%s_%d_%s"
- AheadRequestTimesLong = 60 * 60 * 24 //缓存一天
- )
- // InitAheadManager 初始化
- func InitAheadManager(conf vars.AheadConfig) *AheadManager {
- manager := &AheadManager{
- Conf: conf,
- UserGroup: make(map[string]int),
- }
- go manager.ScheduledTasks()
- return manager
- }
- // GetData 获取查询数据
- func (this *AheadManager) GetData(userId, keyWords string, isNew bool, t int64) map[string]interface{} {
- if !(isNew || this.checkGroupUser(userId)) {
- return nil
- }
- //查询是否是有超前项目权限
- if power.HasAheadPower(userId) {
- return nil
- }
- //校验每日请求次数
- cacheKey := fmt.Sprintf(AheadRequestFrequencyCacheKey, time.Now().Format(public.TimeFormat), t, userId)
- if this.Conf.DailyTimes <= redis.GetInt(AheadCacheDb, cacheKey) {
- return nil
- }
- //查询数据
- rDate := search.AdvancedProject(userId, keyWords, t)
- //累计请求计数
- if rDate != nil && len(rDate) > 0 {
- if num := redis.Incr(AheadCacheDb, cacheKey); num == 1 {
- _ = redis.SetExpire(AheadCacheDb, cacheKey, AheadRequestTimesLong)
- }
- }
- return rDate
- }
- // Click 用户点击
- func (this *AheadManager) Click(userId string) bool {
- this.Lock()
- defer this.Unlock()
- if v, ok := this.UserGroup[userId]; ok {
- this.UserGroup[userId] = v
- return true
- }
- return false
- }
- // CheckGroupUser 校验用户是否有资格展示
- func (this *AheadManager) checkGroupUser(userId string) (exists bool) {
- this.RLock()
- defer this.RUnlock()
- _, exists = this.UserGroup[userId]
- return
- }
- // ScheduledTasks 定时任务
- func (this *AheadManager) ScheduledTasks() {
- if this.Conf.UpdateCron != "" {
- c := cron.New(cron.WithSeconds())
- // 给对象增加定时任务
- if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
- panic(err)
- }
- c.Start()
- }
- //首次运行圈选用户
- go this.UpdateUserGroupJob()
- }
- // UpdateUserGroupJob 更新用户群组
- func (this *AheadManager) UpdateUserGroupJob() {
- if this.Conf.Prop <= 0 {
- return
- }
- log.Printf("[MANAGER-INFO]AheadManager UserGroup Change Start\n")
- this.BatchFlag = public.GetWeekBatchName(time.Now())
- newMap := map[string]int{}
- //新圈用户
- for _, uId := range this.getUserGroup() {
- newMap[uId] = 0
- }
- this.Lock()
- defer this.Unlock()
- this.UserGroup = newMap
- log.Printf("[MANAGER-INFO]AheadManager UserGroup Changed Finish Total is %d \n", len(this.UserGroup))
- }
- // getUserGroup 取用户
- func (this *AheadManager) getUserGroup() (userIds []string) {
- //当前批次是否已有数据
- rData := public.UserAnalyseDb.SelectBySql("SELECT user_mongoid AS uid FROM user_leadGeneration_group WHERE group_type = 0 AND group_name=?", this.BatchFlag)
- if rData != nil || len(*rData) == 0 {
- log.Printf("[MANAGER-INFO]AheadManager getUserGroup from Db Total is %d \n", len(*rData))
- userIds = make([]string, 0, len(*rData))
- for _, m := range *rData {
- if uId, _ := m["uid"].(string); uId != "" {
- userIds = append(userIds, uId)
- }
- }
- //是否有新增测试用户
- if len(userIds) > 0 && len(vars.Config.TestUid) > 0 {
- var newTest []string
- for _, uid := range vars.Config.TestUid {
- has := false
- for _, id := range userIds {
- if uid == id {
- has = true
- break
- }
- }
- if !has && !power.HasAheadPower(uid) {
- newTest = append(newTest, uid)
- }
- }
- if len(newTest) > 0 {
- userIds = append(userIds, newTest...)
- activeUsers.SaveBatchGroup(newTest, 0, this.BatchFlag, "testUser")
- }
- }
- }
- userIds = []string{}
- //无数据则重新生成用户群组数据
- if len(userIds) == 0 {
- log.Printf("[MANAGER-INFO]AheadManager getUserGroup createNewGroup start\n")
- var (
- wg sync.WaitGroup
- newActiveGroup, lastActiveGroup, testUserGroup []string
- )
- wg.Add(3)
- //新活跃用户
- go func() {
- defer wg.Done()
- if this.Conf.Mode == ModeMonth { // 月活
- newActiveGroup = this.sortUserByBatchAndGetFinal(AheadActiveFreeUser.GetMonthActiveFreeUsers())
- } else { // 周活
- newActiveGroup = this.sortUserByBatchAndGetFinal(AheadActiveFreeUser.GetWeekActiveFreeUsers())
- }
- activeUsers.SaveBatchGroup(newActiveGroup, 0, this.BatchFlag, "newActive")
- }()
- //测试用户群组
- go func() {
- defer wg.Done()
- if len(vars.Config.TestUid) > 0 {
- for _, uid := range vars.Config.TestUid {
- if !power.HasAheadPower(uid) {
- testUserGroup = append(testUserGroup, uid)
- }
- }
- }
- activeUsers.SaveBatchGroup(testUserGroup, 0, this.BatchFlag, "testUser")
- }()
- //查询上批次活跃用户(点击量满足用户)
- go func() {
- defer wg.Done()
- this.RLock()
- defer this.RUnlock()
- for uId, clickNum := range this.UserGroup {
- if clickNum > this.Conf.SaveClickTimes {
- lastActiveGroup = append(lastActiveGroup, uId)
- }
- }
- activeUsers.SaveBatchGroup(lastActiveGroup, 0, this.BatchFlag, "oldActive")
- }()
- wg.Wait()
- userIds = make([]string, 0, len(newActiveGroup)+len(lastActiveGroup)+len(testUserGroup))
- userIds = append(userIds, newActiveGroup...)
- userIds = append(userIds, lastActiveGroup...)
- userIds = append(userIds, testUserGroup...)
- log.Printf("[MANAGER-INFO]AheadManager getUserGroup createNewGroup Total %d \n New:%d Old:%d Test:%d \n",
- len(userIds), len(newActiveGroup), len(lastActiveGroup), len(testUserGroup))
- }
- return
- }
- // sortUserByBatchAndGetFinal 根据百分比取用户,优先级【没有参与>参与时间早的>参与时间晚的】
- func (this *AheadManager) sortUserByBatchAndGetFinal(userIds []string) (rData []string) {
- log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Begin\n")
- total := int(this.Conf.Prop * float64(len(userIds)))
- //查询历史分组情况
- gData := public.UserAnalyseDb.SelectBySql("SELECT * FROM ( SELECT user_mongoid AS uid, MAX( UNIX_TIMESTAMP(create_time) ) AS gTime FROM user_leadGeneration_group WHERE group_type = 0 GROUP BY user_mongoid ) AS groupData ORDER BY gTime ASC")
- log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Search Finished\n")
- // 无记录直接返回数组前final_count个用户id
- if gData == nil || len(*gData) == 0 {
- return userIds[:total]
- }
- sortMap := map[string]int64{}
- for _, m := range *gData {
- if uId, _ := m["uid"].(string); uId != "" {
- num, _ := m["gTime"].(int64)
- sortMap[uId] = num
- }
- }
- log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Sort Begin\n")
- // 有记录进行排序筛选
- sort.Slice(userIds, func(i, j int) bool {
- iNum, _ := sortMap[userIds[i]]
- jNum, _ := sortMap[userIds[j]]
- if iNum < jNum {
- return true
- }
- return false
- })
- log.Printf("[MANAGER-INFO]AheadManager sortUserByBatchAndGetFinal Finished\n")
- return userIds[:total]
- }
|