elasticutil.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999
  1. package elastic
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. es "github.com/olivere/elastic/v7"
  8. "log"
  9. "net/url"
  10. "reflect"
  11. "runtime"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  // Search-service configuration.
var (
	// addrs lists the Elasticsearch node addresses; InitElasticSize fills it and
	// every client created in this file is configured with it.
	addrs []string
	// LocCity maps region codes to region names; ConverData passes it to getLoc.
	LocCity = map[string]string{}
	// SIZE is an exported default size value (30).
	// NOTE(review): not referenced in the code visible here — confirm its users.
	SIZE = 30
)
  // QStr is the bool-query template that MakeQuery hands to AnalyQuery.
// "$and" marks where must clauses are inserted and "$or" where should clauses
// are inserted; MakeQuery strips whatever placeholders remain afterwards.
// The raw string intentionally spans two lines (JSON ignores the newline).
const (
	QStr = `{"query":{"bool":{"must":[$and],"must_not":[],
"should":[$or],"minimum_should_match" : 1}}}`
)
  25. var pool chan *es.Client
  26. var ntimeout int
  27. var syncPool sync.Pool
  28. //自定义HttpClient
  29. /**
  30. var httpclient = &http.Client{Transport: &http.Transport{
  31. Dial: func(netw, addr string) (net.Conn, error) {
  32. deadline := time.Now().Add(5000 * time.Millisecond)
  33. c, err := net.DialTimeout(netw, addr, 10000*time.Millisecond)
  34. if err != nil {
  35. return nil, err
  36. }
  37. tcp_conn := c.(*net.TCPConn)
  38. tcp_conn.SetKeepAlive(false)
  39. tcp_conn.SetDeadline(deadline)
  40. return tcp_conn, nil
  41. },
  42. DisableKeepAlives: true, //不保持,这样才能释放
  43. }}
  44. **/
  45. //var op = es.SetHttpClient(httpclient)
  46. var poolsize = int32(20)
  // InitElasticSize configures the Elasticsearch node addresses and fills the
// client pool.
//
// addr is a comma-separated list of node URLs; surrounding blanks and empty
// entries are ignored. size clients are created up front, while the pool
// channel gets a capacity of 3*size so GetEsConn can add extra clients later.
// Calling it again replaces both the address list and the pool.
func InitElasticSize(addr string, size int) {
	poolsize = int32(3 * size)
	pool = make(chan *es.Client, poolsize)
	// Start from a fresh slice: es.SetURL keeps the slice it is given, so
	// reusing the old backing array would rewrite the URLs of existing clients,
	// and appending would accumulate duplicates on every re-init.
	addrs = nil
	for _, s := range strings.Split(addr, ",") {
		if s = strings.TrimSpace(s); s != "" {
			addrs = append(addrs, s)
		}
	}
	for i := 0; i < size; i++ {
		client, err := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
		if err != nil {
			log.Println("init es client error:", err)
		}
		// A nil client is still pooled on purpose: GetEsConn replaces nil entries
		// lazily, so the pool keeps its intended number of slots.
		pool <- client
	}
}
  // DestoryEsConn hands client back to the pool. If the pool is still full after
// one second, the client is stopped and discarded instead. A nil client is put
// back as well; GetEsConn replaces nil entries when it takes them.
// (The misspelled name is kept for existing callers.)
func DestoryEsConn(client *es.Client) {
	timer := time.NewTimer(time.Second)
	// Release the timer as soon as the client has been pooled.
	defer timer.Stop()
	select {
	case pool <- client:
	case <-timer.C:
		if client != nil {
			client.Stop()
		}
	}
}
  71. var (
  72. lastTime = int64(0)
  73. lastTimeLock = &sync.Mutex{}
  74. )
  // GetEsConn takes a client from the pool, waiting up to 4 seconds.
//
// A pooled entry that is nil or no longer running is replaced by a freshly
// created client when that succeeds; otherwise the pooled entry is returned
// as-is. If the pool stays empty for 4 seconds, a new client is created and
// returned (at most once every 12 seconds) and two extra clients are added to
// the pool in the background. Inside that 12-second window the timeout path
// returns nil, so callers must nil-check the result.
func GetEsConn() *es.Client {
	select {
	case c := <-pool:
		if c == nil || !c.IsRunning() {
			log.Println("new esclient.", len(pool))
			client, err := es.NewClient(es.SetURL(addrs...),
				es.SetMaxRetries(2), es.SetSniff(false))
			if err == nil && client.IsRunning() {
				return client
			}
		}
		return c
	case <-time.After(time.Second * 4):
		lastTimeLock.Lock()
		defer lastTimeLock.Unlock()
		// Counted under the lock: callers tend to time out together, and a bare
		// ++ on the package variable would be a data race.
		ntimeout++
		// Allow creating extra clients at most once every 12 seconds.
		if time.Now().Unix()-lastTime <= 12 {
			return nil
		}
		lastTime = time.Now().Unix()
		log.Println("add client..", len(pool))
		c, err := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
		if err != nil {
			log.Println("add client error:", err)
		}
		go func() {
			for i := 0; i < 2; i++ {
				client, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
				// Never block here: a blocking send into a full pool would leak
				// this goroutine forever. A nil client (creation error) is pooled
				// like elsewhere and replaced lazily.
				select {
				case pool <- client:
				default:
					if client != nil {
						client.Stop()
					}
				}
			}
		}()
		return c
	}
}
  // Get runs query against index and returns the decoded hits (general query).
