package elastic import ( "encoding/json" "fmt" "log" "net/url" "reflect" "runtime" "strconv" "strings" "sync" "time" es "app.yhyue.com/moapp/esv1/gopkg.in/olivere/elastic.v1" ) //检索库服务地址 var addrs []string var LocCity = map[string]string{} var SIZE = 30 const ( QStr = `{"query":{"bool":{"must":[$and],"must_not":[], "should":[$or],"minimum_should_match" : 1}}}` ) var pool chan *es.Client var ntimeout int var syncPool sync.Pool //初始化全文检索 func InitElastic(addr string) { InitElasticSize(addr, SIZE) } //自定义HttpClient /** var httpclient = &http.Client{Transport: &http.Transport{ Dial: func(netw, addr string) (net.Conn, error) { deadline := time.Now().Add(5000 * time.Millisecond) c, err := net.DialTimeout(netw, addr, 10000*time.Millisecond) if err != nil { return nil, err } tcp_conn := c.(*net.TCPConn) tcp_conn.SetKeepAlive(false) tcp_conn.SetDeadline(deadline) return tcp_conn, nil }, DisableKeepAlives: true, //不保持,这样才能释放 }} **/ //var op = es.SetHttpClient(httpclient) var poolsize = int32(20) //n倍的池 func InitElasticSize(addr string, size int) { poolsize = int32(3 * size) pool = make(chan *es.Client, poolsize) for _, s := range strings.Split(addr, ",") { addrs = append(addrs, s) } for i := 0; i < size; i++ { client, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false)) pool <- client } } //关闭连接 func DestoryEsConn(client *es.Client) { select { case pool <- client: break case <-time.After(time.Second * 1): if client != nil { client.Stop() } client = nil } } var ( lastTime = int64(0) lastTimeLock = &sync.Mutex{} ) //获取连接 func GetEsConn() *es.Client { select { case c := <-pool: if c == nil || !c.IsRunning() { log.Println("new esclient.", len(pool)) client, err := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false)) if err == nil && client.IsRunning() { return client } } return c case <-time.After(time.Second * 4): //超时 ntimeout++ lastTimeLock.Lock() defer lastTimeLock.Unlock() //12秒后允许创建链接 c := time.Now().Unix() - lastTime if c > 12 { lastTime = time.Now().Unix() log.Println("add client..", len(pool)) c, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false)) go func() { for i := 0; i < 2; i++ { client, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false)) pool <- client } }() return c } return nil } } //保存对象 func Save(index, itype string, obj interface{}) bool { client := GetEsConn() defer DestoryEsConn(client) defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() _, err := client.Index().Index(index).Type(itype).BodyJson(objToMap(obj)).Do() if err != nil { log.Println("保存到ES出错", err.Error(), obj) return false } else { return true } } //通用查询 //{"query": {"bool":{"must":[{"query_string":{"default_field":"name","query":"*"}}]}}} //{"query":{"bool":{"must":{"match":{"content":{"query":"fulltextsearch","operator":"and"}}},"should":[{"match":{"content":{"query":"Elasticsearch","boost":3}}},{"match":{"content":{"query":"Lucene","boost":2}}}]}}} //prefix //{"query":{"match":{"title":{"query":"brownfox","operator":"and"}}}} //默认为or //{"query":{"multi_match":{"query":"PolandStreetW1V","type":"most_fields","fields":["*_street","city^2","country","postcode"]}}} //{"query":{"wildcard":{"postcode":"W?F*HW"}}} //{"query":{"regexp":{"postcode":"W[0-9].+"}}} //{"query":{"filtered":{"filter":{"range":{"price":{"gte":10000}}}}},"aggs":{"single_avg_price":{"avg":{"field":"price"}}}} //{"query":{"match":{"make":"ford"}},"aggs":{"colors":{"terms":{"field":"color"}}}}//查fork有几种颜色 //过滤器不会计算相关度的得分,所以它们在计算上更快一些 //{"query":{"filtered":{"query":{"match_all":{}},"filter":{"range":{"balance":{"gte":20000,"lte":30000}}}}}} //{"query":{"match_all":{}},"from":10,"size":10,"_source":["account_number","balance"],"sort":{"balance":{"order":"desc"}}} //{"query":{"match_phrase":{"address":"milllane"}}}和match不同会去匹配整个短语,相当于must[] func Get(index, itype, query string) *[]map[string]interface{} { //log.Println("query -- ", query) client := GetEsConn() defer func() { go DestoryEsConn(client) }() var res []map[string]interface{} if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil } if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) if resNum < 5000 { res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { //d := json.NewDecoder(bytes.NewBuffer(*hit.Source)) //d.UseNumber() //d.Decode(&res[i]) parseErr := json.Unmarshal(*hit.Source, &res[i]) if parseErr == nil && hit.Highlight != nil && res[i] != nil { res[i]["highlight"] = map[string][]string(hit.Highlight) } } } else { log.Println("查询结果太多,查询到:", resNum, "条") } } } return &res } // GetNew 返回查询结果数量总数 func GetNew(index, itype, query string) (*[]map[string]interface{}, int64) { //log.Println("query -- ", query) client := GetEsConn() defer func() { go DestoryEsConn(client) }() var ( res []map[string]interface{} count int64 ) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil, count } if searchResult.Hits != nil { count = searchResult.Hits.TotalHits resNum := len(searchResult.Hits.Hits) if resNum < 5000 { log.Println("查询结果太多,查询到:", resNum, "条") } else { res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { parseErr := json.Unmarshal(*hit.Source, &res[i]) if parseErr == nil && hit.Highlight != nil && res[i] != nil { res[i]["highlight"] = map[string][]string(hit.Highlight) } } } } } return &res, count } func GetOA(index, itype, query string) (*[]map[string]interface{}, int) { //log.Println("query -- ", query) client := GetEsConn() defer func() { go DestoryEsConn(client) }() var res []map[string]interface{} var resNum int if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil, 0 } if searchResult.Hits != nil { resNum = len(searchResult.Hits.Hits) if resNum < 5000 { res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { //d := json.NewDecoder(bytes.NewBuffer(*hit.Source)) //d.UseNumber() //d.Decode(&res[i]) parseErr := json.Unmarshal(*hit.Source, &res[i]) if parseErr == nil && hit.Highlight != nil && res[i] != nil { res[i]["highlight"] = map[string][]string(hit.Highlight) } } } else { log.Println("查询结果太多,查询到:", resNum, "条") } } } return &res, resNum } func GetNoLimit(index, itype, query string) *[]map[string]interface{} { //log.Println("query -- ", query) client := GetEsConn() defer DestoryEsConn(client) var res []map[string]interface{} if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil } if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &res[i]) } } } return &res } //分页查询 //{"name":"张三","$and":[{"age":{"$gt":10}},{"age":{"$lte":20}}]} //fields直接是 `"_id","title"` func GetPage(index, itype, query, order, field string, start, limit int) *[]map[string]interface{} { return Get(index, itype, MakeQuery(query, order, field, start, limit)) } //openapi func GetOAPage(index, itype, query, order, field string, start, limit int) (*[]map[string]interface{}, int) { return GetOA(index, itype, MakeQuery(query, order, field, start, limit)) } var SR = strings.Replace func MakeQuery(query, order, fileds string, start, limit int) string { res := AnalyQuery(query, "", QStr) if len(res) > 10 { if strings.Contains(res, "###剑鱼###") { res = strings.ReplaceAll(res, "###剑鱼###", "\\\"") } res = SR(SR(SR(SR(res, ",$and", "", -1), "$and", "", -1), ",$or", "", -1), "$or", "", -1) if len(fileds) > 0 { //"_source":["account_number","balance"] res = res[:len(res)-1] + `,"_source":[` + fileds + "]}" } //{"name":-1,"age":1} if len(order) > 0 { res = res[:len(res)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}` } if start > -1 { res = res[:len(res)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } return res } return "" } //{"name":"aaa"} func AnalyQuery(query interface{}, parent string, result string) string { m := make(map[string]interface{}) if q1, ok := query.(string); ok { json.Unmarshal([]byte(q1), &m) } else if q2, ok2 := query.(map[string]interface{}); ok2 { m = q2 } if len(parent) == 0 { for k, v := range m { if k == "$and" || k == "$or" { temps := "" if map1, ok := v.([]interface{}); ok { for i := 0; i < len(map1); i++ { temps += "," + AnalyQuery(map1[i], k, "") } } if len(temps) > 0 { temps = temps[1:] } result = SR(result, k, temps+","+k, 1) } else { switch reflect.TypeOf(v).String() { case "string": if strings.Index(k, "TERM_") == 0 { result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1) } else { result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1) } case "int", "int8", "int32", "int64", "float32", "float64": if strings.Index(k, "TERM_") == 0 { result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":`+fmt.Sprintf("%v", v)+`}},$and`, 1) } else { result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":`+fmt.Sprintf("%v", v)+`}},$and`, 1) } default: result = SR(result, "$and", AnalyQuery(v, k, "")+",$and", 1) } } } return result } else { for k, v := range m { if k == "$in" { s := "" if map1, ok := v.([]interface{}); ok { for i := 0; i < len(map1); i++ { s += "," + `"` + fmt.Sprintf("%v", map1[i]) + `"` } } if len(s) > 0 { s = s[1:] } return `{"terms":{"` + parent + `":[` + s + `]}}` } else if strings.Contains(k, "$lt") || strings.Contains(k, "$gt") { return `{"range":{"` + parent + `":{"` + SR(k, "$", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}}` } else { switch reflect.TypeOf(v).String() { case "string": if strings.Index(k, "TERM_") == 0 { return `{"term":{"` + SR(k, "TERM_", "", 1) + `":"` + fmt.Sprintf("%v", v) + `"}}` } else { return `{"query_string":{"default_field":"` + k + `","query":"` + fmt.Sprintf("%v", v) + `"}}` } case "int", "int8", "int32", "int64", "float32", "float64": if strings.Index(k, "TERM_") == 0 { return `{"term":{"` + SR(k, "TERM_", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}` } else { return `{"query_string":{"default_field":"` + k + `","query":` + fmt.Sprintf("%v", v) + `}}` } default: return AnalyQuery(v, k, result) } } } } return result } func GetByIdField(index, itype, id, fields string) *map[string]interface{} { client := GetEsConn() defer DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() query := `{"query":{"term":{"_id":"` + id + `"}}` if len(fields) > 0 { query = query + `,"_source":[` + fields + `]` } query = query + "}" searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil } var res map[string]interface{} if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) if resNum == 1 { res = make(map[string]interface{}) for _, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &res) } return &res } } } return nil } //根据id来查询文档 func GetById(index, itype string, ids ...string) *[]map[string]interface{} { client := GetEsConn() defer DestoryEsConn(client) var res []map[string]interface{} if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() query := es.NewIdsQuery().Ids(ids...) searchResult, err := client.Search().Index(index).Type(itype).Query(&query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return nil } if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) if resNum < 5000 { res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { json.Unmarshal(*hit.Source, &res[i]) } } else { log.Println("查询结果太多,查询到:", resNum, "条") } } } return &res } //删除某个索引,根据查询 func Del(index, itype string, query interface{}) bool { client := GetEsConn() defer DestoryEsConn(client) b := false if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() var err error if qs, ok := query.(string); ok { temp := es.BoolQuery{ QueryStrings: qs, } _, err = client.DeleteByQuery().Index(index).Type(itype).Query(temp).Do() } else if qi, ok2 := query.(es.Query); ok2 { _, err = client.DeleteByQuery().Index(index).Type(itype).Query(qi).Do() } if err != nil { log.Println("删除索引出错:", err.Error()) } else { b = true } } return b } //根据语句更新对象 func Update(index, itype, id string, updateStr string) bool { client := GetEsConn() defer DestoryEsConn(client) b := false if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() var err error _, err = client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do() if err != nil { log.Println("更新检索出错:", err.Error()) } else { b = true } } return b } func BulkUpdate(index, itype string, ids []string, updateStr string) { client := GetEsConn() defer DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() for _, id := range ids { _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do() if err != nil { log.Println("更新检索出错:", err.Error()) } } } } //根据id删除索引对象 func DelById(index, itype, id string) bool { client := GetEsConn() defer DestoryEsConn(client) b := false if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() var err error _, err = client.Delete().Index(index).Type(itype).Id(id).Do() if err != nil { log.Println("更新检索出错:", err.Error()) } else { b = true } } return b } //先删除后增 func UpdateNewDoc(index, itype string, obj ...interface{}) bool { client := GetEsConn() defer DestoryEsConn(client) b := false if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() var err error for _, v := range obj { tempObj := objToMap(v) id := fmt.Sprintf("%v", (*tempObj)["_id"]) client.Delete().Index(index).Type(itype).Id(id).Do() _, err = client.Index().Index(index).Type(itype).BodyJson(tempObj).Do() if err != nil { log.Println("保存到ES出错", err.Error()) } else { b = true } } } return b } //把地市代码转为地市 func getLoc(code string, res *map[string]string) (loc string) { switch len(code) { case 6: loc = (*res)[code[:2]] + " " + (*res)[code[:4]] + " " + (*res)[code] break case 4: loc = (*res)[code[:2]] + " " + (*res)[code] break case 2: loc = (*res)[code] break } return } func ConverData(ent *map[string]interface{}) map[string]interface{} { tmp := *ent id64, _ := tmp["ID"].(int64) ids := fmt.Sprintf("%d", id64) tmp2 := make(map[string]interface{}) tmp2["ID"] = ids tmp2["_id"] = tmp["_id"] tmp2["Area"] = tmp["Area"] tmp2["LeRep"] = tmp["LeRep"] tmp2["RegNo"] = tmp["RegNo"] tmp2["EntType"] = tmp["EntType"] tmp2["EntName"] = tmp["EntName"] tmp2["EntTypeName"] = tmp["EntTypeName"] tmp2["Dom"] = tmp["Dom"] tmp2["EstDate"] = tmp["EstDate"] tmp2["OpStateName"] = tmp["OpStateName"] tmp2["OpScope"] = tmp["OpScope"] tmp2["OpState"] = tmp["OpState"] tmp2["s_submitid"] = tmp["s_submitid"] tmp2["l_submittime"] = tmp["l_submittime"] tmp2["s_submitname"] = tmp["s_submitname"] tmp2["RegCapCurName"] = tmp["RegCapCurName"] //增加营业状态排序 if tmp2["OpState"] == "06" { tmp2["OpSint"] = true } else { tmp2["OpSint"] = false } tmp2["OpLocDistrict"] = tmp["OpLocDistrict"] //增加代码转名称 tmpLoc, _ := tmp["OpLocDistrict"].(string) tmp2["OpLocDistrictName"] = getLoc(tmpLoc, &LocCity) tmp2["RecCap"] = tmp["RecCap"] tmp2["RegCap"] = tmp["RegCap"] tmp2["IndustryPhy"] = tmp["IndustryPhy"] tmp2["IndustryPhyName"] = tmp["IndustryPhyName"] tmp2["RegOrg"] = tmp["RegOrg"] tmp2["RegOrgName"] = tmp["RegOrgName"] tmp2["Tel"] = tmp["Tel"] tmp2["CompForm"] = tmp["CompForm"] tmp2["CompFormName"] = tmp["CompFormName"] //增加异常名录标记 Ycml可能是bool也可能是string Ycmlb, _ := tmp["Ycml"].(bool) Ycmls, _ := tmp["Ycml"].(string) if Ycmlb || Ycmls == "1" { tmp2["Ycml"] = true } else { tmp2["Ycml"] = false } //增加年报联系信息 if tmp["Nb_email"] != nil { tmp2["Nb_email"] = tmp["Nb_email"] } if tmp["Nb_tel"] != nil { tmp2["Nb_tel"] = tmp["Nb_tel"] } if tmp["Nb_addr"] != nil { tmp2["Nb_addr"] = tmp["Nb_addr"] } s_synopsis := tmp["s_synopsis"] if s_synopsis == nil { s_synopsis = "" } tmp2["s_synopsis"] = s_synopsis //企业简介 //股东 stock := getStock(tmp["investor"]) tmp2["stock"] = stock tmp2["LegCerNO"] = tmp["LegCerNO"] if tmp["s_microwebsite"] != nil { tmp2["s_microwebsite"] = tmp["s_microwebsite"] } tmp2["SourceType"] = tmp["SourceType"] //数据来源 s_servicenames := tmp["s_servicenames"] if s_servicenames == nil { s_servicenames = "" } tmp2["s_servicenames"] = s_servicenames //服务名称 s_action := tmp["s_action"] if s_action == nil { s_action = "N" } tmp2["s_action"] = s_action tmp2["s_persion"] = tmp["s_persion"] tmp2["s_mobile"] = tmp["s_mobile"] tmp2["s_enturl"] = tmp["s_enturl"] tmp2["s_weixin"] = tmp["s_weixin"] tmp2["s_avatar"] = tmp["s_avatar"] return tmp2 } func getStock(obj interface{}) string { stock := "" if ns, ok := obj.([]interface{}); ok { stock = " " for _, ns1 := range ns { if nn, ok1 := ns1.(map[string]interface{}); ok1 { tmp := fmt.Sprintf("%s", nn["Inv"]) if strings.Index(stock, tmp) < 0 { stock += tmp + " " } } } } return stock } func BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) { client := GetEsConn() defer DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() req := client.Bulk() for _, v := range *obj { if isDelBefore { req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(fmt.Sprintf("%v", v["_id"]))) } req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(v)) } _, err := req.Do() if err != nil { log.Println("批量保存到ES出错", err.Error()) } } } func Count(index, itype string, query interface{}) int64 { client := GetEsConn() defer DestoryEsConn(client) if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() var qq es.Query if qs, ok := query.(string); ok { temp := es.BoolQuery{ QueryStrings: qs, } qq = temp } else if qi, ok2 := query.(es.Query); ok2 { qq = qi } n, err := client.Count(index).Type(itype).Query(qq).Do() if err != nil { log.Println("统计出错", err.Error()) } return n } return 0 } //ngram精确查询 /* { "query": { "bool": { "should": [ { "bool":{ "must":[ { "multi_match": { "query": "智能", "type": "phrase", "fields": [ "title" ], "analyzer": "my_ngram" } },{ "multi_match": { "query": "机器", "type": "phrase", "fields": [ "title" ], "analyzer": "my_ngram" } },{ "multi_match": { "query": "2016", "type": "phrase", "fields": [ "title" ], "analyzer": "my_ngram" } } ] } }, { "bool":{ "must":[ { "multi_match": { "query": "河南", "type": "phrase", "fields": [ "title" ], "analyzer": "my_ngram" } },{ "multi_match": { "query": "工商", "type": "phrase", "fields": [ "title" ], "analyzer": "my_ngram" } },{ "multi_match": { "query": "2016", "type": "phrase", "fields": [ "title" ], "analyzer": "my_ngram" } } ] } } ],"minimum_should_match": 1 } }, "_source": [ "_id", "title" ], "from": 0, "size": 10, "sort": [{ "publishtime": "desc" }] } */ //"2016+智能+办公,"河南+工商" //["2016+智能+办公","河南+工商"] //QStr = `{"query":{"bool":{should":[$or],"minimum_should_match" : 1}}}` //{"bool":{"must":[]}} //{"multi_match": {"query": "$word","type": "phrase", "fields": [$field],"analyzer": "my_ngram"}} //"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {"detail": {"fragment_size": 1,"number_of_fragments": 1},"title": {"fragment_size": 1,"number_of_fragments": 1}}} const ( //此处最后少一个},正好NgramStr取[1:]多一个} FilterQuery = `{"query": {"filtered": {"filter": {"bool": {"must": [%s]}},%s}}` NgramStr = `{"query":{"bool":{"must":[%s],"should":[%s],"minimum_should_match": 1}}}` NgramMust = `{"bool":{"must":[%s]}}` NgramMustAndNot = `{"bool":{"must":[%s],"must_not":[%s]}}` minq = `{"multi_match": {"query": "%s","type": "phrase", "fields": [%s]}}` HL = `"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {%s}}` highlightStr = `%s: {"fragment_size": %d,"number_of_fragments": 1}` FilterQuery_New = `{"query":{"bool":{"must": [%s%s%s],"should":[]}}}` MatchQueryString = `{"match": {%s: { "query":"%s", "operator": "and"}}}` HL_New = `"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {%s}}` //数据查询高亮标记2019-07-10 HL_MP = `"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {%s}}` ik_highlightStr = `%s: {"fragment_size": %d,"number_of_fragments": 1,"require_field_match": true}` IK_pre_tags = `` IK_post_tags = `` HL_IK = `"highlight": {"pre_tags": ["` + IK_pre_tags + `"],"post_tags": ["` + IK_post_tags + `"],"fields": {%s}}` ) //替换了"号 func GetNgramQuery(query interface{}, mustquery, findfields string) (qstr string) { var words []string if q, ok := query.(string); ok { if q != "" { words = strings.Split(q, ",") } } else if q, ok := query.([]string); ok { words = q } else if q, ok := query.([]interface{}); ok { words = objArrToStringArr(q) } if words != nil { new_minq := fmt.Sprintf(minq, "%s", findfields) musts := []string{} for _, qs_words := range words { qws := strings.Split(qs_words, "+") mq := []string{} for _, qs_word := range qws { mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word))) } musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ","))) } qstr = fmt.Sprintf(NgramStr, mustquery, strings.Join(musts, ",")) //log.Println("ngram-query", qstr) } else { qstr = fmt.Sprintf(NgramStr, mustquery, "") } return } func GetNgramQuery_New(querystring, querymust interface{}, must, findfields string) (qstring string) { querymust_string := "" var wordsMust []string if q, ok := querymust.(string); ok { if q != "" { wordsMust = strings.Split(q, ",") } } else if q, ok := querymust.([]string); ok { wordsMust = q } else if q, ok := querymust.([]interface{}); ok { wordsMust = objArrToStringArr(q) } if wordsMust != nil { new_minq := fmt.Sprintf(minq, "%s", findfields) musts := []string{} for _, qs_wordsMust := range wordsMust { qws := strings.Split(qs_wordsMust, "+") mq := []string{} for _, qs_word := range qws { mq = append(mq, fmt.Sprintf(new_minq, qs_word)) } musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ","))) } querymust_string = strings.Join(musts, ",") } //log.Println("must", must, querymust_string) //querystring--------------------------------------------- query_string := "" var querysShold []string if q, ok := querystring.(string); ok { if q != "" { querysShold = strings.Split(q, ",") } } else if q, ok := querystring.([]string); ok { querysShold = q } else if q, ok := querystring.([]interface{}); ok { querysShold = objArrToStringArr(q) } if querysShold != nil { for k, name := range strings.Split(findfields, ",") { for _, qs_querysShold := range querysShold { if k > 0 { query_string = query_string + "," + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold) } else { query_string = query_string + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold) } } } } //log.Println("querystring", query_string) if querymust_string == "" { qstring = fmt.Sprintf(FilterQuery_New, must, query_string, querymust_string) } else { qstring = fmt.Sprintf(FilterQuery_New, must, query_string, ","+querymust_string) } return } func GetByNgram(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int) *[]map[string]interface{} { return GetByNgramAll(index, itype, query, mustquery, findfields, order, fields, start, limit, false, false) } //增加高亮、过滤查询、高亮截取字数 func GetByNgramOther(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool, count int) *[]map[string]interface{} { defer catch() qstr := "" if mustquery != "" && filtermode { qstr = GetNgramQuery(query, "", findfields) qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:]) } else { qstr = GetNgramQuery(query, mustquery, findfields) } if qstr != "" { if highlight { ws := []string{} for _, w := range strings.Split(findfields, ",") { ws = append(ws, fmt.Sprintf(highlightStr, w, count)) } qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}` } if len(fields) > 0 { qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}" } if len(order) > 0 { qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}` } if start > -1 { qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } //log.Println("ngram-find", qstr) return Get(index, itype, qstr) } else { return nil } } //增加高亮、过滤查询 //替换了"号 func GetByNgramAll(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} { defer catch() qstr := "" if mustquery != "" && filtermode { qstr = GetNgramQuery(query, "", findfields) qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:]) } else { qstr = GetNgramQuery(query, mustquery, findfields) } if qstr != "" { if highlight { ws := []string{} for _, w := range strings.Split(findfields, ",") { ws = append(ws, fmt.Sprintf(highlightStr, w, 1)) } qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}` } if len(fields) > 0 { qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}" } if strings.HasPrefix(order, "CUSTOM_") { qstr = qstr[:len(qstr)-1] + `,` + strings.TrimLeft(order, "CUSTOM_") + `}` } else if len(order) > 0 { qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}` } if start > -1 { qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } //log.Println("ngram-find", qstr) return Get(index, itype, qstr) } else { return nil } } //增加高亮、过滤查询 func GetByNgramAll_New(index, itype string, querystring, querymust interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} { defer catch() qstr := "" if filtermode { qstr = GetNgramQuery_New(querystring, querymust, mustquery, findfields) } else { qstr = GetNgramQuery_New(querystring, "", mustquery, findfields) } if qstr != "" { if highlight { ws := []string{} for _, w := range strings.Split(findfields, ",") { ws = append(ws, w+`:{"force_source": true}`) } qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL_New, strings.Join(ws, ",")) + `}` } if len(fields) > 0 { qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}" } if len(order) > 0 { qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", ",", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}` } if start > -1 { qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } //log.Println("ngram-find", order, qstr) return Get(index, itype, qstr) } else { return nil } } type KeyConfig struct { Keys []string `json:"key"` NotKeys []string `json:"notkey"` InfoTypes []string `json:"infotype"` Areas []string `json:"area"` } //替换了"号 func GetResForJY(index, itype string, keys []KeyConfig, allquery, findfields, SortQuery, fields string, start, limit int) *[]map[string]interface{} { if len(keys) > 0 { qstr := "" new_minq := fmt.Sprintf(minq, "%s", findfields) not_new_minq := fmt.Sprintf(minq, "%s", findfields) //排除词只查询标题 musts := []string{} for _, qs_words := range keys { mq := []string{} notmq := []string{} for _, qs_word := range qs_words.Keys { mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word))) /* qs := AnalyzerWord("bidding", qs_word) for _, qw := range qs { mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qw))) } */ } for _, qs_word := range qs_words.NotKeys { notmq = append(notmq, fmt.Sprintf(not_new_minq, ReplaceYH(qs_word))) } if len(qs_words.Areas) > 0 { mq = append(mq, fmt.Sprintf(`{"terms":{"area":["%s"]}}`, strings.Join(qs_words.Areas, `","`))) } if len(qs_words.InfoTypes) > 0 { mq = append(mq, fmt.Sprintf(`{"terms":{"toptype":["%s"]}}`, strings.Join(qs_words.InfoTypes, `","`))) } musts = append(musts, fmt.Sprintf(NgramMustAndNot, strings.Join(mq, ","), strings.Join(notmq, ","))) } qstr = fmt.Sprintf(NgramStr, "", strings.Join(musts, ",")) qstr = fmt.Sprintf(FilterQuery, allquery, qstr[1:]) ws := []string{} for _, w := range strings.Split(findfields, ",") { ws = append(ws, fmt.Sprintf(highlightStr, w, 1)) } qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}` if len(fields) > 0 { qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}" } if len(SortQuery) > 0 { qstr = qstr[:len(qstr)-1] + `,"sort":` + SortQuery + `}` } if start > -1 { qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } //log.Println("jy-ngram-find", qstr) return Get(index, itype, qstr) } else { return nil } } func ReplaceYH(src string) (rpl string) { return strings.Replace(src, `"`, `\"`, -1) } // func GetAllByNgramNew(index, itype, qstr, findfields, order, fields string, start, limit, count int, highlight bool) (*[]map[string]interface{}, int64) { if qstr != "" { if highlight { ws := []string{} for _, w := range strings.Split(findfields, ",") { ws = append(ws, fmt.Sprintf(highlightStr, w, count)) } qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}` } if len(fields) > 0 { qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}" } if len(order) > 0 { qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}` } if start > -1 { qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } //log.Println("GetAllByNgram:", qstr) return GetNew(index, itype, qstr) } else { return nil, 0 } } // func GetAllByNgram(index, itype, qstr, findfields, order, fields string, start, limit, count int, highlight bool) *[]map[string]interface{} { if qstr != "" { if highlight { ws := []string{} for _, w := range strings.Split(findfields, ",") { ws = append(ws, fmt.Sprintf(highlightStr, w, count)) } qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}` } if len(fields) > 0 { qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}" } if len(order) > 0 { qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}` } if start > -1 { qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } //log.Println("GetAllByNgram:", qstr) return Get(index, itype, qstr) } else { return nil } } //数据标记2019-07-10 func GetAllByNgram_MP(index, itype, qstr, findfields, order, fields string, start, limit, count int, highlight bool) *[]map[string]interface{} { if qstr != "" { if highlight { ws := []string{} for _, w := range strings.Split(findfields, ",") { ws = append(ws, fmt.Sprintf(highlightStr, w, count)) } qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL_MP, strings.Join(ws, ",")) + `}` } if len(fields) > 0 { qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}" } if len(order) > 0 { qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}` } if start > -1 { qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } // log.Println("GetAllByNgram:", qstr) return Get(index, itype, qstr) } else { return nil } } //ik 分词 func GetAllByIk(index, itype, qstr, findfields, order, fields string, start, limit, count int, highlight bool) *[]map[string]interface{} { if qstr != "" { if highlight { ws := []string{} for _, w := range strings.Split(findfields, ",") { ws = append(ws, fmt.Sprintf(ik_highlightStr, w, count)) } qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL_IK, strings.Join(ws, ",")) + `}` } if len(fields) > 0 { qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}" } if len(order) > 0 { qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}` } if start > -1 { qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } //log.Println("GetAllByNgram:", qstr) return Get(index, itype, qstr) } else { return nil } } //分词 func AnalyzerWord(index, word string) (result []string) { client := GetEsConn() defer DestoryEsConn(client) result = []string{} p := url.Values{} p["text"] = []string{word} p["analyzer"] = []string{"ik"} by, err := client.PerformRequest("GET", "/"+index+"/_analyze", p, nil) if err != nil { log.Println("AnalyzerWord Error:", err) return } b, err := by.Body.MarshalJSON() if err != nil { log.Println("AnalyzerWord MarshalJSON Error:", err) return } var res map[string][]map[string]interface{} err = json.Unmarshal(b, &res) if err != nil { log.Println("AnalyzerWord Unmarshal Error:", err) return } if res == nil { return } for _, v := range res["tokens"] { token, _ := v["token"].(string) if token != "" { result = append(result, token) } } return } func GetResForJYView(index, itype string, keys []KeyConfig, allquery, findfields, SortQuery, fields string, start, limit int) *[]map[string]interface{} { if len(keys) > 0 { qstr := "" new_minq := fmt.Sprintf(minq, "%s", findfields) not_new_minq := fmt.Sprintf(minq, "%s", findfields) //排除词只查询标题 musts := []string{} for _, qs_words := range keys { mq := []string{} notmq := []string{} for _, qs_word := range qs_words.Keys { mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word))) } for _, qs_word := range qs_words.NotKeys { notmq = append(notmq, fmt.Sprintf(not_new_minq, ReplaceYH(qs_word))) } if len(qs_words.Areas) > 0 { mq = append(mq, fmt.Sprintf(`{"terms":{"area":["%s"]}}`, strings.Join(qs_words.Areas, `","`))) } if len(qs_words.InfoTypes) > 0 { mq = append(mq, fmt.Sprintf(`{"terms":{"toptype":["%s"]}}`, strings.Join(qs_words.InfoTypes, `","`))) } musts = append(musts, fmt.Sprintf(NgramMustAndNot, strings.Join(mq, ","), strings.Join(notmq, ","))) } qstr = fmt.Sprintf(NgramStr, "", strings.Join(musts, ",")) qstr = fmt.Sprintf(FilterQuery, allquery, qstr[1:]) ws := []string{} for _, w := range strings.Split(findfields, ",") { ws = append(ws, fmt.Sprintf(highlightStr, w, 1)) } qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}` if len(fields) > 0 { qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}" } if len(SortQuery) > 0 { qstr = qstr[:len(qstr)-1] + `,"sort":` + SortQuery + `}` } if start > -1 { qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}" } return Get(index, itype, qstr) } else { return nil } } //返回count 和 res func GetWithCount(index, itype, query string) (int64, *[]map[string]interface{}) { client := GetEsConn() defer func() { go DestoryEsConn(client) }() var res []map[string]interface{} var count int64 = 0 if client != nil { defer func() { if r := recover(); r != nil { log.Println("[E]", r) for skip := 1; ; skip++ { _, file, line, ok := runtime.Caller(skip) if !ok { break } go log.Printf("%v,%v\n", file, line) } } }() searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do() if err != nil { log.Println("从ES查询出错", err.Error()) return count, nil } if searchResult.Hits != nil { resNum := len(searchResult.Hits.Hits) count = searchResult.Hits.TotalHits if resNum > 5000 { log.Println("查询结果太多,查询到:", resNum, "条") } else { res = make([]map[string]interface{}, resNum) for i, hit := range searchResult.Hits.Hits { parseErr := json.Unmarshal(*hit.Source, &res[i]) if parseErr == nil && hit.Highlight != nil && res[i] != nil { res[i]["highlight"] = map[string][]string(hit.Highlight) } } } } } return count, &res }