// Package elastic wraps github.com/olivere/elastic/v7 with a small
// channel-backed client pool and convenience helpers for common index and
// document operations.
package elastic

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"runtime"
	"strings"
	"sync"
	"time"

	es "github.com/olivere/elastic/v7"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// errNoClient is reported by helpers that need a client when GetEsConn
// returned nil.
var errNoClient = errors.New("elastic: no client available")

// Elastic holds the connection settings and the pool of Elasticsearch clients.
type Elastic struct {
	S_esurl      string          // comma-separated node URLs, split into Addrs by InitElasticSize
	I_size       int             // pool capacity and number of clients created up front
	Addrs        []string        // node URLs handed to every new client
	Pool         chan *es.Client // pooled clients; may contain nil entries
	lastTime     int64           // unix seconds of the last timeout-triggered client creation
	lastTimeLock sync.Mutex      // guards lastTime and ntimeout
	ntimeout     int             // number of times GetEsConn timed out waiting for the pool
	Username     string
	Password     string
}

// newEsClient builds a client from the receiver's addresses and credentials
// with two retries and sniffing disabled.
func (e *Elastic) newEsClient() (*es.Client, error) {
	return es.NewClient(
		es.SetURL(e.Addrs...),
		es.SetBasicAuth(e.Username, e.Password),
		es.SetMaxRetries(2),
		es.SetSniff(false),
	)
}

// recoverAndLogPanic must be deferred directly (defer recoverAndLogPanic()).
// It swallows a panic, logs its value and then logs the file/line of each
// stack frame.
func recoverAndLogPanic() {
	r := recover()
	if r == nil {
		return
	}
	log.Println("[E]", r)
	for skip := 1; ; skip++ {
		_, file, line, ok := runtime.Caller(skip)
		if !ok {
			break
		}
		log.Printf("%v,%v\n", file, line)
	}
}

// InitElasticSize creates the pool with capacity I_size, appends the URLs
// from S_esurl to Addrs and fills the pool with I_size clients.
func (e *Elastic) InitElasticSize() {
	e.Pool = make(chan *es.Client, e.I_size)
	e.Addrs = append(e.Addrs, strings.Split(e.S_esurl, ",")...)
	for i := 0; i < e.I_size; i++ {
		client, err := e.newEsClient()
		if err != nil {
			log.Println("create esclient failed:", err)
		}
		// A nil client is still queued: GetEsConn replaces nil entries with
		// a freshly created client when it hands them out.
		e.Pool <- client
	}
}

// DestoryEsConn returns client to the pool. If the pool does not accept it
// within one second the client is stopped instead.
func (e *Elastic) DestoryEsConn(client *es.Client) {
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	select {
	case e.Pool <- client:
	case <-timer.C:
		if client != nil {
			client.Stop()
		}
	}
}

// GetEsConn takes a client from the pool, waiting up to four seconds.
//
// A nil or non-running pooled client is replaced by a newly created one when
// that succeeds; otherwise the pooled value is returned as is. On timeout, at
// most once every 12 seconds, a new client is created and returned and two
// more are queued in the background. All other timeouts return nil, so
// callers must check the result.
func (e *Elastic) GetEsConn() *es.Client {
	timer := time.NewTimer(4 * time.Second)
	defer timer.Stop()
	select {
	case pooled := <-e.Pool:
		if pooled != nil && pooled.IsRunning() {
			return pooled
		}
		log.Println("new esclient.", len(e.Pool))
		if fresh, err := e.newEsClient(); err == nil && fresh.IsRunning() {
			return fresh
		}
		return pooled
	case <-timer.C:
		// Timed out waiting for a pooled client.
		e.lastTimeLock.Lock()
		defer e.lastTimeLock.Unlock()
		e.ntimeout++
		now := time.Now().Unix()
		// Only allow creating extra connections once more than 12 seconds
		// have passed since the previous time.
		if now-e.lastTime <= 12 {
			return nil
		}
		e.lastTime = now
		log.Println("add client..", len(e.Pool))
		fresh, err := e.newEsClient()
		if err != nil {
			log.Println("create esclient failed:", err)
		}
		go func() {
			for i := 0; i < 2; i++ {
				extra, err := e.newEsClient()
				if err != nil {
					log.Println("create esclient failed:", err)
				}
				// Blocks until the pool has room for the extra client.
				e.Pool <- extra
			}
		}()
		return fresh
	}
}

