// Package elastic provides a small channel-backed pool of Elasticsearch
// clients together with bulk write, delete and count helpers.
package elastic

import (
	"context"
	"errors"
	"fmt"
	"log"
	"runtime"
	"strings"
	"sync"
	"time"

	es "github.com/olivere/elastic/v7"
)

// Elastic is a pool of Elasticsearch clients that all target the same set of
// node addresses. Populate S_esurl and I_size, then call InitElasticSize
// before using any other method.
type Elastic struct {
	S_esurl string          // comma-separated node URLs; split into Addrs by InitElasticSize
	I_size  int             // capacity of Pool and number of clients created up front
	Addrs   []string        // node URLs handed to every client that is created
	Pool    chan *es.Client // idle clients; a nil entry is a slot GetEsConn rebuilds on demand

	lastTime     int64      // Unix seconds of the last timeout-triggered pool growth
	lastTimeLock sync.Mutex // guards lastTime and ntimeout
	ntimeout     int        // number of GetEsConn calls that timed out waiting on Pool
}

// newPoolClient builds a client for e.Addrs with the options shared by every
// client of the pool: two retries at most and sniffing disabled.
func (e *Elastic) newPoolClient() (*es.Client, error) {
	return es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false))
}

// InitElasticSize allocates the pool with capacity I_size, appends the
// comma-separated URLs of S_esurl (whitespace-trimmed) to Addrs and fills the
// pool with I_size clients.
//
// A client that cannot be created is logged and stored as a nil entry, which
// GetEsConn replaces with a fresh client when it is taken out.
func (e *Elastic) InitElasticSize() {
	e.Pool = make(chan *es.Client, e.I_size)
	for _, addr := range strings.Split(e.S_esurl, ",") {
		e.Addrs = append(e.Addrs, strings.TrimSpace(addr))
	}
	for i := 0; i < e.I_size; i++ {
		client, err := e.newPoolClient()
		if err != nil {
			log.Println("create esclient failed:", err)
		}
		e.Pool <- client
	}
}

// DestoryEsConn hands client back to the pool. If the pool does not accept it
// within one second the client is stopped and dropped instead.
//
// A nil client is returned to the pool as well; GetEsConn treats such an entry
// as a slot to rebuild.
func (e *Elastic) DestoryEsConn(client *es.Client) {
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	select {
	case e.Pool <- client:
	case <-timer.C:
		if client != nil {
			client.Stop()
		}
	}
}

// GetEsConn takes a client out of the pool, waiting up to four seconds.
//
// A nil or stopped pool entry is replaced by a newly created client; when that
// creation fails the original entry is returned as is, so the result may be
// nil or not running. On timeout the pool is grown at most once every twelve
// seconds (see growOnTimeout); otherwise nil is returned. Callers must check
// the result for nil and give it back with DestoryEsConn.
func (e *Elastic) GetEsConn() *es.Client {
	timer := time.NewTimer(4 * time.Second)
	defer timer.Stop()

	select {
	case pooled := <-e.Pool:
		if pooled != nil && pooled.IsRunning() {
			return pooled
		}
		log.Println("new esclient.", len(e.Pool))
		fresh, err := e.newPoolClient()
		if err != nil {
			log.Println("create esclient failed:", err)
			return pooled
		}
		if fresh.IsRunning() {
			return fresh
		}
		return pooled
	case <-timer.C:
		return e.growOnTimeout()
	}
}

// growOnTimeout records a pool timeout and, if more than twelve seconds have
// passed since the previous growth, creates one client for the caller plus two
// more that are offered to the pool in the background. It returns nil when
// growth is throttled or the caller's client could not be created.
func (e *Elastic) growOnTimeout() *es.Client {
	now := time.Now().Unix()

	// The lock only protects the counters; creating a client performs network
	// I/O and must not stall other callers that timed out at the same moment.
	e.lastTimeLock.Lock()
	e.ntimeout++
	allowed := now-e.lastTime > 12
	if allowed {
		e.lastTime = now
	}
	e.lastTimeLock.Unlock()

	if !allowed {
		return nil
	}

	log.Println("add client..", len(e.Pool))
	client, err := e.newPoolClient()
	if err != nil {
		log.Println("create esclient failed:", err)
	}

	go func() {
		for i := 0; i < 2; i++ {
			extra, err := e.newPoolClient()
			if err != nil {
				log.Println("create esclient failed:", err)
			}
			// DestoryEsConn gives up after one second, so this goroutine
			// cannot block forever on a full or closed pool.
			e.DestoryEsConn(extra)
		}
	}()
	return client
}

