xuzhiheng 2 lat temu
rodzic
commit
f8abbba4fa
2 zmienionych plików z 57 dodań i 31 usunięć
  1. 9 9
      src/history/task.go
  2. 48 22
      src/util/utiltag.go

+ 9 - 9
src/history/task.go

@@ -417,20 +417,20 @@ func (c *Customer) EsConGetDataV1(stype string, dataSource int, esCon *esv.EsV1)
 	}
 }
 
-type MySource struct {
-	Querys string
-}
+// type MySource struct {
+// 	Querys string
+// }
 
-func (m *MySource) Source() (interface{}, error) {
-	mp := make(map[string]interface{})
-	json.Unmarshal([]byte(m.Querys), &mp)
-	return mp["query"], nil
-}
+// func (m *MySource) Source() (interface{}, error) {
+// 	mp := make(map[string]interface{})
+// 	json.Unmarshal([]byte(m.Querys), &mp)
+// 	return mp["query"], nil
+// }
 
 func (c *Customer) EsConGetDataV7(stype string, dataSource int, esCon *esv.EsV7) {
 	client := esCon.GetEsConn()
 	defer esCon.DestoryEsConn(client)
-	ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
+	ctx, _ := context.WithTimeout(context.Background(), 3*time.Hour)
 	for _, dm := range c.Departments {
 		for _, sr := range dm.Rules {
 			ch := make(chan bool, 10)

+ 48 - 22
src/util/utiltag.go

@@ -1,13 +1,14 @@
 package util
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
+	elastic "es"
 	"fmt"
 	"log"
 	"models"
 	"qfw/util"
-	"qfw/util/elastic"
 	"qfw/util/redis"
 	"regexp"
 	sql "sqlmodel"
@@ -110,7 +111,7 @@ func UtilEsFind1(tags map[string]interface{}, dataType string) (error, string, i
 	if strings.Contains(esquery, "site") { //选择网站名称,使用全量数据索引
 		index = EsAllIndex
 	}
-	dataTypes := ""
+	dataTypes, esquery2 := "", ""
 	if dataType == "1" {
 		index = InterimIndex
 		dataTypes = "1"
@@ -119,15 +120,16 @@ func UtilEsFind1(tags map[string]interface{}, dataType string) (error, string, i
 		log.Println("esquery111111", esquery)
 		esquery, _ = FilterTimeSql(esquery, startTime, endTime)
 		log.Println("esquery222222", esquery)
+		esquery2 = esquery
 	} else {
 		index = TotalIndex
 		dataTypes = "2"
 	}
-	esquery = esquery[:len(esquery)-1] + `,"size":` + fmt.Sprintf("%d", i_maxnum) + `}`
+	// esquery = esquery[:len(esquery)-1] + `,"size":` + fmt.Sprintf("%d", i_maxnum) + `}`
 	log.Println("esquery", esquery)
-	err, counts := searchDataArr(index, esquery, sdataid, tags, maths)
+	err, counts := searchDataArr(index, esquery, esquery2, sdataid, i_maxnum, tags, maths)
 	if counts == 0 && err == nil && index == InterimIndex {
-		err, counts = searchDataArr(TotalIndex, esquery, sdataid, tags, maths)
+		err, counts = searchDataArr(TotalIndex, esquery, esquery2, sdataid, i_maxnum, tags, maths)
 		if counts > 0 {
 			dataTypes = "2"
 		}
@@ -135,7 +137,7 @@ func UtilEsFind1(tags map[string]interface{}, dataType string) (error, string, i
 	return err, dataTypes, counts
 }
 
-func searchDataArr(index, esquery, sdataid string, tags map[string]interface{}, maths []map[string]string) (error, int64) {
+func searchDataArr(index, esquery, esquery2, sdataid string, i_maxnum int64, tags map[string]interface{}, maths []map[string]string) (error, int64) {
 	var (
 		err    error
 		counts = int64(0)
@@ -143,12 +145,12 @@ func searchDataArr(index, esquery, sdataid string, tags map[string]interface{},
 	)
 	for {
 		listLen := int(redis.LLEN("datag", "jyqyfw_es_query"))
-		if listLen < 2 {
+		if listLen < 5 {
 			redis.RPUSH("datag", "jyqyfw_es_query", 1)
-			err, counts = searchData(index, esquery, sdataid, tags, maths)
+			err, counts = searchData(index, esquery, esquery2, sdataid, i_maxnum, tags, maths)
 			redis.LPOP("datag", "jyqyfw_es_query")
 			break
-		} else if listLen > 10 || times > 60 {
+		} else if listLen > 20 || times > 60 {
 			err = errors.New("系统繁忙,请稍后再试")
 			break
 		}
@@ -158,17 +160,32 @@ func searchDataArr(index, esquery, sdataid string, tags map[string]interface{},
 	return err, counts
 }
 
-func searchData(index, esquery, sdataid string, tags map[string]interface{}, maths []map[string]string) (error, int64) {
-	client := elastic.GetEsConn()
-	defer elastic.DestoryEsConn(client)
-	searchResult, err := client.Search(index).Source(esquery).Do()
+type MySource struct {
+	Querys string
+}
+
+func (m *MySource) Source() (interface{}, error) {
+	mp := make(map[string]interface{})
+	json.Unmarshal([]byte(m.Querys), &mp)
+	return mp["query"], nil
+}
+
+func searchData(index, esquery, esquery2, sdataid string, i_maxnum int64, tags map[string]interface{}, maths []map[string]string) (error, int64) {
+	esCon := elastic.VarEs.(*elastic.EsV7)
+	client := esCon.GetEsConn()
+	defer esCon.DestoryEsConn(client)
+	ctx, _ := context.WithTimeout(context.Background(), 60*time.Second)
+	cc := &MySource{
+		Querys: esquery,
+	}
+	searchResult, err := client.Search(index).Query(cc).Size(int(i_maxnum)).Do(ctx)
 	if err == nil && searchResult.Hits != nil {
 		datas := make([]map[string]interface{}, 0)
-		util.Debug("es查询到的数量:", searchResult.Hits.TotalHits)
+		util.Debug("es查询到的数量:", searchResult.Hits.TotalHits.Value)
 		util.Debug("es查询数据数量:", len(searchResult.Hits.Hits))
 		for _, v := range searchResult.Hits.Hits {
 			item := make(map[string]interface{})
-			if json.Unmarshal(*v.Source, &item) == nil {
+			if json.Unmarshal(v.Source, &item) == nil {
 				delete(item, "_id")
 				item["info_id"] = v.Id
 				item["s_dataid"] = sdataid
@@ -256,7 +273,7 @@ func searchData(index, esquery, sdataid string, tags map[string]interface{}, mat
 				datas = append(datas, item)
 			}
 		}
-		counts := searchResult.Hits.TotalHits
+		counts := Es.Count(index, Itype, esquery2)
 		Mgo.Update("cuserdepartrule", bson.M{"_id": tags["_id"]}, bson.M{
 			"$set": bson.M{
 				"i_estotal": counts,
@@ -351,8 +368,10 @@ func UtilEsFind2(tags map[string]interface{}, n int) (error, int64) {
 	} else {
 		return errors.New("o_rules no found"), 0
 	}
-	client := elastic.GetEsConn()
-	defer elastic.DestoryEsConn(client)
+	esCon := elastic.VarEs.(*elastic.EsV7)
+	client := esCon.GetEsConn()
+	defer esCon.DestoryEsConn(client)
+	ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
 
 	//for _, k := range  {
 	////todo  修改开始结束时间 重新生成es语句
@@ -424,14 +443,17 @@ func UtilEsFind2(tags map[string]interface{}, n int) (error, int64) {
 			index = EsAllIndex
 		}
 		log.Println("es索引-------------------- ", index)
-		searchResult, err := client.Search(index).Source(esSql).Do()
+		cc := &MySource{
+			Querys: esSql,
+		}
+		searchResult, err := client.Search(index).Query(cc).Do(ctx)
 		if err == nil && searchResult.Hits != nil {
 			util.Debug("es查询到的数量:", searchResult.Hits.TotalHits)
 			util.Debug("开始处理")
 
 			for _, v := range searchResult.Hits.Hits {
 				item := make(map[string]interface{})
-				if json.Unmarshal(*v.Source, &item) == nil {
+				if json.Unmarshal(v.Source, &item) == nil {
 					delete(item, "_id")
 					item["info_id"] = v.Id
 					item["s_dataid"] = sdataid
@@ -534,7 +556,7 @@ func UtilEsFind2(tags map[string]interface{}, n int) (error, int64) {
 			}
 			util.Debug("处理完成", len(datas))
 
-			count := searchResult.Hits.TotalHits
+			count := searchResult.Hits.TotalHits.Value
 			totalCount += count
 		} else {
 			util.Debug(err)
@@ -868,7 +890,11 @@ func Utiltags(tag map[string]interface{}) string {
 		QueryObjecct.Bool = &ffBoolObject
 	}
 	if len(fqBoolObject.Must) > 0 || len(fqBoolObject.MustNot) > 0 || len(fqBoolObject.Should) > 0 {
-		QueryObjecct.Bool.Must = append(QueryObjecct.Bool.Must, &fqBoolObject)
+		if QueryObjecct.Bool == nil {
+			QueryObjecct.Bool = &fqBoolObject
+		} else {
+			QueryObjecct.Bool.Must = append(QueryObjecct.Bool.Must, &fqBoolObject)
+		}
 		if len(torules.Should) > 0 {
 			QueryObjecct.Bool.Must = append(QueryObjecct.Bool.Must, map[string]interface{}{
 				"bool": torules,