// Get runs query against index and returns the decoded _source of every hit,
// with the hit's highlight stored under the "highlight" key when present.
//
// It returns nil when the search fails or a panic is recovered, and a pointer
// to an empty (nil) slice when no client is available, the response has no
// hits section, or 5000 or more hits came back.
func (e *Elastic) Get(index string, query es.Query) *[]map[string]interface{} {
	client := e.GetEsConn()
	// Hand the client back asynchronously so the caller is not delayed by a
	// full pool.
	defer func() {
		go e.DestoryEsConn(client)
	}()
	var res []map[string]interface{}
	if client == nil {
		return &res
	}
	defer recoverAndLogPanic()
	searchResult, err := client.Search().Index(index).Query(query).Do(context.Background())
	if err != nil {
		log.Println("从ES查询出错", err.Error())
		return nil
	}
	if searchResult.Hits == nil {
		return &res
	}
	hits := searchResult.Hits.Hits
	if len(hits) >= 5000 {
		log.Println("查询结果太多,查询到:", len(hits), "条")
		return &res
	}
	res = make([]map[string]interface{}, len(hits))
	for i, hit := range hits {
		if err := json.Unmarshal(hit.Source, &res[i]); err != nil {
			log.Println("decode hit source failed:", err)
			continue
		}
		if hit.Highlight != nil && res[i] != nil {
			res[i]["highlight"] = map[string][]string(hit.Highlight)
		}
	}
	return &res
}

// Close takes I_size clients out of the pool, stops each non-nil one and then
// drops the pool. It is a no-op when the pool was never created.
func (e *Elastic) Close() {
	if e.Pool == nil {
		return
	}
	for i := 0; i < e.I_size; i++ {
		if cli := <-e.Pool; cli != nil {
			cli.Stop()
		}
	}
	e.Pool = nil
}

// Legacy implementation, kept for reference.
// Get a connection.
//func (e *Elastic) GetEsConn() (c *es.Client) {
//	defer util.Catch()
//	select {
//	case c = <-e.Pool:
//		if c == nil || !c.IsRunning() {
//			client, err := es.NewClient(es.SetURL(addrs...),
//				es.SetMaxRetries(2), es.SetSniff(false))
//			if err == nil && client.IsRunning() {
//				return client
//			}
//			return nil
//		}
//		return
//	case <-time.After(time.Second * 7):
//		// timed out
//		ntimeout++
//		log.Println("timeout times:", ntimeout)
//		return nil
//	}
//}

// BulkSave indexes every map in obj into index with a single bulk request.
// The "_id" entry is used as the document id and is left out of the stored
// document. Failures are logged; nothing is returned.
func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return
	}
	req := client.Bulk()
	for _, v := range obj {
		//if isDelBefore {
		//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
		//}
		id := util.ObjToString(v["_id"])
		// Copy so the caller's map is not modified when "_id" is dropped.
		doc := make(map[string]interface{}, len(v))
		for k, val := range v {
			if k != "_id" {
				doc[k] = val
			}
		}
		req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
	}
	if _, err := req.Do(context.Background()); err != nil {
		log.Println("批量保存到ES出错", err.Error())
	}
}

// DelById deletes the document id from index and reports whether the request
// succeeded. It returns false when no client is available.
func (e *Elastic) DelById(index, id string) bool {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return false
	}
	if _, err := client.Delete().Index(index).Id(id).Do(context.Background()); err != nil {
		log.Println("更新检索出错:", err.Error())
		return false
	}
	return true
}

