Ver Fonte

wip:查询暂停及查询时间超时休眠提交

wangkaiyue há 2 anos atrás
pai
commit
2629f71d54
2 ficheiros alterados com 28 adições e 24 exclusões
  1. 1 1
      entity/mananger/aheadManager.go
  2. 27 23
      entity/mananger/customManager.go

+ 1 - 1
entity/mananger/aheadManager.go

@@ -87,7 +87,7 @@ func (this *AheadManager) ScheduledTasks() {
 		if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
 			panic(err)
 		}
-		c.Run()
+		c.Start()
 	}
 	//首次运行圈选用户
 	go this.UpdateUserGroupJob()

+ 27 - 23
entity/mananger/customManager.go

@@ -23,10 +23,10 @@ const (
 
 // CustomManager 定制化报告管理
 type CustomManager struct {
-	Conf         vars.CustomConfig
-	UserGroup    map[string]int // 月活用户
-	BatchFlag    string         // 批次标识
-	IsSearchTime bool           // 是否是运行时间段
+	Conf       vars.CustomConfig
+	UserGroup  map[string]int // 月活用户
+	BatchFlag  string         // 批次标识
+	StopSearch chan bool      // 停止查询信号
 	sync.RWMutex
 }
 
@@ -43,8 +43,9 @@ var (
 // InitCustomManager 初始化
 func InitCustomManager(conf vars.CustomConfig) *CustomManager {
 	manager := &CustomManager{
-		Conf:      conf,
-		UserGroup: make(map[string]int),
+		Conf:       conf,
+		UserGroup:  make(map[string]int),
+		StopSearch: make(chan bool),
 	}
 	//定时任务
 	go manager.ScheduledTasks()
@@ -89,31 +90,30 @@ func (this *CustomManager) ScheduledTasks() {
 		if _, err := c.AddFunc(this.Conf.UpdateCron, this.UpdateUserGroupJob); err != nil {
 			panic(err)
 		}
-		c.Run()
+		c.Start()
 	}
 	// 查询时间段限时
 	if this.Conf.SearchLimit.Switch.Start != "" && this.Conf.SearchLimit.Switch.Stop != "" {
 		//开始
 		startJob := cron.New(cron.WithSeconds())
-		if _, err := startJob.AddFunc(this.Conf.UpdateCron, func() {
-			this.IsSearchTime = true
+		if _, err := startJob.AddFunc(this.Conf.SearchLimit.Switch.Start, func() {
 			go this.DoSearch()
 		}); err != nil {
 			panic(err)
 		}
-		startJob.Run()
+		startJob.Start()
 		//结束
 		endJob := cron.New(cron.WithSeconds())
-		if _, err := endJob.AddFunc(this.Conf.UpdateCron, func() {
-			this.IsSearchTime = false
+		if _, err := endJob.AddFunc(this.Conf.SearchLimit.Switch.Stop, func() {
+			this.StopSearch <- true
 		}); err != nil {
 			panic(err)
 		}
-		endJob.Run()
-	} else {
-		go this.DoSearch()
+		endJob.Start()
 	}
-	//首次运行圈选用户
+	// 首次运行启动查询
+	go this.DoSearch()
+	// 首次运行圈选用户
 	this.UpdateUserGroupJob()
 }
 
@@ -211,19 +211,23 @@ func (this *CustomManager) activityUserQueue(batchFlag string, userIds []string)
 func (this *CustomManager) DoSearch() {
 	log.Printf("[MANAGER-INFO]CustomManager DoSearch Start\n")
 	for {
-		//是否在可执行时间段内
-		if this.IsSearchTime {
-			log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
-			return
-		}
 
 		var obj *SearchEntity
 		select { //优先级 newRegisterUserQueue > activityUserQueue
-		case obj = <-newRegisterUserQueue:
+		case <-this.StopSearch: //不在运行时间段内退出查询
+			log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
+			return
 		default:
 			select {
 			case obj = <-newRegisterUserQueue:
-			case obj = <-activityUserQueue:
+			default:
+				select {
+				case obj = <-newRegisterUserQueue:
+				case obj = <-activityUserQueue:
+				case <-this.StopSearch: //不在运行时间段内退出查询
+					log.Printf("[MANAGER-INFO]CustomManager DoSearch End\n")
+					return
+				}
 			}
 		}
 		//存在缓存直接跳过