//
// Numbers in the documents are decoded as json.Number. Highlight fragments,
// when present, are stored under each document's "highlight" key. A response
// with 5000 or more hits is not decoded (only logged) and an empty result is
// returned. On a search error Get returns nil; when no client could be
// obtained it returns a pointer to an empty slice.
//
// Query DSL examples:
// {"query": {"bool":{"must":[{"query_string":{"default_field":"name","query":"*"}}]}}}
// {"query":{"bool":{"must":{"match":{"content":{"query":"fulltextsearch","operator":"and"}}},"should":[{"match":{"content":{"query":"Elasticsearch","boost":3}}},{"match":{"content":{"query":"Lucene","boost":2}}}]}}}
// prefix
// {"query":{"match":{"title":{"query":"brownfox","operator":"and"}}}} // operator defaults to "or"
// {"query":{"multi_match":{"query":"PolandStreetW1V","type":"most_fields","fields":["*_street","city^2","country","postcode"]}}}
// {"query":{"wildcard":{"postcode":"W?F*HW"}}}
// {"query":{"regexp":{"postcode":"W[0-9].+"}}}
// {"query":{"filtered":{"filter":{"range":{"price":{"gte":10000}}}}},"aggs":{"single_avg_price":{"avg":{"field":"price"}}}}
// {"query":{"match":{"make":"ford"}},"aggs":{"colors":{"terms":{"field":"color"}}}} // how many colors "ford" comes in
// Filters do not compute relevance scores, so they are cheaper to evaluate.
// (NOTE: the "filtered" query was removed in Elasticsearch 5.0; use bool/filter.)
// {"query":{"filtered":{"query":{"match_all":{}},"filter":{"range":{"balance":{"gte":20000,"lte":30000}}}}}}
// {"query":{"match_all":{}},"from":10,"size":10,"_source":["account_number","balance"],"sort":{"balance":{"order":"desc"}}}
// {"query":{"match_phrase":{"address":"milllane"}}} — unlike match, matches the whole phrase (like must[])
func Get(index string, query es.Query) *[]map[string]interface{} {
	log.Println("query -- ", query)
	client := GetEsConn()
	defer func() {
		go DestoryEsConn(client)
	}()
	var res []map[string]interface{}
	if client == nil {
		return &res
	}
	defer func() {
		if r := recover(); r != nil {
			log.Println("[E]", r)
			for skip := 1; ; skip++ {
				_, file, line, ok := runtime.Caller(skip)
				if !ok {
					break
				}
				// Logged synchronously so the frames keep their call-stack order.
				log.Printf("%v,%v\n", file, line)
			}
		}
	}()
	searchResult, err := client.Search().Index(index).Query(query).Do(context.Background())
	if err != nil {
		log.Println("从ES查询出错", err.Error())
		return nil
	}
	if searchResult.Hits == nil {
		return &res
	}
	hits := searchResult.Hits.Hits
	if len(hits) >= 5000 {
		log.Println("查询结果太多,查询到:", len(hits), "条")
		return &res
	}
	res = make([]map[string]interface{}, len(hits))
	for i, hit := range hits {
		b, _ := hit.Source.MarshalJSON()
		d := json.NewDecoder(bytes.NewReader(b))
		d.UseNumber()
		if err := d.Decode(&res[i]); err != nil {
			log.Println("decode es hit error:", err)
			continue
		}
		if hit.Highlight != nil && res[i] != nil {
			res[i]["highlight"] = map[string][]string(hit.Highlight)
		}
	}
	return &res
}
  // GetOA runs a raw JSON search (query is the whole request body) against index
