Преглед изворни кода

wip:查询暂停及查询时间超时休眠提交

wangkaiyue пре 2 година
родитељ
комит
440df3f4b3
3 измењених фајлова са 27 додато и 12 уклоњено
  1. 0 1
      config.json
  2. 23 6
      entity/mananger/customManager.go
  3. 4 5
      vars/config.go

+ 0 - 1
config.json

@@ -8,7 +8,6 @@
   },
   "custom": {
     "open": true,
-    "searchPool": 1,
     "updateCron": "0 0 0 ? * 0",
     "searchLimit": {
       "switch": {

+ 23 - 6
entity/mananger/customManager.go

@@ -19,6 +19,7 @@ const (
 	customCacheDb           = "newother"
 	customDataCacheKey      = "leadGeneration_customData_%s"
 	customDataCacheTimeLong = 60 * 60 * 24 * 30
+	customNewUserQueueKey   = "leadGeneration_customData_newUserQueue"
 )
 
 // CustomManager 定制化报告管理
@@ -71,12 +72,7 @@ func (this *CustomManager) GetData(userId, keyWords string, isNew bool) map[stri
 		return rData
 	} else if isNew {
 		//加入优先查询队列
-		go func() {
-			newRegisterUserQueue <- &SearchEntity{
-				UserId: userId,
-				Value:  keyWords,
-			}
-		}()
+		redis.RPUSH(customCacheDb, customNewUserQueueKey, userId)
 	}
 	return nil
 }
@@ -139,8 +135,10 @@ func (this *CustomManager) UpdateUserGroupJob() {
 	this.UserGroup = newMap
 	log.Printf("[MANAGER-INFO]CustomManager UserGroup Changed Finish Total is %d \n", len(this.UserGroup))
 	go this.activityUserQueue(this.BatchFlag, userArr)
+	go this.newUserQueue(this.BatchFlag)
 }
 
+//getUserGroup 获取用户群组
 func (this *CustomManager) getUserGroup() (userIds []string) {
 	//当前批次是否已有数据
 	rData := public.UserAnalyseDb.SelectBySql("SELECT user_mongoid AS uid FROM user_leadGeneration_group WHERE group_type = 1 AND group_name=?", this.BatchFlag)
@@ -207,6 +205,25 @@ func (this *CustomManager) activityUserQueue(batchFlag string, userIds []string)
 	log.Printf("[MANAGER-INFO]CustomManager Batch %s Is Finished !!!\n", batchFlag)
 }
 
+func (this *CustomManager) newUserQueue(batchFlag string) {
+	for {
+		if this.BatchFlag != batchFlag {
+			unFinishedNum := redis.LLEN(customCacheDb, customNewUserQueueKey)
+			if unFinishedNum > 0 {
+				redis.Del(customCacheDb, customNewUserQueueKey)
+			}
+			log.Printf("[MANAGER-INFO]CustomManager newUserQueue End unfinished %d \n", unFinishedNum)
+			return
+		}
+
+		if uid, _ := redis.LPOP(customCacheDb, customNewUserQueueKey).(string); uid != "" {
+			newRegisterUserQueue <- &SearchEntity{
+				UserId: uid,
+			}
+		}
+	}
+}
+
 // DoSearch 定制化分析报告查询队列
 func (this *CustomManager) DoSearch() {
 	log.Printf("[MANAGER-INFO]CustomManager DoSearch Start\n")

+ 4 - 5
vars/config.go

@@ -22,15 +22,14 @@ type AheadConfig struct {
 
 type CustomConfig struct {
 	Open        bool   `json:"open"`       //是否运行查询
-	SearchPool  int    `json:"searchPool"` //检索并发池
 	UpdateCron  string `json:"updateCron"` //更新周活用户
 	SearchLimit struct {
 		Switch struct {
-			Stop  string `json:"stop"`
-			Start string `json:"start"`
+			Stop  string `json:"stop"`  //搜索停止
+			Start string `json:"start"` //搜索开始
 		} `json:"switch"`
-		TimeOver int `json:"timeOver"` //
-		WaitTime int `json:"waitTime"`
+		TimeOver int `json:"timeOver"` //超时
+		WaitTime int `json:"waitTime"` //休息
 	} `json:"searchLimit"`
 }