123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640 |
- package elastic
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- es "github.com/olivere/elastic/v7"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "runtime"
- "strings"
- "sync"
- "time"
- )
- type Elastic struct {
- S_esurl string
- I_size int
- Addrs []string
- Pool chan *es.Client
- lastTime int64
- lastTimeLock sync.Mutex
- ntimeout int
- Username string
- Password string
- }
- func (e *Elastic) InitElasticSize() {
- e.Pool = make(chan *es.Client, e.I_size)
- for _, s := range strings.Split(e.S_esurl, ",") {
- e.Addrs = append(e.Addrs, s)
- }
- for i := 0; i < e.I_size; i++ {
- client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
- e.Pool <- client
- }
- }
- // 关闭连接
- func (e *Elastic) DestoryEsConn(client *es.Client) {
- select {
- case e.Pool <- client:
- break
- case <-time.After(time.Second * 1):
- if client != nil {
- client.Stop()
- }
- client = nil
- }
- }
- func (e *Elastic) GetEsConn() *es.Client {
- select {
- case c := <-e.Pool:
- if c == nil || !c.IsRunning() {
- log.Println("new esclient.", len(e.Pool))
- client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
- es.SetMaxRetries(2), es.SetSniff(false))
- if err == nil && client.IsRunning() {
- return client
- }
- }
- return c
- case <-time.After(time.Second * 4):
- //超时
- e.ntimeout++
- e.lastTimeLock.Lock()
- defer e.lastTimeLock.Unlock()
- //12秒后允许创建链接
- c := time.Now().Unix() - e.lastTime
- if c > 12 {
- e.lastTime = time.Now().Unix()
- log.Println("add client..", len(e.Pool))
- c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
- go func() {
- for i := 0; i < 2; i++ {
- client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
- e.Pool <- client
- }
- }()
- return c
- }
- return nil
- }
- }
- func (e *Elastic) Get(index string, query es.Query) *[]map[string]interface{} {
- client := e.GetEsConn()
- defer func() {
- go e.DestoryEsConn(client)
- }()
- var res []map[string]interface{}
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- searchResult, err := client.Search().Index(index).Query(query).Do(context.Background())
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- if searchResult.Hits != nil {
- resNum := len(searchResult.Hits.Hits)
- if resNum < 5000 {
- res = make([]map[string]interface{}, resNum)
- for i, hit := range searchResult.Hits.Hits {
- parseErr := json.Unmarshal(hit.Source, &res[i])
- if parseErr == nil && hit.Highlight != nil && res[i] != nil {
- res[i]["highlight"] = map[string][]string(hit.Highlight)
- }
- }
- } else {
- log.Println("查询结果太多,查询到:", resNum, "条")
- }
- }
- }
- return &res
- }
- // 关闭elastic
- func (e *Elastic) Close() {
- for i := 0; i < e.I_size; i++ {
- cli := <-e.Pool
- cli.Stop()
- cli = nil
- }
- e.Pool = nil
- e = nil
- }
- //获取连接
- //func (e *Elastic) GetEsConn() (c *es.Client) {
- // defer util.Catch()
- // select {
- // case c = <-e.Pool:
- // if c == nil || !c.IsRunning() {
- // client, err := es.NewClient(es.SetURL(addrs...),
- // es.SetMaxRetries(2), es.SetSniff(false))
- // if err == nil && client.IsRunning() {
- // return client
- // }
- // return nil
- // }
- // return
- // case <-time.After(time.Second * 7):
- // //超时
- // ntimeout++
- // log.Println("timeout times:", ntimeout)
- // return nil
- // }
- //}
- func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- if client != nil {
- req := client.Bulk()
- for _, v := range obj {
- //if isDelBefore {
- // req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
- //}
- id := util.ObjToString(v["_id"])
- doc := make(map[string]interface{}, 0)
- for k, va := range v {
- doc[k] = va
- }
- delete(doc, "_id")
- req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
- }
- _, err := req.Do(context.Background())
- if err != nil {
- log.Println("批量保存到ES出错", err.Error())
- }
- }
- }
- // 根据id删除索引对象
- func (e *Elastic) DelById(index, id string) bool {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- b := false
- if client != nil {
- var err error
- _, err = client.Delete().Index(index).Id(id).Do(context.Background())
- if err != nil {
- log.Println("更新检索出错:", err.Error())
- } else {
- b = true
- }
- }
- return b
- }
- func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- var res []map[string]interface{}
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- if searchResult.Hits != nil {
- resNum := len(searchResult.Hits.Hits)
- res = make([]map[string]interface{}, resNum)
- for i, hit := range searchResult.Hits.Hits {
- json.Unmarshal(hit.Source, &res[i])
- }
- }
- }
- return &res
- }
- //func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
- // client := e.GetEsConn()
- // defer e.DestoryEsConn(client)
- // if client != nil {
- // defer func() {
- // if r := recover(); r != nil {
- // log.Println("[E]", r)
- // for skip := 1; ; skip++ {
- // _, file, line, ok := runtime.Caller(skip)
- // if !ok {
- // break
- // }
- // go log.Printf("%v,%v\n", file, line)
- // }
- // }
- // }()
- // query := `{"query":{"term":{"_id":"` + id + `"}}`
- // if len(fields) > 0 {
- // query = query + `,"_source":[` + fields + `]`
- // }
- // query = query + "}"
- // searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
- // if err != nil {
- // log.Println("从ES查询出错", err.Error())
- // return nil
- // }
- // var res map[string]interface{}
- // if searchResult.Hits != nil {
- // resNum := len(searchResult.Hits.Hits)
- // if resNum == 1 {
- // res = make(map[string]interface{})
- // for _, hit := range searchResult.Hits.Hits {
- // json.Unmarshal(*hit.Source., &res)
- // }
- // return &res
- // }
- // }
- // }
- // return nil
- //}
- func (e *Elastic) Count(index string, query interface{}) int64 {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- var qq es.Query
- if qi, ok2 := query.(es.Query); ok2 {
- qq = qi
- }
- n, err := client.Count(index).Query(qq).Do(context.Background())
- if err != nil {
- log.Println("统计出错", err.Error())
- }
- return n
- }
- return 0
- }
- //更新一个字段
- //func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
- // client := e.GetEsConn()
- // defer e.DestoryEsConn(client)
- // if client != nil {
- // defer func() {
- // if r := recover(); r != nil {
- // log.Println("[E]", r)
- // for skip := 1; ; skip++ {
- // _, file, line, ok := runtime.Caller(skip)
- // if !ok {
- // break
- // }
- // go log.Printf("%v,%v\n", file, line)
- // }
- // }
- // }()
- // for _, data := range update {
- // id := data["id"]
- // updateStr := data["updateStr"]
- // if id != "" && updateStr != "" {
- // _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
- // if err != nil {
- // log.Println("更新检索出错:", err.Error())
- // }
- // } else {
- // log.Println("数据错误")
- // }
- // }
- // }
- //}
- //更新多个字段
- //func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
- // client := e.GetEsConn()
- // defer e.DestoryEsConn(client)
- // if client != nil {
- // defer func() {
- // if r := recover(); r != nil {
- // log.Println("[E]", r)
- // for skip := 1; ; skip++ {
- // _, file, line, ok := runtime.Caller(skip)
- // if !ok {
- // break
- // }
- // go log.Printf("%v,%v\n", file, line)
- // }
- // }
- // }()
- // for _, arr := range arrs {
- // id := arr[0]["id"].(string)
- // update := arr[1]["update"].([]string)
- // for _, str := range update {
- // _, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
- // if err != nil {
- // log.Println("更新检索出错:", err.Error())
- // }
- // }
- // }
- // }
- //}
- // UpdateBulk 批量修改文档
- func (e *Elastic) UpdateBulk(index string, docs ...[]map[string]interface{}) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- bulkService := client.Bulk().Index(index).Refresh("true")
- //bulkService.Type(itype)
- for _, d := range docs {
- id := d[0]["_id"].(string)
- doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
- bulkService.Add(doc)
- }
- _, err := bulkService.Do(context.Background())
- if err != nil {
- fmt.Printf("UpdateBulk all success err is %v\n", err)
- }
- //if len(res.Failed()) > 0 {
- // fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
- //}
- }
- // UpsertBulk 批量修改文档(不存在则插入)
- func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- bulkService := client.Bulk().Index(index).Refresh("true")
- //bulkService.Type("bidding")
- for i := range ids {
- doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
- bulkService.Add(doc)
- }
- res, err := bulkService.Do(context.Background())
- if err != nil {
- return err
- }
- if len(res.Failed()) > 0 {
- return errors.New(res.Failed()[0].Error.Reason)
- }
- return nil
- }
- // 批量删除
- func (e *Elastic) DeleteBulk(index string, ids []string) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- bulkService := client.Bulk().Index(index).Refresh("true")
- //bulkService.Type("bidding")
- for i := range ids {
- req := es.NewBulkDeleteRequest().Id(ids[i])
- bulkService.Add(req)
- }
- res, err := bulkService.Do(context.Background())
- if err != nil {
- fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
- }
- }
- // InsertOrUpdate 插入或更新
- func (e *Elastic) InsertOrUpdate(index string, docs []map[string]interface{}) error {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- for _, item := range docs {
- // 获取唯一标识符
- id := item["id"].(string)
- if id == "" {
- id = item["_id"].(string)
- }
- // 根据id判断记录是否存在
- exists, err := client.Exists().
- Index(index).
- Id(id).
- Do(context.Background())
- if err != nil {
- return err
- }
- // 存在则更新,不存在则插入
- if exists {
- _, err := client.Update().
- Index(index).
- Id(id).
- Doc(item).
- Do(context.Background())
- if err != nil {
- return err
- }
- } else {
- _, err := client.Index().
- Index(index).
- Id(id).
- BodyJson(item).
- Do(context.Background())
- if err != nil {
- return err
- }
- }
- }
- return nil
- }
- // ExistsIndex 判断索引是否存在
- func (e *Elastic) ExistsIndex(index string) (exists bool, err error) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- exists, err = client.IndexExists(index).Do(context.Background())
- return
- }
- // CreateIndex 创建索引
- func (e *Elastic) CreateIndex(index string, mapping string) (err error) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- _, err = client.CreateIndex(index).BodyString(mapping).Do(context.Background())
- return
- }
- // DeleteIndex 删除索引
- func (e *Elastic) DeleteIndex(index string) (err error) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- _, err = client.DeleteIndex(index).Do(context.Background())
- return
- }
- // RemoveAlias 移除别名
- func (e *Elastic) RemoveAlias(index, aliasName string) (err error) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- _, err = client.Alias().Remove(index, aliasName).Do(context.Background())
- return
- }
- // SetAlias 添加别名
- func (e *Elastic) SetAlias(index, aliasName string) (err error) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- _, err = client.Alias().Add(index, aliasName).Do(context.Background())
- return
- }
- // DeleteByID 根据ID 删除索引数据;id 或者索引名称不存在,都不会报错
- func (e *Elastic) DeleteByID(index, id string) error {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- _, err := client.Delete().Index(index).Id(id).Do(context.Background())
- if err != nil && es.IsNotFound(err) {
- return nil
- }
- return err
- }
- // UpdateDocument 更新指定ID的文档
- func (e *Elastic) UpdateDocument(indexName string, documentID string, updateData map[string]interface{}) error {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- updateResult, err := client.Update().
- Index(indexName).
- Id(documentID).
- Doc(updateData).
- Do(context.Background())
- if err != nil {
- return err
- }
- if updateResult.Result != "updated" {
- return fmt.Errorf("Document not updated: %v", updateResult.Result)
- }
- return nil
- }
- // Save 保存对象
- func (e *Elastic) Save(index string, obj interface{}) bool {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- data := util.ObjToMap(obj)
- _id := mongodb.BsonIdToSId((*data)["_id"])
- (*data)["id"] = _id
- delete((*data), "_id")
- _, err := client.Index().Index(index).Id(_id).BodyJson(data).Do(context.TODO())
- if err != nil {
- log.Println("保存到ES出错", err.Error(), obj)
- return false
- } else {
- return true
- }
- }
- // SaveDocument 保存单个索引
- func (e *Elastic) SaveDocument(index string, data map[string]interface{}) error {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- // 从数据中获取 ID 字段
- id, ok := data["id"].(string)
- if !ok {
- return errors.New("ID field not found in data")
- }
- // 将数据转换为 JSON 格式
- docJSON, err := json.Marshal(data)
- if err != nil {
- return err
- }
- // 创建一个新的索引请求
- indexRequest := client.Index().
- Index(index).
- Id(id).
- BodyJson(string(docJSON))
- // 执行索引请求
- _, err = indexRequest.Do(context.Background())
- if err != nil {
- return err
- }
- return nil
- }
- // GetById 获取单个索引文档
- func (e *Elastic) GetById(index string, id string) (err error, data map[string]interface{}) {
- client := e.GetEsConn()
- defer e.DestoryEsConn(client)
- ctx := context.Background()
- // 使用Get方法获取文档
- getResponse, err := client.Get().
- Index(index).
- Id(id).
- Do(ctx)
- if err != nil {
- // 处理错误
- return err, nil
- }
- if !getResponse.Found {
- // 文档存在,返回文档数据
- return errors.New("Document not found"), nil // 文档不存在,返回错误
- }
- if err := json.Unmarshal(getResponse.Source, &data); err != nil {
- // 处理解码错误
- return err, nil
- }
- return nil, data
- }
|