// Disabled legacy implementation, kept for reference.
//
//func (e *Elastic) Get(index, itype, query string) *[]map[string]interface{} {
//	client := e.GetEsConn()
//	defer func() {
//		go e.DestoryEsConn(client)
//	}()
//	var res []map[string]interface{}
//	if client != nil {
//		defer func() {
//			if r := recover(); r != nil {
//				log.Println("[E]", r)
//				for skip := 1; ; skip++ {
//					_, file, line, ok := runtime.Caller(skip)
//					if !ok {
//						break
//					}
//					go log.Printf("%v,%v\n", file, line)
//				}
//			}
//		}()
//		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
//		if err != nil {
//			log.Println("从ES查询出错", err.Error())
//			return nil
//		}
//		if searchResult.Hits != nil {
//			resNum := len(searchResult.Hits.Hits)
//			if resNum < 5000 {
//				res = make([]map[string]interface{}, resNum)
//				for i, hit := range searchResult.Hits.Hits {
//					parseErr := json.Unmarshal(*hit.Source, &res[i])
//					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
//						res[i]["highlight"] = map[string][]string(hit.Highlight)
//					}
//				}
//			} else {
//				log.Println("查询结果太多,查询到:", resNum, "条")
//			}
//		}
//	}
//	return &res
//}

// Close stops the pooled clients and releases the pool. It receives I_size
// entries from the pool, so it waits for clients that are currently checked
// out to be returned. Calling Close on an already closed pool is a no-op.
func (e *Elastic) Close() {
	if e.Pool == nil {
		return
	}
	for i := 0; i < e.I_size; i++ {
		// Pool entries may be nil placeholders; only real clients are stopped.
		if cli := <-e.Pool; cli != nil {
			cli.Stop()
		}
	}
	e.Pool = nil
}

// Disabled legacy implementation of GetEsConn, kept for reference.
//
//func (e *Elastic) GetEsConn() (c *es.Client) {
//	defer util.Catch()
//	select {
//	case c = <-e.Pool:
//		if c == nil || !c.IsRunning() {
//			client, err := es.NewClient(es.SetURL(addrs...),
//				es.SetMaxRetries(2), es.SetSniff(false))
//			if err == nil && client.IsRunning() {
//				return client
//			}
//			return nil
//		}
//		return
//	case <-time.After(time.Second * 7):
//		// timed out
//		ntimeout++
//		log.Println("timeout times:", ntimeout)
//		return nil
//	}
//}

// BulkSave indexes every document of obj into index/itype with one bulk
// request. When isDelBefore is true a delete for the document's "_id" value is
// queued ahead of each index action. Failures are logged, not returned.
//
// A nil or empty obj is ignored without taking a client from the pool.
func (e *Elastic) BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
	if obj == nil || len(*obj) == 0 {
		return
	}

	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return
	}

	req := client.Bulk()
	for _, doc := range *obj {
		if isDelBefore {
			id := fmt.Sprintf("%v", doc["_id"])
			req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(id))
		}
		// NOTE(review): the index action carries no explicit Id — confirm this
		// is intended when isDelBefore deletes by "_id" first.
		req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(doc))
	}
	if _, err := req.Do(context.Background()); err != nil {
		log.Println("批量保存到ES出错", err.Error())
	}
}

// DelById deletes the document id from index/itype. It reports whether the
// delete request succeeded; false is also returned when no client could be
// obtained from the pool.
func (e *Elastic) DelById(index, itype, id string) bool {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return false
	}

	if _, err := client.Delete().Index(index).Type(itype).Id(id).Do(context.Background()); err != nil {
		log.Println("更新检索出错:", err.Error())
		return false
	}
	return true
}

