Jianghan пре 2 година
родитељ
комит
8ce3078de2
1 измењених фајлова са 161 додато и 142 уклоњено
  1. 161 142
      elastic/elasticutil.go

+ 161 - 142
elastic/elasticutil.go

@@ -1,6 +1,8 @@
 package elastic
 
 import (
+	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	es "github.com/olivere/elastic/v7"
@@ -151,7 +153,7 @@ func Get(index, itype, query string) *[]map[string]interface{} {
 				}
 			}
 		}()
-		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
 		if err != nil {
 			log.Println("从ES查询出错", err.Error())
 			return nil
@@ -162,10 +164,11 @@ func Get(index, itype, query string) *[]map[string]interface{} {
 			if resNum < 5000 {
 				res = make([]map[string]interface{}, resNum)
 				for i, hit := range searchResult.Hits.Hits {
-					//d := json.NewDecoder(bytes.NewBuffer(*hit.Source))
-					//d.UseNumber()
-					//d.Decode(&res[i])
-					parseErr := json.Unmarshal(*hit.Source, &res[i])
+					b, _ := hit.Source.MarshalJSON()
+					d := json.NewDecoder(bytes.NewBuffer(b))
+					d.UseNumber()
+					parseErr := d.Decode(&res[i])
+					//parseErr := json.Unmarshal(*hit.Source, &res[i])
 					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
 						res[i]["highlight"] = map[string][]string(hit.Highlight)
 					}
@@ -199,7 +202,7 @@ func GetOA(index, itype, query string) (*[]map[string]interface{}, int) {
 				}
 			}
 		}()
-		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
 		if err != nil {
 			log.Println("从ES查询出错", err.Error())
 			return nil, 0
@@ -210,10 +213,11 @@ func GetOA(index, itype, query string) (*[]map[string]interface{}, int) {
 			if resNum < 5000 {
 				res = make([]map[string]interface{}, resNum)
 				for i, hit := range searchResult.Hits.Hits {
-					//d := json.NewDecoder(bytes.NewBuffer(*hit.Source))
-					//d.UseNumber()
-					//d.Decode(&res[i])
-					parseErr := json.Unmarshal(*hit.Source, &res[i])
+					b, _ := hit.Source.MarshalJSON()
+					d := json.NewDecoder(bytes.NewBuffer(b))
+					d.UseNumber()
+					parseErr := d.Decode(&res[i])
+					//parseErr := json.Unmarshal(*hit.Source, &res[i])
 					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
 						res[i]["highlight"] = map[string][]string(hit.Highlight)
 					}
@@ -245,7 +249,7 @@ func GetNoLimit(index, itype, query string) *[]map[string]interface{} {
 				}
 			}
 		}()
-		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
 		if err != nil {
 			log.Println("从ES查询出错", err.Error())
 			return nil
@@ -255,7 +259,10 @@ func GetNoLimit(index, itype, query string) *[]map[string]interface{} {
 			resNum := len(searchResult.Hits.Hits)
 			res = make([]map[string]interface{}, resNum)
 			for i, hit := range searchResult.Hits.Hits {
-				json.Unmarshal(*hit.Source, &res[i])
+				b, _ := hit.Source.MarshalJSON()
+				d := json.NewDecoder(bytes.NewBuffer(b))
+				d.Decode(&res[i])
+				//json.Unmarshal(*hit.Source, &res[i])
 			}
 		}
 	}
@@ -401,7 +408,7 @@ func GetByIdField(index, itype, id, fields string) *map[string]interface{} {
 			query = query + `,"_source":[` + fields + `]`
 		}
 		query = query + "}"
-		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
 		if err != nil {
 			log.Println("从ES查询出错", err.Error())
 			return nil
@@ -412,7 +419,11 @@ func GetByIdField(index, itype, id, fields string) *map[string]interface{} {
 			if resNum == 1 {
 				res = make(map[string]interface{})
 				for _, hit := range searchResult.Hits.Hits {
-					json.Unmarshal(*hit.Source, &res)
+					b, _ := hit.Source.MarshalJSON()
+					d := json.NewDecoder(bytes.NewBuffer(b))
+					d.UseNumber()
+					d.Decode(&res)
+					//json.Unmarshal(*hit.Source, &res)
 				}
 				return &res
 			}
@@ -422,45 +433,48 @@ func GetByIdField(index, itype, id, fields string) *map[string]interface{} {
 }
 
 //根据id来查询文档
-func GetById(index, itype string, ids ...string) *[]map[string]interface{} {
-	client := GetEsConn()
-	defer DestoryEsConn(client)
-	var res []map[string]interface{}
-	if client != nil {
-		defer func() {
-			if r := recover(); r != nil {
-				log.Println("[E]", r)
-				for skip := 1; ; skip++ {
-					_, file, line, ok := runtime.Caller(skip)
-					if !ok {
-						break
-					}
-					go log.Printf("%v,%v\n", file, line)
-				}
-			}
-		}()
-		query := es.NewIdsQuery().Ids(ids...)
-		searchResult, err := client.Search().Index(index).Type(itype).Query(&query).Do()
-		if err != nil {
-			log.Println("从ES查询出错", err.Error())
-			return nil
-		}
-
-		if searchResult.Hits != nil {
-			resNum := len(searchResult.Hits.Hits)
-			if resNum < 5000 {
-				res = make([]map[string]interface{}, resNum)
-				for i, hit := range searchResult.Hits.Hits {
-					json.Unmarshal(*hit.Source, &res[i])
-				}
-			} else {
-				log.Println("查询结果太多,查询到:", resNum, "条")
-			}
-
-		}
-	}
-	return &res
-}
+//func GetById(index, itype string, ids ...string) *[]map[string]interface{} {
+//	client := GetEsConn()
+//	defer DestoryEsConn(client)
+//	var res []map[string]interface{}
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		query := es.NewIdsQuery().Ids(ids...)
+//		searchResult, err := client.Search().Index(index).Query(&query).Do(context.Background())
+//		if err != nil {
+//			log.Println("从ES查询出错", err.Error())
+//			return nil
+//		}
+//
+//		if searchResult.Hits != nil {
+//			resNum := len(searchResult.Hits.Hits)
+//			if resNum < 5000 {
+//				res = make([]map[string]interface{}, resNum)
+//				for i, hit := range searchResult.Hits.Hits {
+//					b, _ := hit.Source.MarshalJSON()
+//					d := json.NewDecoder(bytes.NewBuffer(b))
+//					d.Decode(&res[i])
+//					//json.Unmarshal(*hit.Source, &res[i])
+//				}
+//			} else {
+//				log.Println("查询结果太多,查询到:", resNum, "条")
+//			}
+//
+//		}
+//	}
+//	return &res
+//}
 
 //删除某个索引,根据查询
 func Del(index, itype string, query interface{}) bool {
@@ -482,7 +496,7 @@ func Del(index, itype string, query interface{}) bool {
 		}()
 		var err error
 		if qi, ok2 := query.(es.Query); ok2 {
-			_, err = client.DeleteByQuery().Index(index).Type(itype).Query(qi).Do()
+			_, err = client.DeleteByQuery().Index(index).Query(qi).Do(context.Background())
 		}
 		if err != nil {
 			log.Println("删除索引出错:", err.Error())
@@ -494,90 +508,90 @@ func Del(index, itype string, query interface{}) bool {
 }
 
 //根据语句更新对象
-func Update(index, itype, id string, updateStr string) bool {
-	client := GetEsConn()
-	defer DestoryEsConn(client)
-	b := false
-	if client != nil {
-		defer func() {
-			if r := recover(); r != nil {
-				log.Println("[E]", r)
-				for skip := 1; ; skip++ {
-					_, file, line, ok := runtime.Caller(skip)
-					if !ok {
-						break
-					}
-					go log.Printf("%v,%v\n", file, line)
-				}
-			}
-		}()
-		var err error
-		_, err = client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
-		if err != nil {
-			log.Println("更新检索出错:", err.Error())
-		} else {
-			b = true
-		}
-	}
-	return b
-}
-
-func BulkUpdate(index, itype string, ids []string, updateStr string) {
-	client := GetEsConn()
-	defer DestoryEsConn(client)
-	if client != nil {
-		defer func() {
-			if r := recover(); r != nil {
-				log.Println("[E]", r)
-				for skip := 1; ; skip++ {
-					_, file, line, ok := runtime.Caller(skip)
-					if !ok {
-						break
-					}
-					go log.Printf("%v,%v\n", file, line)
-				}
-			}
-		}()
-		for _, id := range ids {
-			_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
-			if err != nil {
-				log.Println("更新检索出错:", err.Error())
-			}
-		}
-	}
-}
-
-func BulkUpdateArr(index, itype string, update []map[string]string) {
-	client := GetEsConn()
-	defer DestoryEsConn(client)
-	if client != nil {
-		defer func() {
-			if r := recover(); r != nil {
-				log.Println("[E]", r)
-				for skip := 1; ; skip++ {
-					_, file, line, ok := runtime.Caller(skip)
-					if !ok {
-						break
-					}
-					go log.Printf("%v,%v\n", file, line)
-				}
-			}
-		}()
-		for _, data := range update {
-			id := data["id"]
-			updateStr := data["updateStr"]
-			if id != "" && updateStr != "" {
-				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
-				if err != nil {
-					log.Println("更新检索出错:", err.Error())
-				}
-			} else {
-				log.Println("数据错误")
-			}
-		}
-
-	}
-}
+//func Update(index, itype, id string, updateStr string) bool {
+//	client := GetEsConn()
+//	defer DestoryEsConn(client)
+//	b := false
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		var err error
+//		_, err = client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//		if err != nil {
+//			log.Println("更新检索出错:", err.Error())
+//		} else {
+//			b = true
+//		}
+//	}
+//	return b
+//}
+//
+//func BulkUpdate(index, itype string, ids []string, updateStr string) {
+//	client := GetEsConn()
+//	defer DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, id := range ids {
+//			_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//			if err != nil {
+//				log.Println("更新检索出错:", err.Error())
+//			}
+//		}
+//	}
+//}
+//
+//func BulkUpdateArr(index, itype string, update []map[string]string) {
+//	client := GetEsConn()
+//	defer DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, data := range update {
+//			id := data["id"]
+//			updateStr := data["updateStr"]
+//			if id != "" && updateStr != "" {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			} else {
+//				log.Println("数据错误")
+//			}
+//		}
+//
+//	}
+//}
 
 //根据id删除索引对象
 func DelById(index, itype, id string) bool {
@@ -598,7 +612,7 @@ func DelById(index, itype, id string) bool {
 			}
 		}()
 		var err error
-		_, err = client.Delete().Index(index).Type(itype).Id(id).Do()
+		_, err = client.Delete().Index(index).Id(id).Do(context.Background())
 		if err != nil {
 			log.Println("更新检索出错:", err.Error())
 		} else {
@@ -789,7 +803,7 @@ func BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bo
 			}
 			req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(v))
 		}
-		_, err := req.Do()
+		_, err := req.Do(context.Background())
 		if err != nil {
 			log.Println("批量保存到ES出错", err.Error())
 		}
@@ -816,7 +830,7 @@ func Count(index, itype string, query interface{}) int64 {
 		if qi, ok2 := query.(es.Query); ok2 {
 			qq = qi
 		}
-		n, err := client.Count(index).Type(itype).Query(qq).Do()
+		n, err := client.Count(index).Query(qq).Do(context.Background())
 		if err != nil {
 			log.Println("统计出错", err.Error())
 		}
@@ -1094,7 +1108,12 @@ func AnalyzerWord(index, word string) (result []string) {
 	p := url.Values{}
 	p["text"] = []string{word}
 	p["analyzer"] = []string{"ik"}
-	by, err := client.PerformRequest("GET", "/"+index+"/_analyze", p, nil)
+	opt := es.PerformRequestOptions{
+		Method: "GET",
+		Path:   "/" + index + "/_analyze",
+		Body:   p,
+	}
+	by, err := client.PerformRequest(context.Background(), opt)
 	if err != nil {
 		log.Println("AnalyzerWord Error:", err)
 		return