|
@@ -1 +1,165 @@
|
|
|
package mananger
|
|
|
+
|
|
|
+import (
|
|
|
+ "app.yhyue.com/moapp/jybase/redis"
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "github.com/robfig/cron/v3"
|
|
|
+ "leadGeneration/entity/activeUsers"
|
|
|
+ "leadGeneration/entity/power"
|
|
|
+ "leadGeneration/entity/search"
|
|
|
+ "leadGeneration/public"
|
|
|
+ "leadGeneration/vars"
|
|
|
+ "log"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ customCacheDb = "newother"
|
|
|
+ customDataCacheKey = "leadGeneration_customData_%s"
|
|
|
+ customDataCacheTimeLong = 60 * 60 * 24 * 31
|
|
|
+)
|
|
|
+
|
|
|
+//CustomManager 定制化报告管理
|
|
|
+type CustomManager struct {
|
|
|
+ Conf vars.CustomConfig
|
|
|
+ UserGroup map[string]int // 月活用户
|
|
|
+ BatchFlag string // 批次标识
|
|
|
+ sync.RWMutex
|
|
|
+}
|
|
|
+
|
|
|
+type SearchEntity struct {
|
|
|
+ UserId string
|
|
|
+ Value string
|
|
|
+}
|
|
|
+
|
|
|
+var (
|
|
|
+ newRegisterUserQueue = make(chan *SearchEntity) //新用户队列
|
|
|
+ activityUserQueue = make(chan *SearchEntity) //活跃用户队列
|
|
|
+)
|
|
|
+
|
|
|
+// InitCustomManager 初始化
|
|
|
+func InitCustomManager(conf vars.CustomConfig) *CustomManager {
|
|
|
+ manager := &CustomManager{
|
|
|
+ Conf: conf,
|
|
|
+ UserGroup: make(map[string]int),
|
|
|
+ }
|
|
|
+ go manager.ScheduledTasks()
|
|
|
+ go manager.DoSearch()
|
|
|
+ return manager
|
|
|
+}
|
|
|
+
|
|
|
+// GetData 获取查询数据
|
|
|
+func (this *CustomManager) GetData(userId, keyWords string, isNew bool) map[string]interface{} {
|
|
|
+ if !(isNew || this.checkActivityUser(userId)) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ //查询是否是付费用户
|
|
|
+ if power.HasPower(userId) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ //查询数据
|
|
|
+ if bytes, err := redis.GetBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, userId)); err == nil && bytes != nil {
|
|
|
+ rData := map[string]interface{}{}
|
|
|
+ if err := json.Unmarshal(*bytes, &rData); err != nil {
|
|
|
+ log.Printf("[ERROR]CustomManager %s GetData Error %v \n", userId, err)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return rData
|
|
|
+ } else if isNew {
|
|
|
+ //加入优先查询队列
|
|
|
+ go func() {
|
|
|
+ newRegisterUserQueue <- &SearchEntity{
|
|
|
+ UserId: userId,
|
|
|
+ Value: keyWords,
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+//ScheduledTasks 定时任务
|
|
|
+func (this *CustomManager) ScheduledTasks() {
|
|
|
+ if this.Conf.UpdateCron != "" {
|
|
|
+ // 给对象增加定时任务
|
|
|
+ if _, err := cron.New().AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //首次运行圈选用户
|
|
|
+ this.UpdateUserGroupJob()
|
|
|
+}
|
|
|
+
|
|
|
+//UpdateUserGroupJob 更新用户群组
|
|
|
+func (this *CustomManager) UpdateUserGroupJob() {
|
|
|
+ //查询月活用户
|
|
|
+ userMap := activeUsers.GetMonthActiveFreeUsers()
|
|
|
+ this.Lock()
|
|
|
+ defer this.Unlock()
|
|
|
+ this.UserGroup = userMap
|
|
|
+
|
|
|
+ //更新批次标识
|
|
|
+ batchFlag := time.Now().Format(public.Date_Full_Layout)
|
|
|
+ this.BatchFlag = batchFlag
|
|
|
+
|
|
|
+ userArr := make([]string, len(this.UserGroup), 0)
|
|
|
+ for userId, _ := range this.UserGroup {
|
|
|
+ userArr = append(userArr, userId)
|
|
|
+ }
|
|
|
+ go this.activityUserQueue(batchFlag, []string{})
|
|
|
+}
|
|
|
+
|
|
|
+//activityUserQueue 活跃用户查询队列
|
|
|
+func (this *CustomManager) activityUserQueue(batchFlag string, userIds []string) {
|
|
|
+ for i, userId := range userIds {
|
|
|
+ if i%100 == 0 {
|
|
|
+ log.Printf("[INFO]CustomManager Batch %s Now(%d/%d)\n", batchFlag, len(userIds), i)
|
|
|
+ }
|
|
|
+ //当批次更新时,上批次停止
|
|
|
+ if this.BatchFlag != batchFlag {
|
|
|
+ log.Printf("[INFO]CustomManager Batch %s Is END At (%d/%d) \n", batchFlag, i, len(userIds))
|
|
|
+ break
|
|
|
+ }
|
|
|
+ activityUserQueue <- &SearchEntity{
|
|
|
+ UserId: userId,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Printf("[INFO]CustomManager Batch %s Is Finished !!!\n", batchFlag)
|
|
|
+}
|
|
|
+
|
|
|
+//DoSearch 定制化分析报告查询队列
|
|
|
+func (this *CustomManager) DoSearch() {
|
|
|
+ for {
|
|
|
+ var obj *SearchEntity
|
|
|
+ select { //优先级 newRegisterUserQueue > activityUserQueue
|
|
|
+ case obj = <-newRegisterUserQueue:
|
|
|
+ default:
|
|
|
+ select {
|
|
|
+ case obj = <-newRegisterUserQueue:
|
|
|
+ case obj = <-activityUserQueue:
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //查询结果处理
|
|
|
+ data, err := search.PotentialCustomizeAnalysis(obj.UserId, obj.Value)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("[ERROR]CustomManager %s DoSearch %s Error %v\n", obj.UserId, obj.Value, err)
|
|
|
+ }
|
|
|
+ if data != nil || len(data) == 0 {
|
|
|
+ log.Printf("[ERROR]CustomManager %s DoSearch %s Error %v\n", obj.UserId, obj.Value, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //缓存结果
|
|
|
+ if bytes, err := json.Marshal(data); err == nil && bytes != nil {
|
|
|
+ _ = redis.PutBytes(customCacheDb, fmt.Sprintf(customDataCacheKey, obj.UserId), &bytes, customDataCacheTimeLong)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//checkActivityUser 校验用户是否是活跃用户
|
|
|
+func (this *CustomManager) checkActivityUser(userId string) (exists bool) {
|
|
|
+ this.RLock()
|
|
|
+ defer this.RUnlock()
|
|
|
+ _, exists = this.UserGroup[userId]
|
|
|
+ return
|
|
|
+}
|