// and returns the decoded hits together with the number of hits contained in
// this response (not the total match count).
//
// itype is accepted for compatibility and ignored. Numbers in the documents are
// decoded as json.Number. Highlight fragments, when present, are stored under
// each document's "highlight" key. When the response holds 5000 hits or more,
// the slice is left empty but the hit count is still returned. On a search
// error it returns (nil, 0); when no client is available it returns a pointer
// to an empty slice and 0.
func GetOA(index, itype, query string) (*[]map[string]interface{}, int) {
	client := GetEsConn()
	defer func() {
		go DestoryEsConn(client)
	}()
	var res []map[string]interface{}
	var resNum int
	if client == nil {
		return &res, resNum
	}
	defer func() {
		if r := recover(); r != nil {
			log.Println("[E]", r)
			for skip := 1; ; skip++ {
				_, file, line, ok := runtime.Caller(skip)
				if !ok {
					break
				}
				// Logged synchronously so the frames keep their call-stack order.
				log.Printf("%v,%v\n", file, line)
			}
		}
	}()
	searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
	if err != nil {
		log.Println("从ES查询出错", err.Error())
		return nil, 0
	}
	if searchResult.Hits == nil {
		return &res, resNum
	}
	resNum = len(searchResult.Hits.Hits)
	if resNum >= 5000 {
		log.Println("查询结果太多,查询到:", resNum, "条")
		return &res, resNum
	}
	res = make([]map[string]interface{}, resNum)
	for i, hit := range searchResult.Hits.Hits {
		b, _ := hit.Source.MarshalJSON()
		d := json.NewDecoder(bytes.NewReader(b))
		d.UseNumber()
		if err := d.Decode(&res[i]); err != nil {
			log.Println("decode es hit error:", err)
			continue
		}
		if hit.Highlight != nil && res[i] != nil {
			res[i]["highlight"] = map[string][]string(hit.Highlight)
		}
	}
	return &res, resNum
}
  // GetNoLimit runs a raw JSON search (query is the whole request body) against
// index and decodes every returned hit, without the 5000-hit cap used by Get
// and GetOA.
//
// itype is accepted for compatibility and ignored. Unlike Get/GetOA, numbers
// are decoded with the json.Decoder default (float64) and highlights are not
// copied. On a search error it returns nil; when no client is available it
// returns a pointer to an empty slice.
func GetNoLimit(index, itype, query string) *[]map[string]interface{} {
	client := GetEsConn()
	defer DestoryEsConn(client)
	var res []map[string]interface{}
	if client == nil {
		return &res
	}
	defer func() {
		if r := recover(); r != nil {
			log.Println("[E]", r)
			for skip := 1; ; skip++ {
				_, file, line, ok := runtime.Caller(skip)
				if !ok {
					break
				}
				// Logged synchronously so the frames keep their call-stack order.
				log.Printf("%v,%v\n", file, line)
			}
		}
	}()
	searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
	if err != nil {
		log.Println("从ES查询出错", err.Error())
		return nil
	}
	if searchResult.Hits == nil {
		return &res
	}
	res = make([]map[string]interface{}, len(searchResult.Hits.Hits))
	for i, hit := range searchResult.Hits.Hits {
		b, _ := hit.Source.MarshalJSON()
		if err := json.NewDecoder(bytes.NewReader(b)).Decode(&res[i]); err != nil {
			log.Println("decode es hit error:", err)
		}
	}
	return &res
}
  253. // openapi
  254. func GetOAPage(index, itype, query, order, field string, start, limit int) (*[]map[string]interface{}, int) {
  255. return GetOA(index, itype, MakeQuery(query, order, field, start, limit))
  256. }
  // SR is a short alias for strings.Replace that keeps the chained placeholder
// rewrites in the query builders compact.
var SR func(s, old, repl string, n int) string = strings.Replace
  // MakeQuery builds a complete Elasticsearch request body from a Mongo-style
