package elastic import ( "encoding/json" "fmt" "log" "runtime" "strings" "sync" "time" es "sfbase/olivere/elastic.v1" ) type Elastic struct { S_esurl string I_size int Addrs []string Pool chan *es.Client lastTime int64 lastTimeLock sync.Mutex ntimeout int } func (e *Elastic) InitElasticSize() { defer catch() e.Pool = make(chan *es.Client, e.I_size) for _, s := range strings.Split(e.S_esurl, ",") { e.Addrs = append(e.Addrs, s) } for i := 0; i < e.I_size; i++ { client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false)) e.Pool <- client } } //关闭连接 func (e *Elastic) DestoryEsConn(client *es.Client) { select { case e.Pool <- client: break case <-time.After(time.Second * 1): if client != nil { client.Stop() } client = nil } } func (e *Elastic) GetEsConn() *es.Client { select { case c := <-e.Pool: if c == nil || !c.IsRunning() { log.Println("new esclient.", len(e.Pool)) client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false)) if err == nil && client.IsRunning() { return client } } return c case <-time.After(time.Second * 4): //超时 e.ntimeout++ e.lastTimeLock.Lock() defer e.lastTimeLock.Unlock() //12秒后允许创建链接 c := time.Now().Unix() - e.lastTime if c > 12 { e.lastTime = time.Now().Unix() log.Println("add client..", len(e.Pool)) c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false)) go func() { for i := 0; i < 2; i++ { client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false)) e.Pool <- client } }() return c } return nil } } func (e *Elastic) Get(index, itype, query string) (*[]map[string]interface{}, error) { client := e.GetEsConn() defer func() { go e.DestoryEsConn(client) }() var res []map[string]interface{} if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil, err } if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) if resNum < 5000 { res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { parseErr := json.Unmarshal(*hit.Source, &res[i]) if parseErr == nil && hit.Highlight != nil && res[i] != nil { res[i]["highlight"] = map[string][]string(hit.Highlight) } } } else { log.Println("查询结果太多,查询到:", resNum, "条") } } } return &res, nil } //关闭elastic func (e *Elastic) Close() { defer catch() for i := 0; i < e.I_size; i++ { cli := <-e.Pool cli.Stop() cli = nil } e.Pool = nil e = nil } //获取连接 //func (e *Elastic) GetEsConn() (c *es.Client) { // defer catch() // select { // case c = <-e.Pool: // if c == nil || !c.IsRunning() { // client, err := es.NewClient(es.SetURL(addrs...), // es.SetMaxRetries(2), es.SetSniff(false)) // if err == nil && client.IsRunning() { // return client // } // return nil // } // return // case <-time.After(time.Second * 7): // //超时 // ntimeout++ // log.Println("timeout times:", ntimeout) // return nil // } //} func (e *Elastic) BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) { defer catch() client := e.GetEsConn() defer e.DestoryEsConn(client) if client != nil { req := client.Bulk() for _, v := range *obj { if isDelBefore { req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(fmt.Sprintf("%v", v["_id"]))) } req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(v)) } _, err := req.Do() if err != nil { log.Println("批量保存到ES出错", err.Error()) } } } //先删除后增 func (e *Elastic) UpdateNewDoc(index, itype string, obj ...interface{}) bool { defer catch() client := e.GetEsConn() defer e.DestoryEsConn(client) b := false if client != nil { var err error for _, v := range obj { tempObj := objToMap(v) id := fmt.Sprintf("%v", (*tempObj)["_id"]) client.Delete().Index(index).Type(itype).Id(id).Do() _, err = client.Index().Index(index).Type(itype).BodyJson(tempObj).Do() if err != nil { log.Println("保存到ES出错", err.Error()) } else { b = true } } } return b } //根据id删除索引对象 func (e *Elastic) DelById(index, itype, id string) bool { defer catch() client := e.GetEsConn() defer e.DestoryEsConn(client) b := false if client != nil { var err error _, err = client.Delete().Index(index).Type(itype).Id(id).Do() if err != nil { log.Println("更新检索出错:", err.Error()) } else { b = true } } return b } func (e *Elastic) GetNoLimit(index, itype, query string) *[]map[string]interface{} { //log.Println("query -- ", query) client := e.GetEsConn() defer e.DestoryEsConn(client) var res []map[string]interface{} if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil } if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &res[i]) } } } return &res } func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} { client := e.GetEsConn() defer e.DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() query := `{"query":{"term":{"_id":"` + id + `"}}` if len(fields) > 0 { query = query + `,"_source":[` + fields + `]` } query = query + "}" searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil } var res map[string]interface{} if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) if resNum == 1 { res = make(map[string]interface{}) for _, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &res) } return &res } } } return nil } func (e *Elastic) Count(index, itype string, query interface{}) int64 { client := e.GetEsConn() defer e.DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() var qq es.Query if qs, ok := query.(string); ok { temp := es.BoolQuery{ QueryStrings: qs, } qq = temp } else if qi, ok2 := query.(es.Query); ok2 { qq = qi } n, err := client.Count(index).Type(itype).Query(qq).Do() if err != nil { log.Println("统计出错", err.Error()) } return n } return 0 }