// GetNoLimit sends query as the raw search source for index and returns the
// decoded _source of every hit, without the 5000-hit cap applied by Get.
//
// It returns nil when the search fails or a panic is recovered, and a pointer
// to an empty (nil) slice when no client is available or the response has no
// hits section.
func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	var res []map[string]interface{}
	if client == nil {
		return &res
	}
	defer recoverAndLogPanic()
	searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
	if err != nil {
		log.Println("从ES查询出错", err.Error())
		return nil
	}
	if searchResult.Hits == nil {
		return &res
	}
	hits := searchResult.Hits.Hits
	res = make([]map[string]interface{}, len(hits))
	for i, hit := range hits {
		if err := json.Unmarshal(hit.Source, &res[i]); err != nil {
			log.Println("decode hit source failed:", err)
		}
	}
	return &res
}

// Legacy implementation, kept for reference.
//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
//	client := e.GetEsConn()
//	defer e.DestoryEsConn(client)
//	if client != nil {
//		defer func() {
//			if r := recover(); r != nil {
//				log.Println("[E]", r)
//				for skip := 1; ; skip++ {
//					_, file, line, ok := runtime.Caller(skip)
//					if !ok {
//						break
//					}
//					go log.Printf("%v,%v\n", file, line)
//				}
//			}
//		}()
//		query := `{"query":{"term":{"_id":"` + id + `"}}`
//		if len(fields) > 0 {
//			query = query + `,"_source":[` + fields + `]`
//		}
//		query = query + "}"
//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
//		if err != nil {
//			log.Println("从ES查询出错", err.Error())
//			return nil
//		}
//		var res map[string]interface{}
//		if searchResult.Hits != nil {
//			resNum := len(searchResult.Hits.Hits)
//			if resNum == 1 {
//				res = make(map[string]interface{})
//				for _, hit := range searchResult.Hits.Hits {
//					json.Unmarshal(*hit.Source., &res)
//				}
//				return &res
//			}
//		}
//	}
//	return nil
//}

// Count returns the number of documents in index matching query. A query
// that is not an es.Query is passed on as a nil query. It returns 0 when no
// client is available or a panic is recovered; a failed request is logged and
// the value returned by the count call is passed through.
func (e *Elastic) Count(index string, query interface{}) int64 {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return 0
	}
	defer recoverAndLogPanic()
	qq, _ := query.(es.Query)
	n, err := client.Count(index).Query(qq).Do(context.Background())
	if err != nil {
		log.Println("统计出错", err.Error())
	}
	return n
}

// Legacy implementation, kept for reference.
// Update a single field.
//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
//	client := e.GetEsConn()
//	defer e.DestoryEsConn(client)
//	if client != nil {
//		defer func() {
//			if r := recover(); r != nil {
//				log.Println("[E]", r)
//				for skip := 1; ; skip++ {
//					_, file, line, ok := runtime.Caller(skip)
//					if !ok {
//						break
//					}
//					go log.Printf("%v,%v\n", file, line)
//				}
//			}
//		}()
//		for _, data := range update {
//			id := data["id"]
//			updateStr := data["updateStr"]
//			if id != "" && updateStr != "" {
//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
//				if err != nil {
//					log.Println("更新检索出错:", err.Error())
//				}
//			} else {
//				log.Println("数据错误")
//			}
//		}
//	}
//}

// Legacy implementation, kept for reference.
// Update multiple fields.
//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
//	client := e.GetEsConn()
//	defer e.DestoryEsConn(client)
//	if client != nil {
//		defer func() {
//			if r := recover(); r != nil {
//				log.Println("[E]", r)
//				for skip := 1; ; skip++ {
//					_, file, line, ok := runtime.Caller(skip)
//					if !ok {
//						break
//					}
//					go log.Printf("%v,%v\n", file, line)
//				}
//			}
//		}()
//		for _, arr := range arrs {
//			id := arr[0]["id"].(string)
//			update := arr[1]["update"].([]string)
//			for _, str := range update {
//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
//				if err != nil {
//					log.Println("更新检索出错:", err.Error())
//				}
//			}
//		}
//	}
//}