// query string.
//
// The query is rendered into QStr by AnalyQuery, and the unused "$and"/"$or"
// placeholders are removed. Then, in this order, it appends:
//   - "_source" when fileds (a comma-separated list of quoted names) is non-empty;
//   - "sort" when order (e.g. {"name":-1,"age":1}) is non-empty, where -1 maps
//     to "desc" and 1 maps to "asc";
//   - "from"/"size" when start > -1.
//
// Each append re-opens the outermost JSON object. An empty string is returned
// when the rendered query is 10 bytes or shorter.
func MakeQuery(query, order, fileds string, start, limit int) string {
	log.Println(query)
	res := AnalyQuery(query, "", QStr)
	log.Println(len(res), query)
	if len(res) <= 10 {
		return ""
	}
	// Remove leftover placeholders; the comma-prefixed forms go first so no
	// dangling comma remains.
	for _, placeholder := range []string{",$and", "$and", ",$or", "$or"} {
		res = SR(res, placeholder, "", -1)
	}
	log.Println("1", res)
	// appendMember drops the closing brace of the outer object, adds one more
	// member, and closes the object again.
	appendMember := func(member string) {
		res = res[:len(res)-1] + "," + member + "}"
	}
	if len(fileds) > 0 {
		//"_source":["account_number","balance"]
		appendMember(`"_source":[` + fileds + "]")
	}
	log.Println("2", res)
	//{"name":-1,"age":1}
	if len(order) > 0 {
		sortBody := order
		for _, pair := range [][2]string{{",", "},{"}, {" ", ""}, {":-1", `:"desc"`}, {":1", `:"asc"`}} {
			sortBody = SR(sortBody, pair[0], pair[1], -1)
		}
		appendMember(`"sort":[` + sortBody + `]`)
	}
	log.Println("3", res)
	if start > -1 {
		appendMember(`"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit))
	}
	log.Println("4", res)
	return res
}
  283. // {"name":"aaa"}
  284. func AnalyQuery(query interface{}, parent string, result string) string {
  285. m := make(map[string]interface{})
  286. if q1, ok := query.(string); ok {
  287. json.Unmarshal([]byte(q1), &m)
  288. } else if q2, ok2 := query.(map[string]interface{}); ok2 {
  289. m = q2
  290. }
  291. if len(parent) == 0 {
  292. for k, v := range m {
  293. if k == "$and" || k == "$or" {
  294. temps := ""
  295. if map1, ok := v.([]interface{}); ok {
  296. for i := 0; i < len(map1); i++ {
  297. temps += "," + AnalyQuery(map1[i], k, "")
  298. }
  299. }
  300. if len(temps) > 0 {
  301. temps = temps[1:]
  302. }
  303. result = SR(result, k, temps+","+k, 1)
  304. } else {
  305. switch reflect.TypeOf(v).String() {
  306. case "string":
  307. if strings.Index(k, "TERM_") == 0 {
  308. result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1)
  309. } else {
  310. result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1)
  311. }
  312. case "int", "int8", "int32", "int64", "float32", "float64":
  313. if strings.Index(k, "TERM_") == 0 {
  314. result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":`+fmt.Sprintf("%v", v)+`}},$and`, 1)
  315. } else {
  316. result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":`+fmt.Sprintf("%v", v)+`}},$and`, 1)
  317. }
  318. default:
  319. result = SR(result, "$and", AnalyQuery(v, k, "")+",$and", 1)
  320. }
  321. }
  322. }
  323. return result
  324. } else {
  325. for k, v := range m {
  326. if k == "$in" {
  327. s := ""
  328. if map1, ok := v.([]interface{}); ok {
  329. for i := 0; i < len(map1); i++ {
  330. s += "," + `"` + fmt.Sprintf("%v", map1[i]) + `"`
  331. }
  332. }
  333. if len(s) > 0 {
  334. s = s[1:]
  335. }
  336. return `{"terms":{"` + parent + `":[` + s + `]}}`
  337. } else if strings.Contains(k, "$lt") || strings.Contains(k, "$gt") {
  338. return `{"range":{"` + parent + `":{"` + SR(k, "$", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}}`
  339. } else {
  340. switch reflect.TypeOf(v).String() {
  341. case "string":
  342. if strings.Index(k, "TERM_") == 0 {
  343. return `{"term":{"` + SR(k, "TERM_", "", 1) + `":"` + fmt.Sprintf("%v", v) + `"}}`
  344. } else {
  345. return `{"query_string":{"default_field":"` + k + `","query":"` + fmt.Sprintf("%v", v) + `"}}`
  346. }
  347. case "int", "int8", "int32", "int64", "float32", "float64":
  348. if strings.Index(k, "TERM_") == 0 {
  349. return `{"term":{"` + SR(k, "TERM_", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}`
  350. } else {
  351. return `{"query_string":{"default_field":"` + k + `","query":` + fmt.Sprintf("%v", v) + `}}`
  352. }
  353. default:
  354. return AnalyQuery(v, k, result)
  355. }
  356. }
  357. }
  358. }
  359. return result
  360. }
  361. func GetByIdField(index, itype, id, fields string) *map[string]interface{} {
  362. client := GetEsConn()
  363. defer DestoryEsConn(client)
  364. if client != nil {
  365. defer func() {
  366. if r := recover(); r != nil {
  367. log.Println("[E]", r)
  368. for skip := 1; ; skip++ {
  369. _, file, line, ok := runtime.Caller(skip)
  370. if !ok {
  371. break
  372. }
  373. go log.Printf("%v,%v\n", file, line)
  374. }
  375. }
  376. }()
  377. query := `{"query":{"term":{"_id":"` + id + `"}}`
  378. if len(fields) > 0 {
  379. query = query + `,"_source":[` + fields + `]`
  380. }
  381. query = query + "}"
  382. searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
  383. if err != nil {
  384. log.Println("从ES查询出错", err.Error())
  385. return nil
  386. }
  387. var res map[string]interface{}
  388. if searchResult.Hits != nil {
  389. resNum := len(searchResult.Hits.Hits)
  390. if resNum == 1 {
  391. res = make(map[string]interface{})
  392. for _, hit := range searchResult.Hits.Hits {
  393. b, _ := hit.Source.MarshalJSON()
  394. d := json.NewDecoder(bytes.NewBuffer(b))
  395. d.UseNumber()
  396. d.Decode(&res)
  397. //json.Unmarshal(*hit.Source, &res)
  398. }
  399. return &res
  400. }
  401. }
  402. }
  403. return nil
  404. }
  405. //根据id来查询文档
  406. //func GetById(index, itype string, ids ...string) *[]map[string]interface{} {
  407. // client := GetEsConn()
  408. // defer DestoryEsConn(client)
  409. // var res []map[string]interface{}
  410. // if client != nil {
  411. // defer func() {
  412. // if r := recover(); r != nil {
  413. // log.Println("[E]", r)
  414. // for skip := 1; ; skip++ {
  415. // _, file, line, ok := runtime.Caller(skip)
  416. // if !ok {
  417. // break
  418. // }
  419. // go log.Printf("%v,%v\n", file, line)
  420. // }
  421. // }
  422. // }()
  423. // query := es.NewIdsQuery().Ids(ids...)
  424. // searchResult, err := client.Search().Index(index).Query(&query).Do(context.Background())
  425. // if err != nil {
  426. // log.Println("从ES查询出错", err.Error())
  427. // return nil
  428. // }
  429. //
  430. // if searchResult.Hits != nil {
  431. // resNum := len(searchResult.Hits.Hits)
  432. // if resNum < 5000 {
  433. // res = make([]map[string]interface{}, resNum)
  434. // for i, hit := range searchResult.Hits.Hits {
  435. // b, _ := hit.Source.MarshalJSON()
  436. // d := json.NewDecoder(bytes.NewBuffer(b))
  437. // d.Decode(&res[i])
  438. // //json.Unmarshal(*hit.Source, &res[i])
  439. // }
  440. // } else {
  441. // log.Println("查询结果太多,查询到:", resNum, "条")
  442. // }
  443. //
  444. // }
  445. // }
  446. // return &res
  447. //}
  448. // 删除某个索引,根据查询
  449. func Del(index, itype string, query interface{}) bool {
  450. client := GetEsConn()
  451. defer DestoryEsConn(client)
  452. b := false
  453. if client != nil {
  454. defer func() {
  455. if r := recover(); r != nil {
  456. log.Println("[E]", r)
  457. for skip := 1; ; skip++ {
  458. _, file, line, ok := runtime.Caller(skip)
  459. if !ok {
  460. break
  461. }
  462. go log.Printf("%v,%v\n", file, line)
  463. }
  464. }
  465. }()
  466. var err error
  467. if qi, ok2 := query.(es.Query); ok2 {
  468. _, err = client.DeleteByQuery().Index(index).Query(qi).Do(context.Background())
  469. }
  470. if err != nil {
  471. log.Println("删除索引出错:", err.Error())
  472. } else {
  473. b = true
  474. }
  475. }
  476. return b
  477. }
  478. //根据语句更新对象
  479. //func Update(index, itype, id string, updateStr string) bool {
  480. // client := GetEsConn()
  481. // defer DestoryEsConn(client)
  482. // b := false
  483. // if client != nil {
  484. // defer func() {
  485. // if r := recover(); r != nil {
  486. // log.Println("[E]", r)
  487. // for skip := 1; ; skip++ {
  488. // _, file, line, ok := runtime.Caller(skip)
  489. // if !ok {
  490. // break
  491. // }
  492. // go log.Printf("%v,%v\n", file, line)
  493. // }
  494. // }
  495. // }()
  496. // var err error
  497. // _, err = client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
  498. // if err != nil {
  499. // log.Println("更新检索出错:", err.Error())
  500. // } else {
  501. // b = true
  502. // }
  503. // }
  504. // return b
  505. //}
  506. //
  507. //func BulkUpdate(index, itype string, ids []string, updateStr string) {
  508. // client := GetEsConn()
  509. // defer DestoryEsConn(client)
  510. // if client != nil {
  511. // defer func() {
  512. // if r := recover(); r != nil {
  513. // log.Println("[E]", r)
  514. // for skip := 1; ; skip++ {
  515. // _, file, line, ok := runtime.Caller(skip)
  516. // if !ok {
  517. // break
  518. // }
  519. // go log.Printf("%v,%v\n", file, line)
  520. // }
  521. // }
  522. // }()
  523. // for _, id := range ids {
  524. // _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
  525. // if err != nil {
  526. // log.Println("更新检索出错:", err.Error())
  527. // }
  528. // }
  529. // }
  530. //}
  531. //
  532. //func BulkUpdateArr(index, itype string, update []map[string]string) {
  533. // client := GetEsConn()
  534. // defer DestoryEsConn(client)
  535. // if client != nil {
  536. // defer func() {
  537. // if r := recover(); r != nil {
  538. // log.Println("[E]", r)
  539. // for skip := 1; ; skip++ {
  540. // _, file, line, ok := runtime.Caller(skip)
  541. // if !ok {
  542. // break
  543. // }
  544. // go log.Printf("%v,%v\n", file, line)
  545. // }
  546. // }
  547. // }()
  548. // for _, data := range update {
  549. // id := data["id"]
  550. // updateStr := data["updateStr"]
  551. // if id != "" && updateStr != "" {
  552. // _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
  553. // if err != nil {
  554. // log.Println("更新检索出错:", err.Error())
  555. // }
  556. // } else {
  557. // log.Println("数据错误")
  558. // }
  559. // }
  560. //
  561. // }
  562. //}
  563. // 根据id删除索引对象
  564. func DelById(index, itype, id string) bool {
  565. client := GetEsConn()
  566. defer DestoryEsConn(client)
  567. b := false
  568. if client != nil {
  569. defer func() {
  570. if r := recover(); r != nil {
  571. log.Println("[E]", r)
  572. for skip := 1; ; skip++ {
  573. _, file, line, ok := runtime.Caller(skip)
  574. if !ok {
  575. break
  576. }
  577. go log.Printf("%v,%v\n", file, line)
  578. }
  579. }
  580. }()
  581. var err error
  582. _, err = client.Delete().Index(index).Id(id).Do(context.Background())
  583. if err != nil {
  584. log.Println("更新检索出错:", err.Error())
  585. } else {
  586. b = true
  587. }
  588. }
  589. return b
  590. }
  591. // 把地市代码转为地市
  592. func getLoc(code string, res *map[string]string) (loc string) {
  593. switch len(code) {
  594. case 6:
  595. loc = (*res)[code[:2]] + " " + (*res)[code[:4]] + " " + (*res)[code]
  596. break
  597. case 4:
  598. loc = (*res)[code[:2]] + " " + (*res)[code]
  599. break
  600. case 2:
  601. loc = (*res)[code]
  602. break
  603. }
  604. return
  605. }
  606. // 把地市代码转为地市
  607. func Loop(m interface{}, res *map[string]string) {
  608. m1, ok := m.([]interface{})
  609. if !ok {
  610. m2, _ := m.([]map[string]interface{})
  611. for i := 0; i < len(m2); i++ {
  612. ms := m2[i]
  613. (*res)[fmt.Sprintf("%1.0f", ms["k"])] = fmt.Sprintf("%s", ms["n"])
  614. s := ms["s"]
  615. if nil != s {
  616. mss, _ := s.([]interface{})
  617. if nil != mss {
  618. Loop(mss, res)
  619. }
  620. }
  621. }
  622. } else {
  623. for i := 0; i < len(m1); i++ {
  624. ms, _ := m1[i].(map[string]interface{})
  625. (*res)[fmt.Sprintf("%1.0f", ms["k"])] = fmt.Sprintf("%s", ms["n"])
  626. s := ms["s"]
  627. if nil != s {
  628. mss, _ := s.([]interface{})
  629. if nil != mss {
  630. Loop(mss, res)
  631. }
  632. }
  633. }
  634. }
  635. }
  // ConverData builds the enterprise record exposed to callers from a raw
