123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- package main
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- es "gopkg.in/olivere/elastic.v7"
- "log"
- "qfw/util"
- "runtime"
- "strings"
- "sync"
- "time"
- )
- type Elastic struct {
- S_esurl string
- I_size int
- Addrs []string
- Pool chan *es.Client
- lastTime int64
- lastTimeLock sync.Mutex
- ntimeout int
- Username string
- Password string
- }
- func (e *Elastic) InitElasticSize() {
- e.Pool = make(chan *es.Client, e.I_size)
- for _, s := range strings.Split(e.S_esurl, ",") {
- e.Addrs = append(e.Addrs, s)
- }
- log.Println(e.Password, e.Username)
- for i := 0; i < e.I_size; i++ {
- client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
- e.Pool <- client
- }
- }
- //关闭连接
- func (e *Elastic) DestoryEsConn(client *es.Client) {
- select {
- case e.Pool <- client:
- break
- case <-time.After(time.Second * 1):
- if client != nil {
- client.Stop()
- }
- client = nil
- }
- }
- func (e *Elastic) GetEsConn() *es.Client {
- select {
- case c := <-e.Pool:
- if c == nil || !c.IsRunning() {
- log.Println("new esclient.", len(e.Pool))
- client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
- es.SetSniff(false))
- if err == nil && client.IsRunning() {
- return client
- }
- }
- return c
- case <-time.After(time.Second * 4):
- //超时
- e.ntimeout++
- e.lastTimeLock.Lock()
- defer e.lastTimeLock.Unlock()
- //12秒后允许创建链接
- c := time.Now().Unix() - e.lastTime
- if c > 12 {
- e.lastTime = time.Now().Unix()
- log.Println("add client..", len(e.Pool))
- c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
- go func() {
- for i := 0; i < 2; i++ {
- client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
- e.Pool <- client
- }
- }()
- return c
- }
- return nil
- }
- }
- func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
- client := e.GetEsConn()
- defer func() {
- go e.DestoryEsConn(client)
- }()
- var res []map[string]interface{}
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- if searchResult.Hits != nil {
- resNum := len(searchResult.Hits.Hits)
- if resNum < 5000 {
- res = make([]map[string]interface{}, resNum)
- for i, hit := range searchResult.Hits.Hits {
- parseErr := json.Unmarshal(hit.Source, &res[i])
- if parseErr == nil && hit.Highlight != nil && res[i] != nil {
- res[i]["highlight"] = map[string][]string(hit.Highlight)
- }
- }
- } else {
- log.Println("查询结果太多,查询到:", resNum, "条")
- }
- }
- }
- return &res
- }
- //关闭elastic
- func (e *Elastic) Close() {
- for i := 0; i < e.I_size; i++ {
- cli := <-e.Pool
- cli.Stop()
- cli = nil
- }
- e.Pool = nil
- e = nil
- }
- //获取连接
- //func (e *Elastic) GetEsConn() (c *es.Client) {
- // defer util.Catch()
- // select {
- // case c = <-e.Pool:
- // if c == nil || !c.IsRunning() {
- // client, err := es.NewClient(es.SetURL(addrs...),
- // es.SetMaxRetries(2), es.SetSniff(false))
- // if err == nil && client.IsRunning() {
- // return client
- // }
- // return nil
- // }
- // return
- // case <-time.After(time.Second * 7):
- // //超时
- // ntimeout++
- // log.Println("timeout times:", ntimeout)
- // return nil
- // }
- //}
- func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- if client != nil {
- req := client.Bulk()
- for _, v := range obj {
- //if isDelBefore {
- // req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
- //}
- id := util.ObjToString(v["_id"])
- delete(v, "_id")
- req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
- }
- _, err := req.Do(context.Background())
- if err != nil {
- log.Println("批量保存到ES出错", err.Error())
- }
- }
- }
- //根据id删除索引对象
- func (e *Elastic) DelById(index, itype, id string) bool {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- b := false
- if client != nil {
- var err error
- _, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
- if err != nil {
- log.Println("更新检索出错:", err.Error())
- } else {
- b = true
- }
- }
- return b
- }
- func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- var res []map[string]interface{}
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- if searchResult.Hits != nil {
- resNum := len(searchResult.Hits.Hits)
- util.Debug(resNum)
- res = make([]map[string]interface{}, resNum)
- for i, hit := range searchResult.Hits.Hits {
- json.Unmarshal(hit.Source, &res[i])
- }
- }
- }
- return &res
- }
- //func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
- // client := e.GetEsConn()
- // defer e.DestoryEsConn(client)
- // if client != nil {
- // defer func() {
- // if r := recover(); r != nil {
- // log.Println("[E]", r)
- // for skip := 1; ; skip++ {
- // _, file, line, ok := runtime.Caller(skip)
- // if !ok {
- // break
- // }
- // go log.Printf("%v,%v\n", file, line)
- // }
- // }
- // }()
- // query := `{"query":{"term":{"_id":"` + id + `"}}`
- // if len(fields) > 0 {
- // query = query + `,"_source":[` + fields + `]`
- // }
- // query = query + "}"
- // searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
- // if err != nil {
- // log.Println("从ES查询出错", err.Error())
- // return nil
- // }
- // var res map[string]interface{}
- // if searchResult.Hits != nil {
- // resNum := len(searchResult.Hits.Hits)
- // if resNum == 1 {
- // res = make(map[string]interface{})
- // for _, hit := range searchResult.Hits.Hits {
- // json.Unmarshal(*hit.Source., &res)
- // }
- // return &res
- // }
- // }
- // }
- // return nil
- //}
- func (e *Elastic) Count(index, itype string, query interface{}) int64 {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- var qq es.Query
- if qi, ok2 := query.(es.Query); ok2 {
- qq = qi
- }
- n, err := client.Count(index).Query(qq).Do(context.Background())
- if err != nil {
- log.Println("统计出错", err.Error())
- }
- return n
- }
- return 0
- }
- //更新一个字段
- //func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
- // client := e.GetEsConn()
- // defer e.DestoryEsConn(client)
- // if client != nil {
- // defer func() {
- // if r := recover(); r != nil {
- // log.Println("[E]", r)
- // for skip := 1; ; skip++ {
- // _, file, line, ok := runtime.Caller(skip)
- // if !ok {
- // break
- // }
- // go log.Printf("%v,%v\n", file, line)
- // }
- // }
- // }()
- // for _, data := range update {
- // id := data["id"]
- // updateStr := data["updateStr"]
- // if id != "" && updateStr != "" {
- // _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
- // if err != nil {
- // log.Println("更新检索出错:", err.Error())
- // }
- // } else {
- // log.Println("数据错误")
- // }
- // }
- // }
- //}
- //更新多个字段
- //func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
- // client := e.GetEsConn()
- // defer e.DestoryEsConn(client)
- // if client != nil {
- // defer func() {
- // if r := recover(); r != nil {
- // log.Println("[E]", r)
- // for skip := 1; ; skip++ {
- // _, file, line, ok := runtime.Caller(skip)
- // if !ok {
- // break
- // }
- // go log.Printf("%v,%v\n", file, line)
- // }
- // }
- // }()
- // for _, arr := range arrs {
- // id := arr[0]["id"].(string)
- // update := arr[1]["update"].([]string)
- // for _, str := range update {
- // _, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
- // if err != nil {
- // log.Println("更新检索出错:", err.Error())
- // }
- // }
- // }
- // }
- //}
- // UpdateBulk 批量修改文档
- func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- bulkService := client.Bulk().Index(index).Refresh("true")
- bulkService.Type(itype)
- for _, d := range docs {
- id := d[0]["_id"].(string)
- doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
- bulkService.Add(doc)
- }
- _, err := bulkService.Do(context.Background())
- if err != nil {
- fmt.Printf("UpdateBulk all success err is %v\n", err)
- }
- //if len(res.Failed()) > 0 {
- // fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
- //}
- }
- // UpsertBulk 批量修改文档(不存在则插入)
- func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- bulkService := client.Bulk().Index(index).Refresh("true")
- bulkService.Type("bidding")
- for i := range ids {
- doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
- bulkService.Add(doc)
- }
- res, err := bulkService.Do(context.Background())
- if err != nil {
- return err
- }
- if len(res.Failed()) > 0 {
- return errors.New(res.Failed()[0].Error.Reason)
- }
- return nil
- }
- // 批量删除
- func (e *Elastic) DeleteBulk(index string, ids []string) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- bulkService := client.Bulk().Index(index).Refresh("true")
- bulkService.Type("bidding")
- for i := range ids {
- req := es.NewBulkDeleteRequest().Id(ids[i])
- bulkService.Add(req)
- }
- res, err := bulkService.Do(context.Background())
- if err != nil {
- fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
- }
- }
|