// UpdateBulk partially updates documents in index with one bulk request.
// Each element of docs is a pair: docs[i][0]["_id"] is the document id (a
// string) and docs[i][1] is the partial document. Malformed entries are
// skipped with a message instead of panicking. Errors are printed; nothing is
// returned.
func (e *Elastic) UpdateBulk(index string, docs ...[]map[string]interface{}) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		fmt.Printf("UpdateBulk all success err is %v\n", errNoClient)
		return
	}
	bulkService := client.Bulk().Index(index).Refresh("true")
	//bulkService.Type(itype)
	for i, d := range docs {
		if len(d) < 2 {
			fmt.Printf("UpdateBulk skip entry %d: need [id-map, doc-map]\n", i)
			continue
		}
		id, ok := d[0]["_id"].(string)
		if !ok {
			fmt.Printf("UpdateBulk skip entry %d: _id is not a string\n", i)
			continue
		}
		bulkService.Add(es.NewBulkUpdateRequest().Id(id).Doc(d[1]))
	}
	if _, err := bulkService.Do(context.Background()); err != nil {
		fmt.Printf("UpdateBulk all success err is %v\n", err)
	}
	//if len(res.Failed()) > 0 {
	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
	//}
}

// UpsertBulk updates the documents identified by ids with the matching entry
// of docs, inserting a document when it does not exist yet. ids and docs must
// have the same length. A nil ctx falls back to context.Background().
//
// It returns the request error, or the reason of the first failed item.
func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
	if len(ids) != len(docs) {
		return fmt.Errorf("elastic: UpsertBulk got %d ids but %d docs", len(ids), len(docs))
	}
	if ctx == nil {
		ctx = context.Background()
	}
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient
	}
	bulkService := client.Bulk().Index(index).Refresh("true")
	//bulkService.Type("bidding")
	for i, id := range ids {
		bulkService.Add(es.NewBulkUpdateRequest().Id(id).Doc(docs[i]).Upsert(docs[i]))
	}
	res, err := bulkService.Do(ctx)
	if err != nil {
		return err
	}
	if failed := res.Failed(); len(failed) > 0 {
		first := failed[0]
		if first.Error != nil {
			return errors.New(first.Error.Reason)
		}
		return fmt.Errorf("elastic: upsert of %q failed with status %d", first.Id, first.Status)
	}
	return nil
}

// DeleteBulk deletes the documents identified by ids from index with one bulk
// request. A request error is printed; on success the number of succeeded
// items is printed. Nothing is returned.
func (e *Elastic) DeleteBulk(index string, ids []string) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		fmt.Printf("DeleteBulk err is %v\n", errNoClient)
		return
	}
	bulkService := client.Bulk().Index(index).Refresh("true")
	//bulkService.Type("bidding")
	for _, id := range ids {
		bulkService.Add(es.NewBulkDeleteRequest().Id(id))
	}
	res, err := bulkService.Do(context.Background())
	if err != nil {
		// res must not be touched on this path.
		fmt.Printf("DeleteBulk err is %v\n", err)
		return
	}
	fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
}

// InsertOrUpdate writes every document in docs to index, updating it when a
// document with the same id exists and indexing it otherwise. The id is taken
// from the "id" entry, falling back to "_id"; both must be strings.
// Processing stops at the first error, which is returned.
func (e *Elastic) InsertOrUpdate(index string, docs []map[string]interface{}) error {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient
	}
	ctx := context.Background()
	for _, item := range docs {
		// Resolve the unique identifier.
		id, _ := item["id"].(string)
		if id == "" {
			id, _ = item["_id"].(string)
		}
		if id == "" {
			return errors.New(`elastic: document has no string "id" or "_id" field`)
		}
		// Check whether a document with this id already exists.
		exists, err := client.Exists().Index(index).Id(id).Do(ctx)
		if err != nil {
			return err
		}
		// Update when it exists, insert otherwise.
		if exists {
			_, err = client.Update().Index(index).Id(id).Doc(item).Do(ctx)
		} else {
			_, err = client.Index().Index(index).Id(id).BodyJson(item).Do(ctx)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

// ExistsIndex reports whether index exists.
func (e *Elastic) ExistsIndex(index string) (exists bool, err error) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return false, errNoClient
	}
	return client.IndexExists(index).Do(context.Background())
}