// Disabled legacy implementation, kept for reference.
//
//func (e *Elastic) GetNoLimit(index, itype, query string) *[]map[string]interface{} {
//	//log.Println("query -- ", query)
//	client := e.GetEsConn()
//	defer e.DestoryEsConn(client)
//	var res []map[string]interface{}
//	if client != nil {
//		defer func() {
//			if r := recover(); r != nil {
//				log.Println("[E]", r)
//				for skip := 1; ; skip++ {
//					_, file, line, ok := runtime.Caller(skip)
//					if !ok {
//						break
//					}
//					go log.Printf("%v,%v\n", file, line)
//				}
//			}
//		}()
//		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
//		if err != nil {
//			log.Println("从ES查询出错", err.Error())
//			return nil
//		}
//
//		if searchResult.Hits != nil {
//			resNum := len(searchResult.Hits.Hits)
//			res = make([]map[string]interface{}, resNum)
//			for i, hit := range searchResult.Hits.Hits {
//				json.Unmarshal(*hit.Source, &res[i])
//			}
//		}
//	}
//	return &res
//}

// Disabled legacy implementation, kept for reference.
//
//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
//	client := e.GetEsConn()
//	defer e.DestoryEsConn(client)
//	if client != nil {
//		defer func() {
//			if r := recover(); r != nil {
//				log.Println("[E]", r)
//				for skip := 1; ; skip++ {
//					_, file, line, ok := runtime.Caller(skip)
//					if !ok {
//						break
//					}
//					go log.Printf("%v,%v\n", file, line)
//				}
//			}
//		}()
//		query := `{"query":{"term":{"_id":"` + id + `"}}`
//		if len(fields) > 0 {
//			query = query + `,"_source":[` + fields + `]`
//		}
//		query = query + "}"
//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
//		if err != nil {
//			log.Println("从ES查询出错", err.Error())
//			return nil
//		}
//		var res map[string]interface{}
//		if searchResult.Hits != nil {
//			resNum := len(searchResult.Hits.Hits)
//			if resNum == 1 {
//				res = make(map[string]interface{})
//				for _, hit := range searchResult.Hits.Hits {
//					json.Unmarshal(*hit.Source., &res)
//				}
//				return &res
//			}
//		}
//	}
//	return nil
//}

// Count returns the number of documents in index that match query. query is
// used only when it implements es.Query; any other value, including nil,
// sends the count request without a query. itype is accepted for signature
// compatibility and is not used.
//
// Errors are logged. A panic raised while counting is recovered and logged
// together with the call stack, in which case 0 is returned; 0 is also
// returned when no client is available.
func (e *Elastic) Count(index, itype string, query interface{}) int64 {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return 0
	}

	defer func() {
		r := recover()
		if r == nil {
			return
		}
		log.Println("[E]", r)
		// Frames are logged synchronously so they appear in stack order.
		for skip := 1; ; skip++ {
			_, file, line, ok := runtime.Caller(skip)
			if !ok {
				break
			}
			log.Printf("%v,%v\n", file, line)
		}
	}()

	esQuery, _ := query.(es.Query)
	n, err := client.Count(index).Query(esQuery).Do(context.Background())
	if err != nil {
		log.Println("统计出错", err.Error())
	}
	return n
}

// Disabled legacy implementation (updates a single field), kept for reference.
//
//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
//	client := e.GetEsConn()
//	defer e.DestoryEsConn(client)
//	if client != nil {
//		defer func() {
//			if r := recover(); r != nil {
//				log.Println("[E]", r)
//				for skip := 1; ; skip++ {
//					_, file, line, ok := runtime.Caller(skip)
//					if !ok {
//						break
//					}
//					go log.Printf("%v,%v\n", file, line)
//				}
//			}
//		}()
//		for _, data := range update {
//			id := data["id"]
//			updateStr := data["updateStr"]
//			if id != "" && updateStr != "" {
//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
//				if err != nil {
//					log.Println("更新检索出错:", err.Error())
//				}
//			} else {
//				log.Println("数据错误")
//			}
//		}
//	}
//}