// document.
//
// Behavior:
//   - Most fields are copied as-is; a missing field becomes a nil entry.
//   - A few optional fields are copied only when set.
//   - Some fields get defaults when missing.
//   - Several fields are derived:
//       - ID: the int64 "ID" rendered as a decimal string (0 if not an int64);
//       - OpSint: OpState == "06";
//       - OpLocDistrictName: the region path of OpLocDistrict;
//       - Ycml: true for bool true or string "1";
//       - stock: the investor names.
func ConverData(ent *map[string]interface{}) map[string]interface{} {
	src := *ent
	out := make(map[string]interface{})

	// Copied verbatim (absent fields become nil entries).
	for _, key := range []string{
		"_id", "Area", "LeRep", "RegNo", "EntType", "EntName", "EntTypeName",
		"Dom", "EstDate", "OpStateName", "OpScope", "OpState",
		"s_submitid", "l_submittime", "s_submitname", "RegCapCurName",
		"OpLocDistrict", "RecCap", "RegCap", "IndustryPhy", "IndustryPhyName",
		"RegOrg", "RegOrgName", "Tel", "CompForm", "CompFormName",
		"LegCerNO",
		"SourceType", // data source
		"s_persion", "s_mobile", "s_enturl", "s_weixin", "s_avatar",
	} {
		out[key] = src[key]
	}

	// Annual-report contact info and micro-site: only present when set.
	for _, key := range []string{"Nb_email", "Nb_tel", "Nb_addr", "s_microwebsite"} {
		if v := src[key]; v != nil {
			out[key] = v
		}
	}

	// Fields that fall back to a default when missing.
	for key, fallback := range map[string]interface{}{
		"s_synopsis":     "", // company profile
		"s_servicenames": "", // service names
		"s_action":       "N",
	} {
		if v := src[key]; v != nil {
			out[key] = v
		} else {
			out[key] = fallback
		}
	}

	id64, _ := src["ID"].(int64)
	out["ID"] = strconv.FormatInt(id64, 10)

	// Used for sorting by operating state.
	out["OpSint"] = src["OpState"] == "06"

	// Region code -> region name path.
	district, _ := src["OpLocDistrict"].(string)
	out["OpLocDistrictName"] = getLoc(district, &LocCity)

	// Abnormal-operation list flag; Ycml may be stored as bool or as string.
	flag, _ := src["Ycml"].(bool)
	text, _ := src["Ycml"].(string)
	out["Ycml"] = flag || text == "1"

	// Shareholders.
	out["stock"] = getStock(src["investor"])
	return out
}
  725. func getStock(obj interface{}) string {
  726. stock := ""
  727. if ns, ok := obj.([]interface{}); ok {
  728. stock = " "
  729. for _, ns1 := range ns {
  730. if nn, ok1 := ns1.(map[string]interface{}); ok1 {
  731. tmp := fmt.Sprintf("%s", nn["Inv"])
  732. if strings.Index(stock, tmp) < 0 {
  733. stock += tmp + " "
  734. }
  735. }
  736. }
  737. }
  738. return stock
  739. }
  // BulkSave indexes every document of *obj into index/itype with one bulk request.
