123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- package elastic
- import (
- "encoding/json"
- "fmt"
- "log"
- "runtime"
- "strings"
- "sync"
- "time"
- es "sfbase/olivere/elastic.v1"
- )
- type Elastic struct {
- S_esurl string
- I_size int
- Addrs []string
- Pool chan *es.Client
- lastTime int64
- lastTimeLock sync.Mutex
- ntimeout int
- }
- func (e *Elastic) InitElasticSize() {
- defer catch()
- e.Pool = make(chan *es.Client, e.I_size)
- for _, s := range strings.Split(e.S_esurl, ",") {
- e.Addrs = append(e.Addrs, s)
- }
- for i := 0; i < e.I_size; i++ {
- client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false))
- e.Pool <- client
- }
- }
- //关闭连接
- func (e *Elastic) DestoryEsConn(client *es.Client) {
- select {
- case e.Pool <- client:
- break
- case <-time.After(time.Second * 1):
- if client != nil {
- client.Stop()
- }
- client = nil
- }
- }
- func (e *Elastic) GetEsConn() *es.Client {
- select {
- case c := <-e.Pool:
- if c == nil || !c.IsRunning() {
- log.Println("new esclient.", len(e.Pool))
- client, err := es.NewClient(es.SetURL(e.Addrs...),
- es.SetMaxRetries(2), es.SetSniff(false))
- if err == nil && client.IsRunning() {
- return client
- }
- }
- return c
- case <-time.After(time.Second * 4):
- //超时
- e.ntimeout++
- e.lastTimeLock.Lock()
- defer e.lastTimeLock.Unlock()
- //12秒后允许创建链接
- c := time.Now().Unix() - e.lastTime
- if c > 12 {
- e.lastTime = time.Now().Unix()
- log.Println("add client..", len(e.Pool))
- c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false))
- go func() {
- for i := 0; i < 2; i++ {
- client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false))
- e.Pool <- client
- }
- }()
- return c
- }
- return nil
- }
- }
- func (e *Elastic) Get(index, itype, query string) *[]map[string]interface{} {
- client := e.GetEsConn()
- defer func() {
- go e.DestoryEsConn(client)
- }()
- var res []map[string]interface{}
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- if searchResult.Hits != nil {
- resNum := len(searchResult.Hits.Hits)
- if resNum < 5000 {
- res = make([]map[string]interface{}, resNum)
- for i, hit := range searchResult.Hits.Hits {
- parseErr := json.Unmarshal(*hit.Source, &res[i])
- if parseErr == nil && hit.Highlight != nil && res[i] != nil {
- res[i]["highlight"] = map[string][]string(hit.Highlight)
- }
- }
- } else {
- log.Println("查询结果太多,查询到:", resNum, "条")
- }
- }
- }
- return &res
- }
- //关闭elastic
- func (e *Elastic) Close() {
- defer catch()
- for i := 0; i < e.I_size; i++ {
- cli := <-e.Pool
- cli.Stop()
- cli = nil
- }
- e.Pool = nil
- e = nil
- }
- //获取连接
- //func (e *Elastic) GetEsConn() (c *es.Client) {
- // defer catch()
- // select {
- // case c = <-e.Pool:
- // if c == nil || !c.IsRunning() {
- // client, err := es.NewClient(es.SetURL(addrs...),
- // es.SetMaxRetries(2), es.SetSniff(false))
- // if err == nil && client.IsRunning() {
- // return client
- // }
- // return nil
- // }
- // return
- // case <-time.After(time.Second * 7):
- // //超时
- // ntimeout++
- // log.Println("timeout times:", ntimeout)
- // return nil
- // }
- //}
- func (e *Elastic) BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
- defer catch()
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- if client != nil {
- req := client.Bulk()
- for _, v := range *obj {
- if isDelBefore {
- req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(fmt.Sprintf("%v", v["_id"])))
- }
- req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(v))
- }
- _, err := req.Do()
- if err != nil {
- log.Println("批量保存到ES出错", err.Error())
- }
- }
- }
- //先删除后增
- func (e *Elastic) UpdateNewDoc(index, itype string, obj ...interface{}) bool {
- defer catch()
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- b := false
- if client != nil {
- var err error
- for _, v := range obj {
- tempObj := objToMap(v)
- id := fmt.Sprintf("%v", (*tempObj)["_id"])
- client.Delete().Index(index).Type(itype).Id(id).Do()
- _, err = client.Index().Index(index).Type(itype).BodyJson(tempObj).Do()
- if err != nil {
- log.Println("保存到ES出错", err.Error())
- } else {
- b = true
- }
- }
- }
- return b
- }
- //根据id删除索引对象
- func (e *Elastic) DelById(index, itype, id string) bool {
- defer catch()
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- b := false
- if client != nil {
- var err error
- _, err = client.Delete().Index(index).Type(itype).Id(id).Do()
- if err != nil {
- log.Println("更新检索出错:", err.Error())
- } else {
- b = true
- }
- }
- return b
- }
- func (e *Elastic) GetNoLimit(index, itype, query string) *[]map[string]interface{} {
- //log.Println("query -- ", query)
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- var res []map[string]interface{}
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- if searchResult.Hits != nil {
- resNum := len(searchResult.Hits.Hits)
- res = make([]map[string]interface{}, resNum)
- for i, hit := range searchResult.Hits.Hits {
- json.Unmarshal(*hit.Source, &res[i])
- }
- }
- }
- return &res
- }
- func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- query := `{"query":{"term":{"_id":"` + id + `"}}`
- if len(fields) > 0 {
- query = query + `,"_source":[` + fields + `]`
- }
- query = query + "}"
- searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- var res map[string]interface{}
- if searchResult.Hits != nil {
- resNum := len(searchResult.Hits.Hits)
- if resNum == 1 {
- res = make(map[string]interface{})
- for _, hit := range searchResult.Hits.Hits {
- json.Unmarshal(*hit.Source, &res)
- }
- return &res
- }
- }
- }
- return nil
- }
- func (e *Elastic) Count(index, itype string, query interface{}) int64 {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- var qq es.Query
- if qs, ok := query.(string); ok {
- temp := es.BoolQuery{
- QueryStrings: qs,
- }
- qq = temp
- } else if qi, ok2 := query.(es.Query); ok2 {
- qq = qi
- }
- n, err := client.Count(index).Type(itype).Query(qq).Do()
- if err != nil {
- log.Println("统计出错", err.Error())
- }
- return n
- }
- return 0
- }
|