// Disabled legacy implementation (updates multiple fields), kept for reference.
//
//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
//	client := e.GetEsConn()
//	defer e.DestoryEsConn(client)
//	if client != nil {
//		defer func() {
//			if r := recover(); r != nil {
//				log.Println("[E]", r)
//				for skip := 1; ; skip++ {
//					_, file, line, ok := runtime.Caller(skip)
//					if !ok {
//						break
//					}
//					go log.Printf("%v,%v\n", file, line)
//				}
//			}
//		}()
//		for _, arr := range arrs {
//			id := arr[0]["id"].(string)
//			update := arr[1]["update"].([]string)
//			for _, str := range update {
//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
//				if err != nil {
//					log.Println("更新检索出错:", err.Error())
//				}
//			}
//		}
//	}
//}

// UpdateBulk partially updates documents of index/itype with one bulk request
// that refreshes the index. Every entry of docs is a pair of maps: the first
// supplies the document id under the "_id" key (a string) and the second the
// fields to update.
//
// Entries with fewer than two maps or without a string "_id" are logged and
// skipped. Request errors are printed, not returned.
func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		log.Println("UpdateBulk: no esclient available")
		return
	}

	bulkService := client.Bulk().Index(index).Refresh("true").Type(itype)
	for i, d := range docs {
		if len(d) < 2 {
			log.Printf("UpdateBulk: docs[%d] must hold an id map and a fields map, skipped\n", i)
			continue
		}
		id, ok := d[0]["_id"].(string)
		if !ok {
			log.Printf("UpdateBulk: docs[%d] has no string \"_id\", skipped\n", i)
			continue
		}
		bulkService.Add(es.NewBulkUpdateRequest().Id(id).Doc(d[1]))
	}

	_, err := bulkService.Do(context.Background())
	if err != nil {
		fmt.Printf("UpdateBulk all success err is %v\n", err)
	}
	//if len(res.Failed()) > 0 {
	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
	//}
}

// UpsertBulk updates the documents ids[i] of index with docs[i], inserting
// docs[i] when the document does not exist yet. The request uses the
// "bidding" type, refreshes the index and is bound to ctx (a nil ctx falls
// back to context.Background).
//
// It returns an error when docs is shorter than ids, when no client is
// available, when the request fails, or — carrying the first item's reason —
// when Elasticsearch reports failed items.
func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
	if len(docs) < len(ids) {
		return fmt.Errorf("upsert bulk: %d ids but only %d docs", len(ids), len(docs))
	}
	if ctx == nil {
		ctx = context.Background()
	}

	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		return errors.New("upsert bulk: no elasticsearch client available")
	}

	bulkService := client.Bulk().Index(index).Refresh("true").Type("bidding")
	for i, id := range ids {
		bulkService.Add(es.NewBulkUpdateRequest().Id(id).Doc(docs[i]).Upsert(docs[i]))
	}

	res, err := bulkService.Do(ctx)
	if err != nil {
		return err
	}
	if failed := res.Failed(); len(failed) > 0 {
		if failed[0].Error != nil {
			return errors.New(failed[0].Error.Reason)
		}
		return fmt.Errorf("upsert bulk: %d items failed", len(failed))
	}
	return nil
}

// DeleteBulk deletes the documents ids from index (type "bidding") with one
// bulk request that refreshes the index. A request error, or the number of
// succeeded and failed items when some deletes fail, is printed; nothing is
// returned.
func (e *Elastic) DeleteBulk(index string, ids []string) {
	client := e.GetEsConn()
	defer e.DestoryEsConn(client)
	if client == nil {
		log.Println("DeleteBulk: no esclient available")
		return
	}

	bulkService := client.Bulk().Index(index).Refresh("true").Type("bidding")
	for _, id := range ids {
		bulkService.Add(es.NewBulkDeleteRequest().Id(id))
	}

	res, err := bulkService.Do(context.Background())
	if err != nil {
		// The response is nil on error, so only the error can be reported.
		fmt.Printf("DeleteBulk failed, err is %v\n", err)
		return
	}
	if failed := res.Failed(); len(failed) > 0 {
		fmt.Printf("DeleteBulk success is %v, failed is %v\n", len(res.Succeeded()), len(failed))
	}
}