12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251 |
- package elastic
- import (
- "context"
- "encoding/json"
- "fmt"
- "log"
- "reflect"
- "regexp"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "time"
- es "github.com/olivere/elastic/v7"
- )
// addrs holds the Elasticsearch node URLs. InitElasticSizeByAuth appends to
// it and newClient passes it to es.SetURL.
var addrs []string

// LocCity is an exported, initially empty string-to-string map. Nothing in
// the visible part of this file reads or writes it.
var LocCity = map[string]string{}

// SIZE is the size argument InitElastic forwards to InitElasticSize.
var SIZE = 30

// username and password are the basic-auth credentials stored by
// InitElasticSizeByAuth; newClient only uses them when both are non-empty.
var username, password string
// QStr is the bool-query skeleton that MakeQuery hands to AnalyQuery. The
// $and and $or markers are placeholders: AnalyQuery inserts generated clauses
// in front of them and MakeQuery removes whatever markers are left.
const QStr = `{"query":{"bool":{"must":[$and],"must_not":[],
"should":[$or],"minimum_should_match" : 1}}}`
- var pool chan *es.Client
- var ntimeout int
- var syncPool sync.Pool
- var filterReg = regexp.MustCompile(`,\s*"should"\s*:\s*\[\s*\]\s*,\s*"minimum_should_match"\s*:\s*1`)
- // 初始化全文检索
- func InitElastic(addr string) {
- InitElasticSize(addr, SIZE)
- }
- //自定义HttpClient
- /**
- var httpclient = &http.Client{Transport: &http.Transport{
- Dial: func(netw, addr string) (net.Conn, error) {
- deadline := time.Now().Add(5000 * time.Millisecond)
- c, err := net.DialTimeout(netw, addr, 10000*time.Millisecond)
- if err != nil {
- return nil, err
- }
- tcp_conn := c.(*net.TCPConn)
- tcp_conn.SetKeepAlive(false)
- tcp_conn.SetDeadline(deadline)
- return tcp_conn, nil
- },
- DisableKeepAlives: true, //不保持,这样才能释放
- }}
- **/
- //var op = es.SetHttpClient(httpclient)
// poolsize is the capacity used for the client pool channel. It starts at 20
// and is overwritten with 3*size by InitElasticSizeByAuth.
var poolsize int32 = 20
- // n倍的池
- func InitElasticSize(addr string, size int) {
- InitElasticSizeByAuth(addr, size, "", "")
- }
- // 初始化es,带有用户名密码认证
- func InitElasticSizeByAuth(addr string, size int, u, p string) {
- poolsize = int32(3 * size)
- pool = make(chan *es.Client, poolsize)
- for _, s := range strings.Split(addr, ",") {
- addrs = append(addrs, s)
- }
- username = u
- password = p
- for i := 0; i < size; i++ {
- client, _ := newClient()
- pool <- client
- }
- }
- func newClient() (*es.Client, error) {
- opt := []es.ClientOptionFunc{es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false)}
- if username != "" && password != "" {
- opt = append(opt, es.SetBasicAuth(username, password))
- }
- return es.NewClient(opt...)
- }
- // 关闭连接
- func DestoryEsConn(client *es.Client) {
- select {
- case pool <- client:
- break
- case <-time.After(time.Second * 1):
- if client != nil {
- client.Stop()
- }
- client = nil
- }
- }
// lastTime is the Unix timestamp (seconds) of the last time GetEsConn created
// extra clients after a pool timeout.
var lastTime int64

