123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- package mananger
- import (
- "app.yhyue.com/moapp/jybase/redis"
- "encoding/json"
- "fmt"
- "github.com/robfig/cron/v3"
- "leadGeneration/entity/activeUsers"
- "leadGeneration/entity/power"
- "leadGeneration/entity/search"
- "leadGeneration/public"
- "leadGeneration/vars"
- "log"
- "sync"
- "time"
- )
// Cache settings used when reading and writing customized-report results.
const (
	// customCacheDb is the first argument handed to the redis helper calls.
	customCacheDb = "newother"
	// customDataCacheKey is the fmt pattern for a per-user cache key;
	// the %s verb is filled with the user id.
	customDataCacheKey = "leadGeneration_customData_%s"
	// customDataCacheTimeLong is the timeout passed to redis.PutBytes
	// (30 days, presumably expressed in seconds — confirm against the helper).
	customDataCacheTimeLong = 30 * 24 * 60 * 60
)
- // CustomManager 定制化报告管理
- type CustomManager struct {
- Conf vars.CustomConfig
- UserGroup map[string]int // 月活用户
- BatchFlag string // 批次标识
- sync.RWMutex
- }
// SearchEntity is one unit of work placed on the search queues.
type SearchEntity struct {
	// UserId is the id of the user the search is run for.
	UserId string
	// Value is the keyword text forwarded to the search; it is left empty
	// for entries produced by the active-user batch.
	Value string
}
- var (
- newRegisterUserQueue = make(chan *SearchEntity) //新用户队列
- activityUserQueue = make(chan *SearchEntity) //活跃用户队列
- )
- // InitCustomManager 初始化
- func InitCustomManager(conf vars.CustomConfig) *CustomManager {
- manager := &CustomManager{
- Conf: conf,
- UserGroup: make(map[string]int),
- }
- //圈用户
- go manager.ScheduledTasks()
- //执行查询
- go manager.DoSearch()
- return manager
- }
- // GetData 获取查询数据
- func (this *CustomManager) GetData(userId, keyWords string, isNew bool) map[string]interface{} {
- if !(isNew || this.checkActivityUser(userId)) {
- return nil
- }
- //查询是否是付费用户
- if power.HasPower(userId) {
- return nil
- }
- //查询数据
- if bytes, err := redis.GetBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, userId)); err == nil && bytes != nil {
- rData := map[string]interface{}{}
- if err := json.Unmarshal(*bytes, &rData); err != nil {
- log.Printf("[ERROR]CustomManager %s GetData Error %v \n", userId, err)
- return nil
- }
- return rData
- } else if isNew {
- //加入优先查询队列
- go func() {
- newRegisterUserQueue <- &SearchEntity{
- UserId: userId,
- Value: keyWords,
- }
- }()
- }
- return nil
- }
- // ScheduledTasks 定时任务
- func (this *CustomManager) ScheduledTasks() {
- if this.Conf.UpdateCron != "" {
- // 给对象增加定时任务
- if _, err := cron.New().AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
- panic(err)
- }
- }
- //首次运行圈选用户
- this.UpdateUserGroupJob()
- }
- // UpdateUserGroupJob 更新用户群组
- func (this *CustomManager) UpdateUserGroupJob() {
- //查询月活用户
- userArr := activeUsers.GetMonthActiveFreeUsers()
- newMap := map[string]int{}
- //测试账户
- if len(vars.Config.TestUid) > 0 {
- for _, uid := range vars.Config.TestUid {
- newMap[uid] = 0
- }
- }
- for _, uId := range userArr {
- newMap[uId] = 0
- }
- //更新新用户群组
- this.Lock()
- defer this.Unlock()
- this.UserGroup = newMap
- //更新批次标识
- batchFlag := time.Now().Format(public.Date_Full_Layout)
- this.BatchFlag = batchFlag
- log.Printf("CustomManager NewGroup %v\n", newMap)
- go this.activityUserQueue(batchFlag, userArr)
- }
- // activityUserQueue 活跃用户查询队列
- func (this *CustomManager) activityUserQueue(batchFlag string, userIds []string) {
- for i, userId := range userIds {
- if i%100 == 0 {
- log.Printf("[INFO]CustomManager Batch %s Now(%d/%d)\n", batchFlag, len(userIds), i)
- }
- //当批次更新时,上批次停止
- if this.BatchFlag != batchFlag {
- log.Printf("[INFO]CustomManager Batch %s Is END At (%d/%d) \n", batchFlag, i, len(userIds))
- break
- }
- activityUserQueue <- &SearchEntity{
- UserId: userId,
- }
- }
- log.Printf("[INFO]CustomManager Batch %s Is Finished !!!\n", batchFlag)
- }
- // DoSearch 定制化分析报告查询队列
- func (this *CustomManager) DoSearch() {
- for {
- //是否在可执行时间段内
- //???
- var obj *SearchEntity
- select { //优先级 newRegisterUserQueue > activityUserQueue
- case obj = <-newRegisterUserQueue:
- default:
- select {
- case obj = <-newRegisterUserQueue:
- case obj = <-activityUserQueue:
- }
- }
- //查询结果处理
- data := search.PotentialCustomizeAnalysis(obj.UserId, obj.Value)
- if data == nil || len(data) == 0 {
- log.Printf("[ERROR]CustomManager %s DoSearch %s Error %v\n", obj.UserId, obj.Value)
- continue
- }
- //缓存结果
- if bytes, err := json.Marshal(data); err == nil && bytes != nil {
- _ = redis.PutBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, obj.UserId), &bytes, customDataCacheTimeLong)
- }
- }
- }
- // checkActivityUser 校验用户是否是活跃用户
- func (this *CustomManager) checkActivityUser(userId string) (exists bool) {
- if len(vars.Config.TestUid) > 0 {
- for _, uid := range vars.Config.TestUid {
- if uid == userId {
- return true
- }
- }
- }
- this.RLock()
- defer this.RUnlock()
- _, exists = this.UserGroup[userId]
- return
- }
|