// CreateIndex creates index using mapping as the request body.
func (e *Elastic) CreateIndex(index string, mapping string) (err error) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient
	}
	_, err = client.CreateIndex(index).BodyString(mapping).Do(context.Background())
	return err
}

// DeleteIndex deletes index.
func (e *Elastic) DeleteIndex(index string) (err error) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient
	}
	_, err = client.DeleteIndex(index).Do(context.Background())
	return err
}

// RemoveAlias removes aliasName from index.
func (e *Elastic) RemoveAlias(index, aliasName string) (err error) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient
	}
	_, err = client.Alias().Remove(index, aliasName).Do(context.Background())
	return err
}

// SetAlias adds aliasName to index.
func (e *Elastic) SetAlias(index, aliasName string) (err error) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient
	}
	_, err = client.Alias().Add(index, aliasName).Do(context.Background())
	return err
}

// DeleteByID deletes the document id from index. A not-found response (for
// the id or the index) is not treated as an error.
func (e *Elastic) DeleteByID(index, id string) error {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient
	}
	_, err := client.Delete().Index(index).Id(id).Do(context.Background())
	if err != nil && es.IsNotFound(err) {
		return nil
	}
	return err
}

// UpdateDocument applies updateData as a partial update to the document
// documentID in indexName. It returns an error when the request fails or when
// the reported result is anything other than "updated".
func (e *Elastic) UpdateDocument(indexName string, documentID string, updateData map[string]interface{}) error {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient
	}
	updateResult, err := client.Update().
		Index(indexName).
		Id(documentID).
		Doc(updateData).
		Do(context.Background())
	if err != nil {
		return err
	}
	if updateResult.Result != "updated" {
		return fmt.Errorf("Document not updated: %v", updateResult.Result)
	}
	return nil
}

// Save converts obj to a map, moves its "_id" (converted with
// mongodb.BsonIdToSId) to "id", and indexes it into index under that id.
// It reports whether the document was stored; panics are recovered and
// reported as false.
func (e *Elastic) Save(index string, obj interface{}) bool {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	defer recoverAndLogPanic()
	if client == nil {
		log.Println("保存到ES出错", errNoClient.Error(), obj)
		return false
	}
	data := util.ObjToMap(obj)
	if data == nil {
		log.Println("保存到ES出错", "object could not be converted to a map", obj)
		return false
	}
	id := mongodb.BsonIdToSId((*data)["_id"])
	(*data)["id"] = id
	delete(*data, "_id")
	if _, err := client.Index().Index(index).Id(id).BodyJson(data).Do(context.TODO()); err != nil {
		log.Println("保存到ES出错", err.Error(), obj)
		return false
	}
	return true
}

// SaveDocument indexes data into index using the string stored under
// data["id"] as the document id.
func (e *Elastic) SaveDocument(index string, data map[string]interface{}) error {
	// Validate before taking a client from the pool.
	id, ok := data["id"].(string)
	if !ok {
		return errors.New("ID field not found in data")
	}
	// Serialise up front so encoding problems surface before the request.
	docJSON, err := json.Marshal(data)
	if err != nil {
		return err
	}
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient
	}
	_, err = client.Index().
		Index(index).
		Id(id).
		BodyJson(string(docJSON)).
		Do(context.Background())
	return err
}

// GetById fetches the document id from index and decodes its _source.
// Note the result order: the error comes first, then the data.
func (e *Elastic) GetById(index string, id string) (err error, data map[string]interface{}) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errNoClient, nil
	}
	getResponse, err := client.Get().
		Index(index).
		Id(id).
		Do(context.Background())
	if err != nil {
		// Request failed.
		return err, nil
	}
	if !getResponse.Found {
		// The document does not exist.
		return errors.New("Document not found"), nil
	}
	if err := json.Unmarshal(getResponse.Source, &data); err != nil {
		// The stored source could not be decoded.
		return err, nil
	}
	// The document exists; return its data.
	return nil, data
}