// lastTimeLock serialises the timeout path of GetEsConn.
var lastTimeLock = new(sync.Mutex)
- // 获取连接
- func GetEsConn() *es.Client {
- select {
- case c := <-pool:
- if c == nil || !c.IsRunning() {
- log.Println("new esclient.", len(pool))
- client, err := newClient()
- if err == nil && client.IsRunning() {
- return client
- }
- }
- return c
- case <-time.After(time.Second * 4):
- //超时
- ntimeout++
- lastTimeLock.Lock()
- defer lastTimeLock.Unlock()
- //12秒后允许创建链接
- c := time.Now().Unix() - lastTime
- if c > 12 {
- lastTime = time.Now().Unix()
- log.Println("add client..", len(pool))
- c, _ := newClient()
- go func() {
- for i := 0; i < 2; i++ {
- client, _ := newClient()
- pool <- client
- }
- }()
- return c
- }
- return nil
- }
- }
- // 保存对象
- func Save(index, itype string, obj interface{}) bool {
- client := GetEsConn()
- defer DestoryEsConn(client)
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- data := objToMap(obj)
- _id := bsonIdToSId((*data)["_id"])
- (*data)["id"] = _id
- delete((*data), "_id")
- _, err := client.Index().Index(index).Id(_id).BodyJson(data).Do(context.TODO())
- if err != nil {
- log.Println("保存到ES出错", err.Error(), obj)
- return false
- } else {
- return true
- }
- }
- func SaveNew(index string, obj interface{}) bool {
- client := GetEsConn()
- defer DestoryEsConn(client)
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- data := objToMap(obj)
- _id := fmt.Sprint((*data)["id"])
- _, err := client.Index().Index(index).Id(_id).BodyJson(data).Do(context.TODO())
- if err != nil {
- log.Println("保存到ES出错", err.Error(), obj)
- return false
- } else {
- return true
- }
- }
- // 通用查询
- // {"query": {"bool":{"must":[{"query_string":{"default_field":"name","query":"*"}}]}}}
- // {"query":{"bool":{"must":{"match":{"content":{"query":"fulltextsearch","operator":"and"}}},"should":[{"match":{"content":{"query":"Elasticsearch","boost":3}}},{"match":{"content":{"query":"Lucene","boost":2}}}]}}}
- // prefix
- // {"query":{"match":{"title":{"query":"brownfox","operator":"and"}}}} //默认为or
- // {"query":{"multi_match":{"query":"PolandStreetW1V","type":"most_fields","fields":["*_street","city^2","country","postcode"]}}}
- // {"query":{"wildcard":{"postcode":"W?F*HW"}}}
- // {"query":{"regexp":{"postcode":"W[0-9].+"}}}
- // {"query":{"filtered":{"filter":{"range":{"price":{"gte":10000}}}}},"aggs":{"single_avg_price":{"avg":{"field":"price"}}}}
- // {"query":{"match":{"make":"ford"}},"aggs":{"colors":{"terms":{"field":"color"}}}}//查fork有几种颜色
- // 过滤器不会计算相关度的得分,所以它们在计算上更快一些
- // {"query":{"filtered":{"query":{"match_all":{}},"filter":{"range":{"balance":{"gte":20000,"lte":30000}}}}}}
- // {"query":{"match_all":{}},"from":10,"size":10,"_source":["account_number","balance"],"sort":{"balance":{"order":"desc"}}}
- // {"query":{"match_phrase":{"address":"milllane"}}}和match不同会去匹配整个短语,相当于must[]
- func GetBySearchType(index, searchType, query string) (int64, *[]map[string]interface{}) {
- t, _, l := get(index, "", searchType, query, true, true)
- return t, l
- }
- func Get(index, itype, query string) *[]map[string]interface{} {
- _, _, r := get(index, itype, "", query, true, true)
- return r
- }
- func get(index, itype, searchType, query string, isLimit, isHighlight bool) (int64, int, *[]map[string]interface{}) {
- query = filterReg.ReplaceAllString(query, "")
- //log.Println("query -- ", query)
- client := GetEsConn()
- defer func() {
- go DestoryEsConn(client)
- }()
- var res []map[string]interface{}
- var total int64
- var resNum int
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- ss := client.Search().Index(index).Source(query)
- if searchType != "" {
- ss.SearchType(searchType)
- }
- searchResult, err := ss.Do(context.TODO())
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return total, resNum, nil
- }
- total = searchResult.TotalHits()
- if searchResult.Hits != nil {
- resNum = len(searchResult.Hits.Hits)
- if isLimit && resNum < 5000 {
- res = make([]map[string]interface{}, resNum)
- for i, hit := range searchResult.Hits.Hits {
- //d := json.NewDecoder(bytes.NewBuffer(*hit.Source))
- //d.UseNumber()
- //d.Decode(&res[i])
- parseErr := json.Unmarshal(hit.Source, &res[i])
- if res[i] != nil {
- res[i]["_id"] = hit.Id
- }
- if isHighlight && parseErr == nil && hit.Highlight != nil && res[i] != nil {
- res[i]["highlight"] = map[string][]string(hit.Highlight)
- }
- }
- } else {
- log.Println("查询结果太多,查询到:", resNum, "条")
- }
- }
- }
- return total, resNum, &res
- }
- func GetOA(index, itype, query string) (*[]map[string]interface{}, int) {
- _, n, l := get(index, itype, "", query, true, true)
- return l, n
- }
- func GetNoLimit(index, itype, query string) *[]map[string]interface{} {
- _, _, l := get(index, itype, "", query, false, false)
- return l
- }
- // 分页查询
- // {"name":"张三","$and":[{"age":{"$gt":10}},{"age":{"$lte":20}}]}
- // fields直接是 `"_id","title"`
- func GetPage(index, itype, query, order, field string, start, limit int) *[]map[string]interface{} {
- return Get(index, itype, MakeQuery(query, order, field, start, limit))
- }
- // openapi
- func GetOAPage(index, itype, query, order, field string, start, limit int) (*[]map[string]interface{}, int) {
- return GetOA(index, itype, MakeQuery(query, order, field, start, limit))
- }
// SR is a short alias for strings.Replace used by the query builders.
var SR func(s, old, new string, n int) string = strings.Replace
- func MakeQuery(query, order, fileds string, start, limit int) string {
- res := AnalyQuery(query, "", QStr)
- if len(res) > 10 {
- res = SR(SR(SR(SR(res, ",$and", "", -1), "$and", "", -1), ",$or", "", -1), "$or", "", -1)
- if len(fileds) > 0 {
- //"_source":["account_number","balance"]
- res = res[:len(res)-1] + `,"_source":[` + fileds + "]}"
- }
- //{"name":-1,"age":1}
- if len(order) > 0 {
- res = res[:len(res)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
- }
- if start > -1 {
- res = res[:len(res)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
- }
- return res
- }
- return ""
- }
// AnalyQuery expands the compact query syntax (for example {"name":"aaa"})
// into Elasticsearch clauses.
//
// query is either a JSON string or an already decoded map; anything else is
// treated as an empty query.
//
// With an empty parent (top level), every entry of the map is rendered and
// inserted into result in front of the "$and" marker; "$and"/"$or" entries
// are lists of sub-conditions inserted in front of their own marker. The
// updated template is returned.
//
// With a non-empty parent, the clause for one entry of the map is returned:
// "$in" becomes a terms clause on parent, keys containing "$lt"/"$gt" become
// a range clause on parent, scalars become term/query_string clauses and any
// other value is expanded recursively with the key as parent. Because Go map
// iteration order is unspecified, only one entry is rendered when the map
// has several.
//
// A key prefixed with "TERM_" produces a term clause on the key without the
// prefix. Entries whose value is nil (JSON null) are skipped; previously they
// caused a panic. Values are inserted without JSON escaping.
func AnalyQuery(query interface{}, parent string, result string) string {
	m := make(map[string]interface{})
	if q1, ok := query.(string); ok {
		json.Unmarshal([]byte(q1), &m)
	} else if q2, ok2 := query.(map[string]interface{}); ok2 {
		m = q2
	}
	if len(parent) == 0 {
		for k, v := range m {
			if k == "$and" || k == "$or" {
				temps := ""
				if list, ok := v.([]interface{}); ok {
					for i := 0; i < len(list); i++ {
						temps += "," + AnalyQuery(list[i], k, "")
					}
				}
				if len(temps) > 0 {
					temps = temps[1:]
				}
				result = strings.Replace(result, k, temps+","+k, 1)
				continue
			}
			if clause, ok := analyLeaf(k, v); ok {
				result = strings.Replace(result, "$and", clause+",$and", 1)
				continue
			}
			if v == nil {
				continue
			}
			result = strings.Replace(result, "$and", AnalyQuery(v, k, "")+",$and", 1)
		}
		return result
	}
	for k, v := range m {
		if k == "$in" {
			s := ""
			if list, ok := v.([]interface{}); ok {
				for i := 0; i < len(list); i++ {
					s += "," + `"` + fmt.Sprintf("%v", list[i]) + `"`
				}
			}
			if len(s) > 0 {
				s = s[1:]
			}
			return `{"terms":{"` + parent + `":[` + s + `]}}`
		}
		if strings.Contains(k, "$lt") || strings.Contains(k, "$gt") {
			return `{"range":{"` + parent + `":{"` + strings.Replace(k, "$", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}}`
		}
		if clause, ok := analyLeaf(k, v); ok {
			return clause
		}
		if v == nil {
			continue
		}
		return AnalyQuery(v, k, result)
	}
	return result
}

// analyLeaf renders a single string or numeric condition as a term clause
// (key prefixed with "TERM_") or a query_string clause. ok is false when v is
// nil or of any other type.
func analyLeaf(k string, v interface{}) (clause string, ok bool) {
	if v == nil {
		// reflect.TypeOf(nil) is nil; calling String on it would panic.
		return "", false
	}
	isTerm := strings.Index(k, "TERM_") == 0
	field := strings.Replace(k, "TERM_", "", 1)
	switch reflect.TypeOf(v).String() {
	case "string":
		if isTerm {
			return `{"term":{"` + field + `":"` + fmt.Sprintf("%v", v) + `"}}`, true
		}
		return `{"query_string":{"default_field":"` + k + `","query":"` + fmt.Sprintf("%v", v) + `"}}`, true
	case "int", "int8", "int32", "int64", "float32", "float64":
		if isTerm {
			return `{"term":{"` + field + `":` + fmt.Sprintf("%v", v) + `}}`, true
		}
		return `{"query_string":{"default_field":"` + k + `","query":` + fmt.Sprintf("%v", v) + `}}`, true
	}
	return "", false
}
- func GetByIdField(index, itype, id, fields string) *map[string]interface{} {
- client := GetEsConn()
- defer DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- query := `{"query":{"term":{"_id":"` + id + `"}}`
- if len(fields) > 0 {
- query = query + `,"_source":[` + fields + `]`
- }
- query = query + "}"
- searchResult, err := client.Search().Index(index).Source(query).Do(context.TODO())
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- var res map[string]interface{}
- if searchResult.Hits != nil {
- resNum := len(searchResult.Hits.Hits)
- if resNum == 1 {
- res = make(map[string]interface{})
- for _, hit := range searchResult.Hits.Hits {
- json.Unmarshal(hit.Source, &res)
- if res != nil {
- res["_id"] = hit.Id
- }
- }
- return &res
- }
- }
- }
- return nil
- }
- // 根据id来查询文档
- func GetById(index, itype string, ids ...string) *[]map[string]interface{} {
- client := GetEsConn()
- defer DestoryEsConn(client)
- var res []map[string]interface{}
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- query := es.NewIdsQuery().Ids(ids...)
- searchResult, err := client.Search().Index(index).Query(query).Do(context.TODO())
- if err != nil {
- log.Println("从ES查询出错", err.Error())
- return nil
- }
- if searchResult.Hits != nil {
- resNum := len(searchResult.Hits.Hits)
- if resNum < 5000 {
- res = make([]map[string]interface{}, resNum)
- for i, hit := range searchResult.Hits.Hits {
- json.Unmarshal(hit.Source, &res[i])
- if res[i] != nil {
- res[i]["_id"] = hit.Id
- }
- }
- } else {
- log.Println("查询结果太多,查询到:", resNum, "条")
- }
- }
- }
- return &res
- }
- // 根据语句更新对象
- func Update(index, itype, id string, updateStr string) bool {
- client := GetEsConn()
- defer DestoryEsConn(client)
- b := false
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- var err error
- _, err = client.Update().Index(index).Id(id).Script(es.NewScript(updateStr)).Do(context.TODO())
- if err != nil {
- log.Println("更新检索出错:", err.Error())
- } else {
- b = true
- }
- }
- return b
- }
- func BulkUpdate(index, itype string, ids []string, updateStr string) {
- client := GetEsConn()
- defer DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- for _, id := range ids {
- _, err := client.Update().Index(index).Id(id).Script(es.NewScript(updateStr)).Do(context.TODO())
- if err != nil {
- log.Println("更新检索出错:", err.Error())
- }
- }
- }
- }
- func NewBulkUpdate(index string, params ...[]string) bool {
- client := GetEsConn()
- defer DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- bulk := client.Bulk()
- for _, param := range params {
- req := es.NewBulkUpdateRequest().Index(index).Id(param[0]).RetryOnConflict(3).Script(es.NewScript(param[1]))
- bulk.Add(req)
- }
- _, err := bulk.Refresh("wait_for").Do(context.TODO())
- if err != nil {
- log.Println("批量更新检索出错:", err.Error())
- return false
- }
- return true
- }
- return false
- }
- // 根据id删除索引对象
- func DelById(index, itype, id string) bool {
- client := GetEsConn()
- defer DestoryEsConn(client)
- b := false
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- var err error
- _, err = client.Delete().Index(index).Id(id).Do(context.TODO())
- if err != nil {
- log.Println("更新检索出错:", err.Error())
- } else {
- b = true
- }
- }
- return b
- }
- // 先删除后增
- func UpdateNewDoc(index, itype string, obj ...interface{}) bool {
- client := GetEsConn()
- defer DestoryEsConn(client)
- b := false
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- var err error
- for _, v := range obj {
- tempObj := objToMap(v)
- if tempObj == nil || len(*tempObj) == 0 {
- continue
- }
- id := bsonIdToSId((*tempObj)["_id"])
- (*tempObj)["id"] = id
- delete(*tempObj, "_id")
- if id != "" {
- client.Delete().Index(index).Id(id).Do(context.TODO())
- }
- _, err = client.Index().Index(index).Id(id).BodyJson(tempObj).Do(context.TODO())
- if err != nil {
- log.Println("保存到ES出错", err.Error())
- } else {
- b = true
- }
- }
- }
- return b
- }
- func UpdateNew(index string, obj ...interface{}) bool {
- client := GetEsConn()
- defer DestoryEsConn(client)
- b := false
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- var err error
- for _, v := range obj {
- tempObj := objToMap(v)
- if tempObj == nil || len(*tempObj) == 0 {
- continue
- }
- _id := fmt.Sprint((*tempObj)["_id"])
- if _id != "" {
- client.Delete().Index(index).Id(_id).Do(context.TODO())
- }
- _, err = client.Index().Index(index).Id(_id).BodyJson(tempObj).Do(context.TODO())
- if err != nil {
- log.Println("保存到ES出错", err.Error())
- } else {
- b = true
- }
- }
- }
- return b
- }
- func BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
- client := GetEsConn()
- defer DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- req := client.Bulk()
- for _, v := range *obj {
- if v == nil || len(v) == 0 {
- continue
- }
- _id := bsonIdToSId(v["_id"])
- v["id"] = _id
- delete(v, "_id")
- if isDelBefore && _id != "" {
- req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(_id))
- }
- req = req.Add(es.NewBulkIndexRequest().Index(index).Id(_id).Doc(v))
- }
- _, err := req.Do(context.TODO())
- if err != nil {
- log.Println("批量保存到ES出错", err.Error())
- }
- }
- }
- func Count(index, itype string, query interface{}) int64 {
- client := GetEsConn()
- defer DestoryEsConn(client)
- if client != nil {
- defer func() {
- if r := recover(); r != nil {
- log.Println("[E]", r)
- for skip := 1; ; skip++ {
- _, file, line, ok := runtime.Caller(skip)
- if !ok {
- break
- }
- go log.Printf("%v,%v\n", file, line)
- }
- }
- }()
- var n int64
- var err error
- if qs, ok := query.(string); ok {
- n, err = client.Count(index).BodyString(qs).Do(context.TODO())
- } else if qi, ok2 := query.(es.Query); ok2 {
- n, err = client.Count(index).Query(qi).Do(context.TODO())
- }
- if err != nil {
- log.Println("统计出错", err.Error())
- }
- return n
- }
- return 0
- }
- //ngram精确查询
- /*
- {
- "query": {
- "bool": {
- "should": [
- {
- "bool":{
- "must":[
- { "multi_match": {
- "query": "智能",
- "type": "phrase",
- "fields": [
- "title"
- ],
- "analyzer": "my_ngram"
- }
- },{
- "multi_match": {
- "query": "机器",
- "type": "phrase",
- "fields": [
- "title"
- ],
- "analyzer": "my_ngram"
- }
- },{
- "multi_match": {
- "query": "2016",
- "type": "phrase",
- "fields": [
- "title"
- ],
- "analyzer": "my_ngram"
- }
- }
- ]
- }
- },
- {
- "bool":{
- "must":[
- { "multi_match": {
- "query": "河南",
- "type": "phrase",
- "fields": [
- "title"
- ],
- "analyzer": "my_ngram"
- }
- },{
- "multi_match": {
- "query": "工商",
- "type": "phrase",
- "fields": [
- "title"
- ],
- "analyzer": "my_ngram"
- }
- },{
- "multi_match": {
- "query": "2016",
- "type": "phrase",
- "fields": [
- "title"
- ],
- "analyzer": "my_ngram"
- }
- }
- ]
- }
- }
- ],"minimum_should_match": 1
- }
- },
- "_source": [
- "_id",
- "title"
- ],
- "from": 0,
- "size": 10,
- "sort": [{
- "publishtime": "desc"
- }]
- }
- */
- //"2016+智能+办公,"河南+工商"
- //["2016+智能+办公","河南+工商"]
- //QStr = `{"query":{"bool":{"should":[$or],"minimum_should_match" : 1}}}`
- //{"bool":{"must":[]}}
- //{"multi_match": {"query": "$word","type": "phrase", "fields": [$field],"analyzer": "my_ngram"}}
- //"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {"detail": {"fragment_size": 1,"number_of_fragments": 1},"title": {"fragment_size": 1,"number_of_fragments": 1}}}
// Query and highlight templates used by the ngram query builders. The %s/%d
// verbs are filled in with fmt.Sprintf.
const (
	// FilterQuery wraps a filter around an inner query. It deliberately
	// lacks one closing "}" at the end: callers pass the inner query with its
	// first byte cut off ([1:]), which leaves exactly one surplus "}".
	FilterQuery = `{"query": {"filtered": {"filter": {"bool": {"must": [%s]}},%s}}`

	// NgramStr is the outer bool query: must clauses, then should clauses.
	NgramStr = `{"query":{"bool":{"must":[%s],"should":[%s],"minimum_should_match": 1}}}`

	// NgramMust groups clauses that all have to match.
	NgramMust = `{"bool":{"must":[%s]}}`

	// NgramMustAndNot groups required clauses and excluded clauses.
	NgramMustAndNot = `{"bool":{"must":[%s],"must_not":[%s]}}`

	// minq is a phrase multi_match: query text, then the field list.
	minq = `{"multi_match": {"query": "%s","type": "phrase", "fields": [%s]}}`

	// HL is a highlight section with empty pre/post tags.
	HL = `"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {%s}}`

	// highlightStr is one highlight field entry: field, then fragment size.
	highlightStr = `%s: {"fragment_size": %d,"number_of_fragments": 1}`

	// FilterQuery_New is the bool query built by GetNgramQuery_New; its three
	// verbs are concatenated inside the must array without separators.
	FilterQuery_New = `{"query":{"bool":{"must": [%s%s%s],"should":[]}}}`

	// MatchQueryString is a match clause with the "and" operator: field,
	// then query text.
	MatchQueryString = `{"match": {%s: { "query":"%s", "operator": "and"}}}`

	// HL_New is a highlight section using "<HL>" for both pre and post tags.
	HL_New = `"highlight": {"pre_tags": ["<HL>"],"post_tags": ["<HL>"],"fields": {%s}}`

	// HL_MP is the highlight section for data queries (added 2019-07-10),
	// using "<HL>" and "</HL>".
	HL_MP = `"highlight": {"pre_tags": ["<HL>"],"post_tags": ["</HL>"],"fields": {%s}}`

	// ik_highlightStr is a highlight field entry that requires a field match.
	ik_highlightStr = `%s: {"fragment_size": %d,"number_of_fragments": 1,"require_field_match": true}`

	// IK_pre_tags and IK_post_tags are the tags used by HL_IK.
	IK_pre_tags  = `<font class=\"es-highlight\">`
	IK_post_tags = `</font>`

	// HL_IK is a highlight section wrapped in the IK tags above.
	HL_IK = `"highlight": {"pre_tags": ["` + IK_pre_tags + `"],"post_tags": ["` + IK_post_tags + `"],"fields": {%s}}`
)
- // 替换了"号
- func GetNgramQuery(query interface{}, mustquery, findfields string) (qstr string) {
- var words []string
- if q, ok := query.(string); ok {
- if q != "" {
- words = strings.Split(q, ",")
- }
- } else if q, ok := query.([]string); ok {
- words = q
- } else if q, ok := query.([]interface{}); ok {
- words = objArrToStringArr(q)
- }
- if words != nil {
- new_minq := fmt.Sprintf(minq, "%s", findfields)
- musts := []string{}
- for _, qs_words := range words {
- qws := strings.Split(qs_words, "+")
- mq := []string{}
- for _, qs_word := range qws {
- mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word)))
- }
- musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ",")))
- }
- qstr = fmt.Sprintf(NgramStr, mustquery, strings.Join(musts, ","))
- //log.Println("ngram-query", qstr)
- } else {
- qstr = fmt.Sprintf(NgramStr, mustquery, "")
- }
- return
- }
- func GetNgramQuery_New(querystring, querymust interface{}, must, findfields string) (qstring string) {
- querymust_string := ""
- var wordsMust []string
- if q, ok := querymust.(string); ok {
- if q != "" {
- wordsMust = strings.Split(q, ",")
- }
- } else if q, ok := querymust.([]string); ok {
- wordsMust = q
- } else if q, ok := querymust.([]interface{}); ok {
- wordsMust = objArrToStringArr(q)
- }
- if wordsMust != nil {
- new_minq := fmt.Sprintf(minq, "%s", findfields)
- musts := []string{}
- for _, qs_wordsMust := range wordsMust {
- qws := strings.Split(qs_wordsMust, "+")
- mq := []string{}
- for _, qs_word := range qws {
- mq = append(mq, fmt.Sprintf(new_minq, qs_word))
- }
- musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ",")))
- }
- querymust_string = strings.Join(musts, ",")
- }
- //log.Println("must", must, querymust_string)
- //querystring---------------------------------------------
- query_string := ""
- var querysShold []string
- if q, ok := querystring.(string); ok {
- if q != "" {
- querysShold = strings.Split(q, ",")
- }
- } else if q, ok := querystring.([]string); ok {
- querysShold = q
- } else if q, ok := querystring.([]interface{}); ok {
- querysShold = objArrToStringArr(q)
- }
- if querysShold != nil {
- for k, name := range strings.Split(findfields, ",") {
- for _, qs_querysShold := range querysShold {
- if k > 0 {
- query_string = query_string + "," + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold)
- } else {
- query_string = query_string + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold)
- }
- }
- }
- }
- //log.Println("querystring", query_string)
- if querymust_string == "" {
- qstring = fmt.Sprintf(FilterQuery_New, must, query_string, querymust_string)
- } else {
- qstring = fmt.Sprintf(FilterQuery_New, must, query_string, ","+querymust_string)
- }
- return
- }
- func GetByNgram(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int) *[]map[string]interface{} {
- return GetByNgramAll(index, itype, query, mustquery, findfields, order, fields, start, limit, false, false)
- }
- // 增加高亮、过滤查询、高亮截取字数
- func GetByNgramOther(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool, count int) *[]map[string]interface{} {
- defer catch()
- qstr := ""
- if mustquery != "" && filtermode {
- qstr = GetNgramQuery(query, "", findfields)
- qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:])
- } else {
- qstr = GetNgramQuery(query, mustquery, findfields)
- }
- if qstr != "" {
- if highlight {
- ws := []string{}
- for _, w := range strings.Split(findfields, ",") {
- ws = append(ws, fmt.Sprintf(highlightStr, w, count))
- }
- qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
- }
- if len(fields) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
- }
- if len(order) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
- }
- if start > -1 {
- qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
- }
- //log.Println("ngram-find", qstr)
- return Get(index, itype, qstr)
- } else {
- return nil
- }
- }
- // 增加高亮、过滤查询
- // 替换了"号
- func GetByNgramAll(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} {
- defer catch()
- qstr := ""
- if mustquery != "" && filtermode {
- qstr = GetNgramQuery(query, "", findfields)
- qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:])
- } else {
- qstr = GetNgramQuery(query, mustquery, findfields)
- }
- if qstr != "" {
- if highlight {
- ws := []string{}
- for _, w := range strings.Split(findfields, ",") {
- ws = append(ws, fmt.Sprintf(highlightStr, w, 1))
- }
- qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
- }
- if len(fields) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
- }
- if strings.HasPrefix(order, "CUSTOM_") {
- qstr = qstr[:len(qstr)-1] + `,` + strings.TrimLeft(order, "CUSTOM_") + `}`
- } else if len(order) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
- }
- if start > -1 {
- qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
- }
- //log.Println("ngram-find", qstr)
- return Get(index, itype, qstr)
- } else {
- return nil
- }
- }
- // 增加高亮、过滤查询
- func GetByNgramAll_New(index, itype string, querystring, querymust interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} {
- defer catch()
- qstr := ""
- if filtermode {
- qstr = GetNgramQuery_New(querystring, querymust, mustquery, findfields)
- } else {
- qstr = GetNgramQuery_New(querystring, "", mustquery, findfields)
- }
- if qstr != "" {
- if highlight {
- ws := []string{}
- for _, w := range strings.Split(findfields, ",") {
- ws = append(ws, w+`:{"force_source": true}`)
- }
- qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL_New, strings.Join(ws, ",")) + `}`
- }
- if len(fields) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
- }
- if len(order) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", ",", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
- }
- if start > -1 {
- qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
- }
- //log.Println("ngram-find", order, qstr)
- return Get(index, itype, qstr)
- } else {
- return nil
- }
- }
// KeyConfig is one keyword group consumed by GetResForJY.
type KeyConfig struct {
	// Keys are the words that must all match (JSON "key").
	Keys []string `json:"key"`
	// NotKeys are the words that must not match (JSON "notkey").
	NotKeys []string `json:"notkey"`
	// InfoTypes, when non-empty, restricts the "toptype" field (JSON "infotype").
	InfoTypes []string `json:"infotype"`
	// Areas, when non-empty, restricts the "area" field (JSON "area").
	Areas []string `json:"area"`
}
- // 替换了"号
- func GetResForJY(index, itype string, keys []KeyConfig, allquery, findfields, SortQuery, fields string, start, limit int) *[]map[string]interface{} {
- if len(keys) > 0 {
- qstr := ""
- new_minq := fmt.Sprintf(minq, "%s", findfields)
- not_new_minq := fmt.Sprintf(minq, "%s", findfields) //排除词只查询标题
- musts := []string{}
- for _, qs_words := range keys {
- mq := []string{}
- notmq := []string{}
- for _, qs_word := range qs_words.Keys {
- mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word)))
- /*
- qs := AnalyzerWord("bidding", qs_word)
- for _, qw := range qs {
- mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qw)))
- }
- */
- }
- for _, qs_word := range qs_words.NotKeys {
- notmq = append(notmq, fmt.Sprintf(not_new_minq, ReplaceYH(qs_word)))
- }
- if len(qs_words.Areas) > 0 {
- mq = append(mq, fmt.Sprintf(`{"terms":{"area":["%s"]}}`, strings.Join(qs_words.Areas, `","`)))
- }
- if len(qs_words.InfoTypes) > 0 {
- mq = append(mq, fmt.Sprintf(`{"terms":{"toptype":["%s"]}}`, strings.Join(qs_words.InfoTypes, `","`)))
- }
- musts = append(musts, fmt.Sprintf(NgramMustAndNot, strings.Join(mq, ","), strings.Join(notmq, ",")))
- }
- qstr = fmt.Sprintf(NgramStr, "", strings.Join(musts, ","))
- qstr = fmt.Sprintf(FilterQuery, allquery, qstr[1:])
- ws := []string{}
- for _, w := range strings.Split(findfields, ",") {
- ws = append(ws, fmt.Sprintf(highlightStr, w, 1))
- }
- qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
- if len(fields) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
- }
- if len(SortQuery) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"sort":` + SortQuery + `}`
- }
- if start > -1 {
- qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
- }
- //log.Println("jy-ngram-find", qstr)
- return Get(index, itype, qstr)
- } else {
- return nil
- }
- }
// ReplaceYH returns src with every double quote prefixed by a backslash, so
// each `"` becomes `\"`. No other character, including an existing
// backslash, is altered.
func ReplaceYH(src string) (rpl string) {
	rpl = strings.ReplaceAll(src, `"`, `\"`)
	return rpl
}
- func GetAllByNgram(index, itype, qstr, findfields, order, fields string, start, limit, count int, highlight bool) *[]map[string]interface{} {
- if qstr != "" {
- if highlight {
- ws := []string{}
- for _, w := range strings.Split(findfields, ",") {
- ws = append(ws, fmt.Sprintf(highlightStr, w, count))
- }
- qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
- }
- if len(fields) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
- }
- if len(order) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
- }
- if start > -1 {
- qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
- }
- log.Println("GetAllByNgram:", qstr)
- return Get(index, itype, qstr)
- } else {
- return nil
- }
- }
- // 数据标记2019-07-10
- func GetAllByNgram_MP(index, itype, qstr, findfields, order, fields string, start, limit, count int, highlight bool) *[]map[string]interface{} {
- if qstr != "" {
- if highlight {
- ws := []string{}
- for _, w := range strings.Split(findfields, ",") {
- ws = append(ws, fmt.Sprintf(highlightStr, w, count))
- }
- qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL_MP, strings.Join(ws, ",")) + `}`
- }
- if len(fields) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
- }
- if len(order) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
- }
- if start > -1 {
- qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
- }
- // log.Println("GetAllByNgram:", qstr)
- return Get(index, itype, qstr)
- } else {
- return nil
- }
- }
- // ik 分词
- func GetAllByIk(index, itype, qstr, findfields, order, fields string, start, limit, count int, highlight bool) *[]map[string]interface{} {
- if qstr != "" {
- if highlight {
- ws := []string{}
- for _, w := range strings.Split(findfields, ",") {
- ws = append(ws, fmt.Sprintf(ik_highlightStr, w, count))
- }
- qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL_IK, strings.Join(ws, ",")) + `}`
- }
- if len(fields) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
- }
- if len(order) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
- }
- if start > -1 {
- qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
- }
- //log.Println("GetAllByNgram:", qstr)
- return Get(index, itype, qstr)
- } else {
- return nil
- }
- }
- func GetResForJYView(index, itype string, keys []KeyConfig, allquery, findfields, SortQuery, fields string, start, limit int) *[]map[string]interface{} {
- if len(keys) > 0 {
- qstr := ""
- new_minq := fmt.Sprintf(minq, "%s", findfields)
- not_new_minq := fmt.Sprintf(minq, "%s", findfields) //排除词只查询标题
- musts := []string{}
- for _, qs_words := range keys {
- mq := []string{}
- notmq := []string{}
- for _, qs_word := range qs_words.Keys {
- mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word)))
- }
- for _, qs_word := range qs_words.NotKeys {
- notmq = append(notmq, fmt.Sprintf(not_new_minq, ReplaceYH(qs_word)))
- }
- if len(qs_words.Areas) > 0 {
- mq = append(mq, fmt.Sprintf(`{"terms":{"area":["%s"]}}`, strings.Join(qs_words.Areas, `","`)))
- }
- if len(qs_words.InfoTypes) > 0 {
- mq = append(mq, fmt.Sprintf(`{"terms":{"toptype":["%s"]}}`, strings.Join(qs_words.InfoTypes, `","`)))
- }
- musts = append(musts, fmt.Sprintf(NgramMustAndNot, strings.Join(mq, ","), strings.Join(notmq, ",")))
- }
- qstr = fmt.Sprintf(NgramStr, "", strings.Join(musts, ","))
- qstr = fmt.Sprintf(FilterQuery, allquery, qstr[1:])
- ws := []string{}
- for _, w := range strings.Split(findfields, ",") {
- ws = append(ws, fmt.Sprintf(highlightStr, w, 1))
- }
- qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
- if len(fields) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
- }
- if len(SortQuery) > 0 {
- qstr = qstr[:len(qstr)-1] + `,"sort":` + SortQuery + `}`
- }
- if start > -1 {
- qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
- }
- return Get(index, itype, qstr)
- } else {
- return nil
- }
- }
|