Jianghan 2 vuotta sitten
vanhempi
commit
349beb2daf
5 muutettua tiedostoa jossa 441 lisäystä ja 19 poistoa
  1. 2 1
      src/config.json
  2. 1 1
      src/front/front.go
  3. 8 11
      src/front/server.go
  4. 7 6
      src/util/config.go
  5. 423 0
      src/util/elasticSim.go

+ 2 - 1
src/config.json

@@ -26,7 +26,8 @@
     "es":{
     	"addr": "http://192.168.3.206:9800",
 	    "index": "bidding",
-	    "itype": "bidding",
+        "user": "",
+        "password": "",
 	    "pool": 12
     },
     "fields":{

+ 1 - 1
src/front/front.go

@@ -1091,7 +1091,7 @@ func GetDataById1(coll string, ids []string, stype string, tmp map[string]map[st
 				(*bidData)["filetext"] = util.GetFileText(*bidData)
 				// es查询项目合并信息
 				esQ := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
-				info := util.Es.Get("projectset", "projectset", esQ)
+				info := util.Es.Get("projectset", esQ)
 				if len(*info) == 1 {
 					ids := qu.ObjArrToStringArr((*info)[0]["ids"].([]interface{}))
 					if len(ids) > 0 {

+ 8 - 11
src/front/server.go

@@ -2,6 +2,7 @@
 package front
 
 import (
+	"context"
 	"encoding/json"
 	"io/ioutil"
 
@@ -20,7 +21,7 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 
 	"github.com/tealeg/xlsx"
-	es "gopkg.in/olivere/elastic.v1"
+	es "gopkg.in/olivere/elastic.v7"
 )
 
 type Front struct {
@@ -574,14 +575,10 @@ func (i *Front) ImportByEs() {
 	qu.Debug("查询总数:", escount)
 	if escount > 0 {
 		//查询条件类型转换
-		var q es.Query
-		tmpQuery := es.BoolQuery{
-			QueryStrings: estext,
-		}
-		q = tmpQuery
+		q := es.NewQueryStringQuery(estext)
 		//游标查询,index不支持别名,只能写索引库的名称
-		res, err := client.Scroll(util.Index).Query(q).Size(200).Do() //查询一条获取游标
-		ids := []string{}                                             //id数据
+		res, err := client.Scroll(util.Index).Query(q).Size(200).Do(context.TODO()) //查询一条获取游标
+		ids := []string{}                                                           //id数据
 		if err == nil {
 			numDocs := 0
 			scrollId := res.ScrollId
@@ -590,7 +587,7 @@ func (i *Front) ImportByEs() {
 					qu.Debug("ScrollId Is Error")
 					break
 				}
-				searchResult, err := client.Scroll(util.Index).Size(200).ScrollId(scrollId).Do() //查询
+				searchResult, err := client.Scroll(util.Index).Size(200).ScrollId(scrollId).Do(context.TODO()) //查询
 				if err != nil {
 					if err.Error() == "EOS" { //迭代完毕
 						qu.Debug("Es Search Data Over:", err)
@@ -609,7 +606,7 @@ func (i *Front) ImportByEs() {
 							wg.Done()
 						}()
 						tmp := make(map[string]interface{})
-						if json.Unmarshal(*tmpHit.Source, &tmp) == nil {
+						if json.Unmarshal(tmpHit.Source, &tmp) == nil {
 							id := qu.ObjToString(tmp["_id"])
 							tmp["id"] = id //记录数据原有id
 							lock.Lock()
@@ -625,7 +622,7 @@ func (i *Front) ImportByEs() {
 				scrollId = searchResult.ScrollId
 			}
 			wg.Wait()
-			client.ClearScroll().ScrollId(scrollId).Do() //清理游标
+			client.ClearScroll().ScrollId(scrollId).Do(context.TODO()) //清理游标
 			//qu.Debug("Result Data Count:", numDocs)
 		} else {
 			qu.Debug("Es Search Data Error")

+ 7 - 6
src/util/config.go

@@ -6,7 +6,6 @@ package util
 import (
 	mgo "mongodb"
 	qu "qfw/util"
-	"qfw/util/elastic"
 	"sort"
 )
 
@@ -42,8 +41,8 @@ var (
 	MgoE     *mgo.MongodbSim //extract
 	ExtColl1 string
 	ExtColl2 string
-	MgoM     *mgo.MongodbSim  //标注
-	Es       *elastic.Elastic //es
+	MgoM     *mgo.MongodbSim //标注
+	Es       *Elastic        //es
 	Index    string
 	Itype    string
 	//
@@ -92,9 +91,11 @@ func InitConfig() {
 	es := Config.Elas
 	Index = qu.ObjToString(es["index"])
 	Itype = qu.ObjToString(es["itype"])
-	Es = &elastic.Elastic{
-		S_esurl: qu.ObjToString(es["addr"]),
-		I_size:  qu.IntAll(es["pool"]),
+	Es = &Elastic{
+		S_esurl:  qu.ObjToString(es["addr"]),
+		I_size:   qu.IntAll(es["pool"]),
+		Username: qu.ObjToString(es["user"]),
+		Password: qu.ObjToString(es["password"]),
 	}
 	Es.InitElasticSize()
 

+ 423 - 0
src/util/elasticSim.go

@@ -0,0 +1,423 @@
+package util
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	es "gopkg.in/olivere/elastic.v7"
+	"log"
+	"qfw/util"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Elastic struct {
+	S_esurl      string
+	I_size       int
+	Addrs        []string
+	Pool         chan *es.Client
+	lastTime     int64
+	lastTimeLock sync.Mutex
+	ntimeout     int
+	Username     string
+	Password     string
+}
+
+func (e *Elastic) InitElasticSize() {
+	e.Pool = make(chan *es.Client, e.I_size)
+	for _, s := range strings.Split(e.S_esurl, ",") {
+		e.Addrs = append(e.Addrs, s)
+	}
+	log.Println(e.Password, e.Username)
+	for i := 0; i < e.I_size; i++ {
+		client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
+		e.Pool <- client
+	}
+}
+
+//关闭连接
+func (e *Elastic) DestoryEsConn(client *es.Client) {
+	select {
+	case e.Pool <- client:
+		break
+	case <-time.After(time.Second * 1):
+		if client != nil {
+			client.Stop()
+		}
+		client = nil
+	}
+}
+
+func (e *Elastic) GetEsConn() *es.Client {
+	select {
+	case c := <-e.Pool:
+		if c == nil || !c.IsRunning() {
+			log.Println("new esclient.", len(e.Pool))
+			client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
+				es.SetSniff(false))
+			if err == nil && client.IsRunning() {
+				return client
+			}
+		}
+		return c
+	case <-time.After(time.Second * 4):
+		//超时
+		e.ntimeout++
+		e.lastTimeLock.Lock()
+		defer e.lastTimeLock.Unlock()
+		//12秒后允许创建链接
+		c := time.Now().Unix() - e.lastTime
+		if c > 12 {
+			e.lastTime = time.Now().Unix()
+			log.Println("add client..", len(e.Pool))
+			c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+			go func() {
+				for i := 0; i < 2; i++ {
+					client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+					e.Pool <- client
+				}
+			}()
+			return c
+		}
+		return nil
+	}
+}
+
+func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer func() {
+		go e.DestoryEsConn(client)
+	}()
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 5000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					parseErr := json.Unmarshal(hit.Source, &res[i])
+					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
+						res[i]["highlight"] = map[string][]string(hit.Highlight)
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+		}
+	}
+	return &res
+}
+
+//关闭elastic
+func (e *Elastic) Close() {
+	for i := 0; i < e.I_size; i++ {
+		cli := <-e.Pool
+		cli.Stop()
+		cli = nil
+	}
+	e.Pool = nil
+	e = nil
+}
+
+//获取连接
+//func (e *Elastic) GetEsConn() (c *es.Client) {
+//	defer util.Catch()
+//	select {
+//	case c = <-e.Pool:
+//		if c == nil || !c.IsRunning() {
+//			client, err := es.NewClient(es.SetURL(addrs...),
+//				es.SetMaxRetries(2), es.SetSniff(false))
+//			if err == nil && client.IsRunning() {
+//				return client
+//			}
+//			return nil
+//		}
+//		return
+//	case <-time.After(time.Second * 7):
+//		//超时
+//		ntimeout++
+//		log.Println("timeout times:", ntimeout)
+//		return nil
+//	}
+//}
+
+func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		req := client.Bulk()
+		for _, v := range obj {
+			//if isDelBefore {
+			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
+			//}
+			id := util.ObjToString(v["_id"])
+			delete(v, "_id")
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
+		}
+		_, err := req.Do(context.Background())
+		if err != nil {
+			log.Println("批量保存到ES出错", err.Error())
+		}
+	}
+}
+
+//根据id删除索引对象
+func (e *Elastic) DelById(index, itype, id string) bool {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	b := false
+	if client != nil {
+		var err error
+		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			util.Debug(resNum)
+			res = make([]map[string]interface{}, resNum)
+			for i, hit := range searchResult.Hits.Hits {
+				json.Unmarshal(hit.Source, &res[i])
+			}
+		}
+	}
+	return &res
+}
+
+//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		query := `{"query":{"term":{"_id":"` + id + `"}}`
+//		if len(fields) > 0 {
+//			query = query + `,"_source":[` + fields + `]`
+//		}
+//		query = query + "}"
+//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+//		if err != nil {
+//			log.Println("从ES查询出错", err.Error())
+//			return nil
+//		}
+//		var res map[string]interface{}
+//		if searchResult.Hits != nil {
+//			resNum := len(searchResult.Hits.Hits)
+//			if resNum == 1 {
+//				res = make(map[string]interface{})
+//				for _, hit := range searchResult.Hits.Hits {
+//					json.Unmarshal(*hit.Source., &res)
+//				}
+//				return &res
+//			}
+//		}
+//	}
+//	return nil
+//}
+
+func (e *Elastic) Count(index, itype string, query interface{}) int64 {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var qq es.Query
+		if qi, ok2 := query.(es.Query); ok2 {
+			qq = qi
+		}
+		n, err := client.Count(index).Query(qq).Do(context.Background())
+		if err != nil {
+			log.Println("统计出错", err.Error())
+		}
+
+		return n
+	}
+	return 0
+}
+
+//更新一个字段
+//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, data := range update {
+//			id := data["id"]
+//			updateStr := data["updateStr"]
+//			if id != "" && updateStr != "" {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			} else {
+//				log.Println("数据错误")
+//			}
+//		}
+//	}
+//}
+
+//更新多个字段
+//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, arr := range arrs {
+//			id := arr[0]["id"].(string)
+//			update := arr[1]["update"].([]string)
+//			for _, str := range update {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			}
+//		}
+//	}
+//}
+
+// UpdateBulk 批量修改文档
+func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type(itype)
+	for _, d := range docs {
+		id := d[0]["_id"].(string)
+		doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
+		bulkService.Add(doc)
+	}
+	_, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("UpdateBulk all success err is %v\n", err)
+	}
+	//if len(res.Failed()) > 0 {
+	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
+	//}
+}
+
+// UpsertBulk 批量修改文档(不存在则插入)
+func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
+		bulkService.Add(doc)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		return err
+	}
+	if len(res.Failed()) > 0 {
+		return errors.New(res.Failed()[0].Error.Reason)
+	}
+	return nil
+}
+
+// 批量删除
+func (e *Elastic) DeleteBulk(index string, ids []string) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		req := es.NewBulkDeleteRequest().Id(ids[i])
+		bulkService.Add(req)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
+	}
+}