package elastic import ( "encoding/json" "fmt" es "gopkg.in/olivere/elastic.v2" "log" "runtime" "strings" "sync" "time" ) type Elastic struct { S_esurl string I_size int Addrs []string Pool chan *es.Client lastTime int64 lastTimeLock sync.Mutex ntimeout int } func (e *Elastic) InitElasticSize() { e.Pool = make(chan *es.Client, e.I_size) for _, s := range strings.Split(e.S_esurl, ",") { e.Addrs = append(e.Addrs, s) } for i := 0; i < e.I_size; i++ { client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false)) e.Pool <- client } } //关闭连接 func (e *Elastic) DestoryEsConn(client *es.Client) { select { case e.Pool <- client: break case <-time.After(time.Second * 1): if client != nil { client.Stop() } client = nil } } func (e *Elastic) GetEsConn() *es.Client { select { case c := <-e.Pool: if c == nil || !c.IsRunning() { log.Println("new esclient.", len(e.Pool)) client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false)) if err == nil && client.IsRunning() { return client } } return c case <-time.After(time.Second * 4): //超时 e.ntimeout++ e.lastTimeLock.Lock() defer e.lastTimeLock.Unlock() //12秒后允许创建链接 c := time.Now().Unix() - e.lastTime if c > 12 { e.lastTime = time.Now().Unix() log.Println("add client..", len(e.Pool)) c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false)) go func() { for i := 0; i < 2; i++ { client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false)) e.Pool <- client } }() return c } return nil } } func (e *Elastic) Get(index, itype, query string) *[]map[string]interface{} { client := e.GetEsConn() defer func() { go e.DestoryEsConn(client) }() var res []map[string]interface{} if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil } if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) if resNum < 5000 { res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { parseErr := json.Unmarshal(*hit.Source, &res[i]) if parseErr == nil && hit.Highlight != nil && res[i] != nil { res[i]["highlight"] = map[string][]string(hit.Highlight) } } } else { log.Println("查询结果太多,查询到:", resNum, "条") } } } return &res } //关闭elastic func (e *Elastic) Close() { for i := 0; i < e.I_size; i++ { cli := <-e.Pool cli.Stop() cli = nil } e.Pool = nil e = nil } //获取连接 //func (e *Elastic) GetEsConn() (c *es.Client) { // defer util.Catch() // select { // case c = <-e.Pool: // if c == nil || !c.IsRunning() { // client, err := es.NewClient(es.SetURL(addrs...), // es.SetMaxRetries(2), es.SetSniff(false)) // if err == nil && client.IsRunning() { // return client // } // return nil // } // return // case <-time.After(time.Second * 7): // //超时 // ntimeout++ // log.Println("timeout times:", ntimeout) // return nil // } //} func (e *Elastic) BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) { client := e.GetEsConn() defer e.DestoryEsConn(client) if client != nil { req := client.Bulk() for _, v := range *obj { if isDelBefore { req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(fmt.Sprintf("%v", v["_id"]))) } req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(v)) } _, err := req.Do() if err != nil { log.Println("批量保存到ES出错", err.Error()) } } } //根据id删除索引对象 func (e *Elastic) DelById(index, itype, id string) bool { client := e.GetEsConn() defer e.DestoryEsConn(client) b := false if client != nil { var err error _, err = client.Delete().Index(index).Type(itype).Id(id).Do() if err != nil { log.Println("更新检索出错:", err.Error()) } else { b = true } } return b } func (e *Elastic) GetNoLimit(index, itype, query string) *[]map[string]interface{} { //log.Println("query -- ", query) client := e.GetEsConn() defer e.DestoryEsConn(client) var res []map[string]interface{} if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil } if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &res[i]) } } } return &res } func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} { client := e.GetEsConn() defer e.DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() query := `{"query":{"term":{"_id":"` + id + `"}}` if len(fields) > 0 { query = query + `,"_source":[` + fields + `]` } query = query + "}" searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil } var res map[string]interface{} if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) if resNum == 1 { res = make(map[string]interface{}) for _, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &res) } return &res } } } return nil } func (e *Elastic) Count(index, itype string, query interface{}) int64 { client := e.GetEsConn() defer e.DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() var qq es.Query if qi, ok2 := query.(es.Query); ok2 { qq = qi } n, err := client.Count(index).Type(itype).Query(qq).Do() if err != nil { log.Println("统计出错", err.Error()) } return n } return 0 } //更新一个字段 func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) { client := e.GetEsConn() defer e.DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() for _, data := range update { id := data["id"] updateStr := data["updateStr"] if id != "" && updateStr != "" { _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do() if err != nil { log.Println("更新检索出错:", err.Error()) } } else { log.Println("数据错误") } } } } //更新多个字段 func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) { client := e.GetEsConn() defer e.DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() for _, arr := range arrs { id := arr[0]["id"].(string) update := arr[1]["update"].([]string) for _, str := range update { _, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do() if err != nil { log.Println("更新检索出错:", err.Error()) } } } } }