//
// When isDelBefore is true, each index action is preceded by a delete of the
// document's "_id". Empty or nil input is a no-op, because the bulk client
// refuses to commit an empty request. Both request-level errors and per-item
// failures reported in the bulk response are logged.
//
// NOTE(review): the index action sets no Id, so documents get generated ids
// and any "_id" key stays in the source — confirm this is intended.
func BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
	if obj == nil || len(*obj) == 0 {
		return
	}
	client := GetEsConn()
	defer DestoryEsConn(client)
	if client == nil {
		return
	}
	defer func() {
		if r := recover(); r != nil {
			log.Println("[E]", r)
			for skip := 1; ; skip++ {
				_, file, line, ok := runtime.Caller(skip)
				if !ok {
					break
				}
				// Logged synchronously so the frames keep their call-stack order.
				log.Printf("%v,%v\n", file, line)
			}
		}
	}()
	req := client.Bulk()
	for _, v := range *obj {
		if isDelBefore {
			req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(fmt.Sprintf("%v", v["_id"])))
		}
		req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(v))
	}
	resp, err := req.Do(context.Background())
	if err != nil {
		log.Println("批量保存到ES出错", err.Error())
		return
	}
	// A successful request can still contain rejected items.
	if failed := resp.Failed(); len(failed) > 0 {
		log.Println("批量保存到ES出错", len(failed), "items failed, first:", failed[0].Error)
	}
}
  // Count returns the number of documents in index matching query.
