浏览代码

es查询全局限制

xuzhiheng 2 年之前
父节点
当前提交
daa9c17863
共有 4 个文件被更改,包括 60 次插入29 次删除
  1. 6 6
      src/client/customerRule.go
  2. 22 11
      src/history/task.go
  3. 1 1
      src/service/customer_service.go
  4. 31 11
      src/util/utiltag.go

+ 6 - 6
src/client/customerRule.go

@@ -128,7 +128,7 @@ func filter(sql string) string {
 	return sql
 }
 
-//匹配包含detail的组
+// 匹配包含detail的组
 func checkDetail(sql string) (arrMap map[string]string) {
 	arrMap = map[string]string{}
 	res := reg_detail.FindAllStringSubmatch(sql, -1)
@@ -142,12 +142,12 @@ func checkDetail(sql string) (arrMap map[string]string) {
 	return
 }
 
-//校验是否有单字
+// 校验是否有单字
 func checkSingleWord(sql string) bool {
 	return len(reg_single.FindStringSubmatch(sql)) > 0
 }
 
-//计算每组字段的替换值
+// 计算每组字段的替换值
 func makeReplace(s_match string) (s_replace string) {
 	arr := []string{}
 	for _, v := range strings.Split(s_match, ",") {
@@ -158,7 +158,7 @@ func makeReplace(s_match string) (s_replace string) {
 	return strings.Join(arr, ",")
 }
 
-//导入关键词
+// 导入关键词
 func (r *CustomerRule) RuleImport() {
 	defer qu.Catch()
 	if r.Method() == "POST" {
@@ -185,7 +185,7 @@ func (r *CustomerRule) RuleImport() {
 	}
 }
 
-//生成预览数据
+// 生成预览数据
 func (c *CustomerRule) ProductData() {
 	defer qu.Catch()
 	if c.Method() == "POST" {
@@ -261,7 +261,7 @@ func (c *CustomerRule) ProductData() {
 			msg = "数据生成成功"
 		} else {
 			rep = false
-			msg = "数据生成失败"
+			msg = "数据生成失败,请稍后再试"
 			if err.Error() == "请设置开始结束时间" {
 				msg = "请设置开始结束时间"
 			}

+ 22 - 11
src/history/task.go

@@ -163,7 +163,28 @@ func (c *Customer) GetData(stype string, dataSource int) {
 	if esversion == "v1" {
 	} else {
 		esCon := esv.VarEs.(*esv.EsV7)
-		c.EsConGetDataV7(stype, dataSource, esCon)
+		for {
+			listLen := redis.GetInt("session", "es_status")
+			if listLen == 0 {
+				log.Println("es空闲!")
+				break
+			} else if listLen == 1 || listLen == 2 {
+				log.Println("es繁忙,", listLen)
+			}
+			time.Sleep(5 * time.Second)
+		}
+		for {
+			listLens := int(redis.LLEN("datag", "jyqyfw_es_query"))
+			if listLens < 2 {
+				redis.RPUSH("datag", "jyqyfw_es_query", 1)
+				c.EsConGetDataV7(stype, dataSource, esCon)
+				redis.LPOP("datag", "jyqyfw_es_query")
+				break
+			} else {
+				log.Println("企业级服务es进程数过多,", listLens)
+			}
+			time.Sleep(5 * time.Second)
+		}
 	}
 }
 
@@ -173,16 +194,6 @@ func (c *Customer) EsConGetDataV7(stype string, dataSource int, esCon *esv.EsV7)
 	ctx, _ := context.WithTimeout(context.Background(), 3*time.Hour)
 	for _, dm := range c.Departments {
 		for _, sr := range dm.Rules {
-			for {
-				listLen := redis.GetInt("session", "es_status")
-				if listLen == 0 {
-					log.Println("es空闲!")
-					break
-				} else if listLen == 1 || listLen == 2 {
-					log.Println("系统繁忙,请稍后再试 ", listLen)
-				}
-				time.Sleep(5 * time.Second)
-			}
 			ch := make(chan bool, 10)
 			wg := &sync.WaitGroup{}
 			esIndex := Index

+ 1 - 1
src/service/customer_service.go

@@ -861,7 +861,7 @@ func (c *Customer) ProductData() {
 			msg = "数据生成成功"
 		} else {
 			rep = false
-			msg = "数据生成失败"
+			msg = "数据生成失败,请稍后再试"
 			if err.Error() == "请设置开始结束时间" {
 				msg = "请设置开始结束时间"
 			}

+ 31 - 11
src/util/utiltag.go

@@ -140,19 +140,39 @@ func searchDataArr(index, esquery, sdataid string, i_maxnum int64, tags map[stri
 	var (
 		err    error
 		counts = int64(0)
+		times  = 0
+		times2 = 0
 	)
-	// for {
-	listLen := redis.GetInt("session", "es_status")
-	if listLen == 0 {
-		log.Println("es空闲!")
-		err, counts = searchData(index, esquery, sdataid, i_maxnum, tags, maths)
-		// break
-	} else if listLen == 1 || listLen == 2 {
-		err = errors.New("系统繁忙,请稍后再试")
-		// break
+	for {
+		listLen := redis.GetInt("session", "es_status")
+		if listLen == 0 {
+			log.Println("es空闲!")
+			break
+		} else if times > 10 {
+			err = errors.New("系统繁忙,请稍后再试")
+			break
+		} else {
+			log.Println("es繁忙,", listLen)
+		}
+		times += 2
+		time.Sleep(2 * time.Second)
+	}
+	for {
+		listLens := int(redis.LLEN("datag", "jyqyfw_es_query"))
+		if listLens < 2 {
+			redis.RPUSH("datag", "jyqyfw_es_query", 1)
+			err, counts = searchData(index, esquery, sdataid, i_maxnum, tags, maths)
+			redis.LPOP("datag", "jyqyfw_es_query")
+			break
+		} else if times2 > 10 {
+			err = errors.New("系统繁忙,请稍后再试")
+			break
+		} else {
+			log.Println("企业级服务es进程数过多,", listLens)
+		}
+		times2 += 2
+		time.Sleep(2 * time.Second)
 	}
-	// time.Sleep(5 * time.Second)
-	// }
 	return err, counts
 }