//
// A nil query counts all documents. Any other value must be an es.Query;
// unsupported types are logged and yield 0 instead of silently counting
// everything. itype is ignored. It returns 0 when no client is available or
// the request fails.
func Count(index, itype string, query interface{}) int64 {
	client := GetEsConn()
	defer DestoryEsConn(client)
	if client == nil {
		return 0
	}
	defer func() {
		if r := recover(); r != nil {
			log.Println("[E]", r)
			for skip := 1; ; skip++ {
				_, file, line, ok := runtime.Caller(skip)
				if !ok {
					break
				}
				// Logged synchronously so the frames keep their call-stack order.
				log.Printf("%v,%v\n", file, line)
			}
		}
	}()
	var qq es.Query
	if query != nil {
		qi, ok := query.(es.Query)
		if !ok {
			log.Printf("统计出错 unsupported query type %T\n", query)
			return 0
		}
		qq = qi
	}
	n, err := client.Count(index).Query(qq).Do(context.Background())
	if err != nil {
		log.Println("统计出错", err.Error())
		return 0
	}
	return n
}
  797. //ngram精确查询
  798. /*
  799. {
  800. "query": {
  801. "bool": {
  802. "should": [
  803. {
  804. "bool":{
  805. "must":[
  806. { "multi_match": {
  807. "query": "智能",
  808. "type": "phrase",
  809. "fields": [
  810. "title"
  811. ],
  812. "analyzer": "my_ngram"
  813. }
  814. },{
  815. "multi_match": {
  816. "query": "机器",
  817. "type": "phrase",
  818. "fields": [
  819. "title"
  820. ],
  821. "analyzer": "my_ngram"
  822. }
  823. },{
  824. "multi_match": {
  825. "query": "2016",
  826. "type": "phrase",
  827. "fields": [
  828. "title"
  829. ],
  830. "analyzer": "my_ngram"
  831. }
  832. }
  833. ]
  834. }
  835. },
  836. {
  837. "bool":{
  838. "must":[
  839. { "multi_match": {
  840. "query": "河南",
  841. "type": "phrase",
  842. "fields": [
  843. "title"
  844. ],
  845. "analyzer": "my_ngram"
  846. }
  847. },{
  848. "multi_match": {
  849. "query": "工商",
  850. "type": "phrase",
  851. "fields": [
  852. "title"
  853. ],
  854. "analyzer": "my_ngram"
  855. }
  856. },{
  857. "multi_match": {
  858. "query": "2016",
  859. "type": "phrase",
  860. "fields": [
  861. "title"
  862. ],
  863. "analyzer": "my_ngram"
  864. }
  865. }
  866. ]
  867. }
  868. }
  869. ],"minimum_should_match": 1
  870. }
  871. },
  872. "_source": [
  873. "_id",
  874. "title"
  875. ],
  876. "from": 0,
  877. "size": 10,
  878. "sort": [{
  879. "publishtime": "desc"
  880. }]
  881. }
  882. */
  883. //"2016+智能+办公,"河南+工商"
  884. //["2016+智能+办公","河南+工商"]
  885. //QStr = `{"query":{"bool":{should":[$or],"minimum_should_match" : 1}}}`
  886. //{"bool":{"must":[]}}
  887. //{"multi_match": {"query": "$word","type": "phrase", "fields": [$field],"analyzer": "my_ngram"}}
  888. //"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {"detail": {"fragment_size": 1,"number_of_fragments": 1},"title": {"fragment_size": 1,"number_of_fragments": 1}}}
  // JSON fragments used to build search requests and highlight settings with
// fmt.Sprintf; every %s / %d is a placeholder filled by the caller.
// Inserted values are not escaped here.
const (
	// FilterQuery is deliberately one closing brace short. Its second %s is
	// meant to be filled with NgramStr[1:] (NgramStr without its leading "{"),
	// which carries exactly one extra "}".
	// NOTE(review): the "filtered" query was removed in Elasticsearch 5.0.
	FilterQuery = `{"query": {"filtered": {"filter": {"bool": {"must": [%s]}},%s}}`
	// NgramStr: bool query with must and should clause lists (at least one should must match).
	NgramStr = `{"query":{"bool":{"must":[%s],"should":[%s],"minimum_should_match": 1}}}`
	// NgramMust: nested bool with a must clause list.
	NgramMust = `{"bool":{"must":[%s]}}`
	// NgramMustAndNot: nested bool with must and must_not clause lists.
	NgramMustAndNot = `{"bool":{"must":[%s],"must_not":[%s]}}`
	// minq: phrase multi_match of one word over the listed fields.
	minq = `{"multi_match": {"query": "%s","type": "phrase", "fields": [%s]}}`
	// HL: highlight section with empty pre/post tags.
	HL = `"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {%s}}`
	// highlightStr: per-field highlight setting (field name, fragment size).
	highlightStr = `%s: {"fragment_size": %d,"number_of_fragments": 1}`
	// FilterQuery_New: bool query whose must list is built from three fragments.
	FilterQuery_New = `{"query":{"bool":{"must": [%s%s%s],"should":[]}}}`
	// MatchQueryString: match query requiring all terms (operator "and").
	MatchQueryString = `{"match": {%s: { "query":"%s", "operator": "and"}}}`
	// HL_New: highlight section using <HL> as both opening and closing tag.
	HL_New = `"highlight": {"pre_tags": ["<HL>"],"post_tags": ["<HL>"],"fields": {%s}}`
	// HL_MP: highlight tags for data queries (added 2019-07-10).
	HL_MP = `"highlight": {"pre_tags": ["<HL>"],"post_tags": ["</HL>"],"fields": {%s}}`
	// ik_highlightStr: per-field highlight setting that requires a field match.
	ik_highlightStr = `%s: {"fragment_size": %d,"number_of_fragments": 1,"require_field_match": true}`
	// IK_pre_tags / IK_post_tags wrap highlighted text in a styled <font> tag;
	// the backslashes are literal so the quotes stay escaped inside HL_IK's JSON.
	IK_pre_tags = `<font class=\"es-highlight\">`
	IK_post_tags = `</font>`
	// HL_IK: highlight section using the IK_* tags.
	HL_IK = `"highlight": {"pre_tags": ["` + IK_pre_tags + `"],"post_tags": ["` + IK_post_tags + `"],"fields": {%s}}`
)
  // KeyConfig groups keyword-filter settings as decoded from JSON (see tags).
// NOTE(review): not used in the code visible here; the field meanings below
// are inferred from names and tags — confirm with the producers of this JSON.
type KeyConfig struct {
	Keys      []string `json:"key"`      // presumably keywords to match
	NotKeys   []string `json:"notkey"`   // presumably keywords to exclude
	InfoTypes []string `json:"infotype"` // presumably info-type filters
	Areas     []string `json:"area"`     // presumably region filters
}
  // AnalyzerWord tokenizes word with the "ik" analyzer via the _analyze API of
// index and returns the non-empty tokens.
//
// It always returns a non-nil slice. The slice is empty when no client is
// available, or when the request or the response decoding fails; these errors
// are logged.
//
// NOTE(review): url.Values is sent as a JSON body, so "text" and "analyzer"
// arrive as arrays — confirm the cluster accepts an array for "analyzer".
func AnalyzerWord(index, word string) (result []string) {
	client := GetEsConn()
	defer DestoryEsConn(client)
	result = []string{}
	if client == nil {
		// GetEsConn returns nil when the pool is exhausted; using it would panic.
		log.Println("AnalyzerWord Error: no es client available")
		return
	}
	p := url.Values{}
	p["text"] = []string{word}
	p["analyzer"] = []string{"ik"}
	opt := es.PerformRequestOptions{
		Method: "GET",
		Path:   "/" + index + "/_analyze",
		Body:   p,
	}
	by, err := client.PerformRequest(context.Background(), opt)
	if err != nil {
		log.Println("AnalyzerWord Error:", err)
		return
	}
	b, err := by.Body.MarshalJSON()
	if err != nil {
		log.Println("AnalyzerWord MarshalJSON Error:", err)
		return
	}
	var res map[string][]map[string]interface{}
	if err := json.Unmarshal(b, &res); err != nil {
		log.Println("AnalyzerWord Unmarshal Error:", err)
		return
	}
	for _, v := range res["tokens"] {
		if token, _ := v["token"].(string); token != "" {
			result = append(result, token)
		}
	}
	return
}