elasticutil.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113
  1. package elastic
  2. import (
  3. "os"
  4. "runtime"
  5. "sync"
  6. "time"
  7. // "bytes"
  8. "encoding/json"
  9. "fmt"
  10. "log"
  11. "qfw/util"
  12. "qfw/util/mongodb"
  13. mongodbutil "qfw/util/mongodbutil"
  14. "reflect"
  15. es "gopkg.in/olivere/elastic.v1"
  16. "strconv"
  17. "strings"
  18. )
  19. //检索库服务地址
  20. var addrs []string
  21. var LocCity = map[string]string{}
  22. var SIZE = 30
  23. const (
  24. QStr = `{"query":{"bool":{"must":[$and],"must_not":[],
  25. "should":[$or],"minimum_should_match" : 1}}}`
  26. )
  27. var pool chan *es.Client
  28. var ntimeout int
  29. var syncPool sync.Pool
  30. //初始化全文检索
  31. func InitElastic(addr string) {
  32. InitElasticSize(addr, SIZE)
  33. map2 := []map[string]interface{}{}
  34. util.ReadConfig("./city.json", &map2)
  35. if len(map2) == 34 {
  36. log.Println("正确")
  37. } else {
  38. log.Println("解析出错!!")
  39. os.Exit(0)
  40. }
  41. Loop(map2, &LocCity)
  42. }
  43. //自定义HttpClient
  44. /**
  45. var httpclient = &http.Client{Transport: &http.Transport{
  46. Dial: func(netw, addr string) (net.Conn, error) {
  47. deadline := time.Now().Add(5000 * time.Millisecond)
  48. c, err := net.DialTimeout(netw, addr, 10000*time.Millisecond)
  49. if err != nil {
  50. return nil, err
  51. }
  52. tcp_conn := c.(*net.TCPConn)
  53. tcp_conn.SetKeepAlive(false)
  54. tcp_conn.SetDeadline(deadline)
  55. return tcp_conn, nil
  56. },
  57. DisableKeepAlives: true, //不保持,这样才能释放
  58. }}
  59. **/
  60. //var op = es.SetHttpClient(httpclient)
  61. func InitElasticSize(addr string, size int) {
  62. pool = make(chan *es.Client, size)
  63. for _, s := range strings.Split(addr, ",") {
  64. addrs = append(addrs, s)
  65. }
  66. for i := 0; i < size; i++ {
  67. client, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  68. pool <- client
  69. }
  70. }
  71. //关闭连接
  72. func DestoryEsConn(client *es.Client) {
  73. pool <- client
  74. }
  75. //获取连接
  76. func GetEsConn() (c *es.Client) {
  77. select {
  78. case c = <-pool:
  79. if c == nil || !c.IsRunning() {
  80. client, err := es.NewClient(es.SetURL(addrs...),
  81. es.SetMaxRetries(2), es.SetSniff(false))
  82. if err == nil && client.IsRunning() {
  83. return client
  84. }
  85. return nil
  86. }
  87. return
  88. case <-time.After(time.Second * 7):
  89. //超时
  90. ntimeout++
  91. log.Println("timeout times:", ntimeout)
  92. return nil
  93. }
  94. }
  95. //保存对象
  96. func Save(index, itype string, obj interface{}) bool {
  97. client := GetEsConn()
  98. defer DestoryEsConn(client)
  99. defer func() {
  100. if r := recover(); r != nil {
  101. log.Println("[E]", r)
  102. for skip := 1; ; skip++ {
  103. _, file, line, ok := runtime.Caller(skip)
  104. if !ok {
  105. break
  106. }
  107. go log.Printf("%v,%v\n", file, line)
  108. }
  109. }
  110. }()
  111. _, err := client.Index().Index(index).Type(itype).BodyJson(util.ObjToMap(obj)).Do()
  112. if err != nil {
  113. log.Println("保存到ES出错", err.Error(), obj)
  114. return false
  115. } else {
  116. return true
  117. }
  118. }
  119. //通用查询
  120. //{"query": {"bool":{"must":[{"query_string":{"default_field":"name","query":"*"}}]}}}
  121. //{"query":{"bool":{"must":{"match":{"content":{"query":"fulltextsearch","operator":"and"}}},"should":[{"match":{"content":{"query":"Elasticsearch","boost":3}}},{"match":{"content":{"query":"Lucene","boost":2}}}]}}}
  122. //prefix
  123. //{"query":{"match":{"title":{"query":"brownfox","operator":"and"}}}} //默认为or
  124. //{"query":{"multi_match":{"query":"PolandStreetW1V","type":"most_fields","fields":["*_street","city^2","country","postcode"]}}}
  125. //{"query":{"wildcard":{"postcode":"W?F*HW"}}}
  126. //{"query":{"regexp":{"postcode":"W[0-9].+"}}}
  127. //{"query":{"filtered":{"filter":{"range":{"price":{"gte":10000}}}}},"aggs":{"single_avg_price":{"avg":{"field":"price"}}}}
  128. //{"query":{"match":{"make":"ford"}},"aggs":{"colors":{"terms":{"field":"color"}}}}//查fork有几种颜色
  129. //过滤器不会计算相关度的得分,所以它们在计算上更快一些
  130. //{"query":{"filtered":{"query":{"match_all":{}},"filter":{"range":{"balance":{"gte":20000,"lte":30000}}}}}}
  131. //{"query":{"match_all":{}},"from":10,"size":10,"_source":["account_number","balance"],"sort":{"balance":{"order":"desc"}}}
  132. //{"query":{"match_phrase":{"address":"milllane"}}}和match不同会去匹配整个短语,相当于must[]
  133. func Get(index, itype, query string) *[]map[string]interface{} {
  134. //log.Println("query -- ", query)
  135. client := GetEsConn()
  136. defer DestoryEsConn(client)
  137. var res []map[string]interface{}
  138. if client != nil {
  139. defer func() {
  140. if r := recover(); r != nil {
  141. log.Println("[E]", r)
  142. for skip := 1; ; skip++ {
  143. _, file, line, ok := runtime.Caller(skip)
  144. if !ok {
  145. break
  146. }
  147. go log.Printf("%v,%v\n", file, line)
  148. }
  149. }
  150. }()
  151. searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
  152. if err != nil {
  153. log.Println("从ES查询出错", err.Error())
  154. return nil
  155. }
  156. if searchResult.Hits != nil {
  157. resNum := len(searchResult.Hits.Hits)
  158. if resNum < 5000 {
  159. res = make([]map[string]interface{}, resNum)
  160. for i, hit := range searchResult.Hits.Hits {
  161. //d := json.NewDecoder(bytes.NewBuffer(*hit.Source))
  162. //d.UseNumber()
  163. //d.Decode(&res[i])
  164. parseErr := json.Unmarshal(*hit.Source, &res[i])
  165. if parseErr == nil && hit.Highlight != nil && res[i] != nil {
  166. res[i]["highlight"] = map[string][]string(hit.Highlight)
  167. }
  168. }
  169. } else {
  170. log.Println("查询结果太多,查询到:", resNum, "条")
  171. }
  172. }
  173. }
  174. return &res
  175. }
  176. func GetNoLimit(index, itype, query string) *[]map[string]interface{} {
  177. //log.Println("query -- ", query)
  178. client := GetEsConn()
  179. defer DestoryEsConn(client)
  180. var res []map[string]interface{}
  181. if client != nil {
  182. defer func() {
  183. if r := recover(); r != nil {
  184. log.Println("[E]", r)
  185. for skip := 1; ; skip++ {
  186. _, file, line, ok := runtime.Caller(skip)
  187. if !ok {
  188. break
  189. }
  190. go log.Printf("%v,%v\n", file, line)
  191. }
  192. }
  193. }()
  194. searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
  195. if err != nil {
  196. log.Println("从ES查询出错", err.Error())
  197. return nil
  198. }
  199. if searchResult.Hits != nil {
  200. resNum := len(searchResult.Hits.Hits)
  201. res = make([]map[string]interface{}, resNum)
  202. for i, hit := range searchResult.Hits.Hits {
  203. json.Unmarshal(*hit.Source, &res[i])
  204. }
  205. }
  206. }
  207. return &res
  208. }
  209. //分页查询
  210. //{"name":"张三","$and":[{"age":{"$gt":10}},{"age":{"$lte":20}}]}
  211. //fields直接是 `"_id","title"`
  212. func GetPage(index, itype, query, order, field string, start, limit int) *[]map[string]interface{} {
  213. return Get(index, itype, MakeQuery(query, order, field, start, limit))
  214. }
  215. var SR = strings.Replace
  216. func MakeQuery(query, order, fileds string, start, limit int) string {
  217. res := AnalyQuery(query, "", QStr)
  218. if len(res) > 10 {
  219. res = SR(SR(SR(SR(res, ",$and", "", -1), "$and", "", -1), ",$or", "", -1), "$or", "", -1)
  220. if len(fileds) > 0 {
  221. //"_source":["account_number","balance"]
  222. res = res[:len(res)-1] + `,"_source":[` + fileds + "]}"
  223. }
  224. //{"name":-1,"age":1}
  225. if len(order) > 0 {
  226. res = res[:len(res)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
  227. }
  228. if start > -1 {
  229. res = res[:len(res)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  230. }
  231. return res
  232. }
  233. return ""
  234. }
  235. //{"name":"aaa"}
  236. func AnalyQuery(query interface{}, parent string, result string) string {
  237. m := make(map[string]interface{})
  238. if q1, ok := query.(string); ok {
  239. json.Unmarshal([]byte(q1), &m)
  240. } else if q2, ok2 := query.(map[string]interface{}); ok2 {
  241. m = q2
  242. }
  243. if len(parent) == 0 {
  244. for k, v := range m {
  245. if k == "$and" || k == "$or" {
  246. temps := ""
  247. if map1, ok := v.([]interface{}); ok {
  248. for i := 0; i < len(map1); i++ {
  249. temps += "," + AnalyQuery(map1[i], k, "")
  250. }
  251. }
  252. if len(temps) > 0 {
  253. temps = temps[1:]
  254. }
  255. result = SR(result, k, temps+","+k, 1)
  256. } else {
  257. switch reflect.TypeOf(v).String() {
  258. case "string":
  259. if strings.Index(k, "TERM_") == 0 {
  260. result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1)
  261. } else {
  262. result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1)
  263. }
  264. case "int", "int8", "int32", "int64", "float32", "float64":
  265. if strings.Index(k, "TERM_") == 0 {
  266. result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":`+fmt.Sprintf("%v", v)+`}},$and`, 1)
  267. } else {
  268. result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":`+fmt.Sprintf("%v", v)+`}},$and`, 1)
  269. }
  270. default:
  271. result = SR(result, "$and", AnalyQuery(v, k, "")+",$and", 1)
  272. }
  273. }
  274. }
  275. return result
  276. } else {
  277. for k, v := range m {
  278. if k == "$in" {
  279. s := ""
  280. if map1, ok := v.([]interface{}); ok {
  281. for i := 0; i < len(map1); i++ {
  282. s += "," + `"` + fmt.Sprintf("%v", map1[i]) + `"`
  283. }
  284. }
  285. if len(s) > 0 {
  286. s = s[1:]
  287. }
  288. return `{"terms":{"` + parent + `":[` + s + `]}}`
  289. } else if strings.Contains(k, "$lt") || strings.Contains(k, "$gt") {
  290. return `{"range":{"` + parent + `":{"` + SR(k, "$", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}}`
  291. } else {
  292. switch reflect.TypeOf(v).String() {
  293. case "string":
  294. if strings.Index(k, "TERM_") == 0 {
  295. return `{"term":{"` + SR(k, "TERM_", "", 1) + `":"` + fmt.Sprintf("%v", v) + `"}}`
  296. } else {
  297. return `{"query_string":{"default_field":"` + k + `","query":"` + fmt.Sprintf("%v", v) + `"}}`
  298. }
  299. case "int", "int8", "int32", "int64", "float32", "float64":
  300. if strings.Index(k, "TERM_") == 0 {
  301. return `{"term":{"` + SR(k, "TERM_", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}`
  302. } else {
  303. return `{"query_string":{"default_field":"` + k + `","query":` + fmt.Sprintf("%v", v) + `}}`
  304. }
  305. default:
  306. return AnalyQuery(v, k, result)
  307. }
  308. }
  309. }
  310. }
  311. return result
  312. }
  313. func GetByIdField(index, itype, id, fields string) *map[string]interface{} {
  314. client := GetEsConn()
  315. defer DestoryEsConn(client)
  316. if client != nil {
  317. defer func() {
  318. if r := recover(); r != nil {
  319. log.Println("[E]", r)
  320. for skip := 1; ; skip++ {
  321. _, file, line, ok := runtime.Caller(skip)
  322. if !ok {
  323. break
  324. }
  325. go log.Printf("%v,%v\n", file, line)
  326. }
  327. }
  328. }()
  329. query := `{"query":{"term":{"_id":"` + id + `"}}`
  330. if len(fields) > 0 {
  331. query = query + `,"_source":[` + fields + `]`
  332. }
  333. query = query + "}"
  334. searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
  335. if err != nil {
  336. log.Println("从ES查询出错", err.Error())
  337. return nil
  338. }
  339. var res map[string]interface{}
  340. if searchResult.Hits != nil {
  341. resNum := len(searchResult.Hits.Hits)
  342. if resNum == 1 {
  343. res = make(map[string]interface{})
  344. for _, hit := range searchResult.Hits.Hits {
  345. json.Unmarshal(*hit.Source, &res)
  346. }
  347. return &res
  348. } else {
  349. log.Println("查询结果太多,查询到:", resNum, "条")
  350. }
  351. }
  352. }
  353. return nil
  354. }
  355. //根据id来查询文档
  356. func GetById(index, itype string, ids ...string) *[]map[string]interface{} {
  357. client := GetEsConn()
  358. defer DestoryEsConn(client)
  359. var res []map[string]interface{}
  360. if client != nil {
  361. defer func() {
  362. if r := recover(); r != nil {
  363. log.Println("[E]", r)
  364. for skip := 1; ; skip++ {
  365. _, file, line, ok := runtime.Caller(skip)
  366. if !ok {
  367. break
  368. }
  369. go log.Printf("%v,%v\n", file, line)
  370. }
  371. }
  372. }()
  373. query := es.NewIdsQuery().Ids(ids...)
  374. searchResult, err := client.Search().Index(index).Type(itype).Query(&query).Do()
  375. if err != nil {
  376. log.Println("从ES查询出错", err.Error())
  377. return nil
  378. }
  379. if searchResult.Hits != nil {
  380. resNum := len(searchResult.Hits.Hits)
  381. if resNum < 5000 {
  382. res = make([]map[string]interface{}, resNum)
  383. for i, hit := range searchResult.Hits.Hits {
  384. json.Unmarshal(*hit.Source, &res[i])
  385. }
  386. } else {
  387. log.Println("查询结果太多,查询到:", resNum, "条")
  388. }
  389. }
  390. }
  391. return &res
  392. }
  393. //删除某个索引,根据查询
  394. func Del(index, itype string, query interface{}) bool {
  395. client := GetEsConn()
  396. defer DestoryEsConn(client)
  397. b := false
  398. if client != nil {
  399. defer func() {
  400. if r := recover(); r != nil {
  401. log.Println("[E]", r)
  402. for skip := 1; ; skip++ {
  403. _, file, line, ok := runtime.Caller(skip)
  404. if !ok {
  405. break
  406. }
  407. go log.Printf("%v,%v\n", file, line)
  408. }
  409. }
  410. }()
  411. var err error
  412. if qs, ok := query.(string); ok {
  413. temp := es.BoolQuery{
  414. QueryStrings: qs,
  415. }
  416. _, err = client.DeleteByQuery().Index(index).Type(itype).Query(temp).Do()
  417. } else if qi, ok2 := query.(es.Query); ok2 {
  418. _, err = client.DeleteByQuery().Index(index).Type(itype).Query(qi).Do()
  419. }
  420. if err != nil {
  421. log.Println("删除索引出错:", err.Error())
  422. } else {
  423. b = true
  424. }
  425. }
  426. return b
  427. }
  428. //根据语句更新对象
  429. func Update(index, itype, id string, updateStr string) bool {
  430. client := GetEsConn()
  431. defer DestoryEsConn(client)
  432. b := false
  433. if client != nil {
  434. defer func() {
  435. if r := recover(); r != nil {
  436. log.Println("[E]", r)
  437. for skip := 1; ; skip++ {
  438. _, file, line, ok := runtime.Caller(skip)
  439. if !ok {
  440. break
  441. }
  442. go log.Printf("%v,%v\n", file, line)
  443. }
  444. }
  445. }()
  446. var err error
  447. _, err = client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
  448. if err != nil {
  449. log.Println("更新检索出错:", err.Error())
  450. } else {
  451. b = true
  452. }
  453. }
  454. return b
  455. }
  456. func BulkUpdate(index, itype string, ids []string, updateStr string) {
  457. client := GetEsConn()
  458. defer DestoryEsConn(client)
  459. if client != nil {
  460. defer func() {
  461. if r := recover(); r != nil {
  462. log.Println("[E]", r)
  463. for skip := 1; ; skip++ {
  464. _, file, line, ok := runtime.Caller(skip)
  465. if !ok {
  466. break
  467. }
  468. go log.Printf("%v,%v\n", file, line)
  469. }
  470. }
  471. }()
  472. for _, id := range ids {
  473. _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
  474. if err != nil {
  475. log.Println("更新检索出错:", err.Error())
  476. }
  477. }
  478. }
  479. }
  480. //根据id删除索引对象
  481. func DelById(index, itype, id string) bool {
  482. client := GetEsConn()
  483. defer DestoryEsConn(client)
  484. b := false
  485. if client != nil {
  486. defer func() {
  487. if r := recover(); r != nil {
  488. log.Println("[E]", r)
  489. for skip := 1; ; skip++ {
  490. _, file, line, ok := runtime.Caller(skip)
  491. if !ok {
  492. break
  493. }
  494. go log.Printf("%v,%v\n", file, line)
  495. }
  496. }
  497. }()
  498. var err error
  499. _, err = client.Delete().Index(index).Type(itype).Id(id).Do()
  500. if err != nil {
  501. log.Println("更新检索出错:", err.Error())
  502. } else {
  503. b = true
  504. }
  505. }
  506. return b
  507. }
  508. //先删除后增
  509. func UpdateNewDoc(index, itype string, obj ...interface{}) bool {
  510. client := GetEsConn()
  511. defer DestoryEsConn(client)
  512. b := false
  513. if client != nil {
  514. defer func() {
  515. if r := recover(); r != nil {
  516. log.Println("[E]", r)
  517. for skip := 1; ; skip++ {
  518. _, file, line, ok := runtime.Caller(skip)
  519. if !ok {
  520. break
  521. }
  522. go log.Printf("%v,%v\n", file, line)
  523. }
  524. }
  525. }()
  526. var err error
  527. for _, v := range obj {
  528. tempObj := util.ObjToMap(v)
  529. id := fmt.Sprintf("%v", (*tempObj)["_id"])
  530. client.Delete().Index(index).Type(itype).Id(id).Do()
  531. _, err = client.Index().Index(index).Type(itype).BodyJson(tempObj).Do()
  532. if err != nil {
  533. log.Println("保存到ES出错", err.Error())
  534. } else {
  535. b = true
  536. }
  537. }
  538. }
  539. return b
  540. }
  541. func UpdateEntDoc(id string) bool {
  542. b := false
  543. map2 := map[string]interface{}{}
  544. util.ReadConfig(&map2)
  545. ent := mongodbutil.FindById("enterprise", map2["entMongodbAlias"].(string), map2["entMongodbName"].(string), id, "")
  546. _ent := mongodb.FindById("enterprise", id, "")
  547. if _ent != nil && len(*_ent) > 0 {
  548. for k, v := range *_ent {
  549. (*ent)[k] = v
  550. }
  551. }
  552. if ent != nil {
  553. b = UpdateNewDoc("enterprise", "enterprise", ConverData(ent))
  554. }
  555. return b
  556. }
  557. //把地市代码转为地市
  558. func getLoc(code string, res *map[string]string) (loc string) {
  559. switch len(code) {
  560. case 6:
  561. loc = (*res)[code[:2]] + " " + (*res)[code[:4]] + " " + (*res)[code]
  562. break
  563. case 4:
  564. loc = (*res)[code[:2]] + " " + (*res)[code]
  565. break
  566. case 2:
  567. loc = (*res)[code]
  568. break
  569. }
  570. return
  571. }
  572. //把地市代码转为地市
  573. func Loop(m interface{}, res *map[string]string) {
  574. m1, ok := m.([]interface{})
  575. if !ok {
  576. m2, _ := m.([]map[string]interface{})
  577. for i := 0; i < len(m2); i++ {
  578. ms := m2[i]
  579. (*res)[fmt.Sprintf("%1.0f", ms["k"])] = fmt.Sprintf("%s", ms["n"])
  580. s := ms["s"]
  581. if nil != s {
  582. mss, _ := s.([]interface{})
  583. if nil != mss {
  584. Loop(mss, res)
  585. }
  586. }
  587. }
  588. } else {
  589. for i := 0; i < len(m1); i++ {
  590. ms, _ := m1[i].(map[string]interface{})
  591. (*res)[fmt.Sprintf("%1.0f", ms["k"])] = fmt.Sprintf("%s", ms["n"])
  592. s := ms["s"]
  593. if nil != s {
  594. mss, _ := s.([]interface{})
  595. if nil != mss {
  596. Loop(mss, res)
  597. }
  598. }
  599. }
  600. }
  601. }
  602. func ConverData(ent *map[string]interface{}) map[string]interface{} {
  603. tmp := *ent
  604. id64, _ := tmp["ID"].(int64)
  605. ids := fmt.Sprintf("%d", id64)
  606. tmp2 := make(map[string]interface{})
  607. tmp2["ID"] = ids
  608. tmp2["_id"] = tmp["_id"]
  609. tmp2["Area"] = tmp["Area"]
  610. tmp2["LeRep"] = tmp["LeRep"]
  611. tmp2["RegNo"] = tmp["RegNo"]
  612. tmp2["EntType"] = tmp["EntType"]
  613. tmp2["EntName"] = tmp["EntName"]
  614. tmp2["EntTypeName"] = tmp["EntTypeName"]
  615. tmp2["Dom"] = tmp["Dom"]
  616. tmp2["EstDate"] = tmp["EstDate"]
  617. tmp2["OpStateName"] = tmp["OpStateName"]
  618. tmp2["OpScope"] = tmp["OpScope"]
  619. tmp2["OpState"] = tmp["OpState"]
  620. tmp2["s_submitid"] = tmp["s_submitid"]
  621. tmp2["l_submittime"] = tmp["l_submittime"]
  622. tmp2["s_submitname"] = tmp["s_submitname"]
  623. tmp2["RegCapCurName"] = tmp["RegCapCurName"]
  624. //增加营业状态排序
  625. if tmp2["OpState"] == "06" {
  626. tmp2["OpSint"] = true
  627. } else {
  628. tmp2["OpSint"] = false
  629. }
  630. tmp2["OpLocDistrict"] = tmp["OpLocDistrict"]
  631. //增加代码转名称
  632. tmpLoc, _ := tmp["OpLocDistrict"].(string)
  633. tmp2["OpLocDistrictName"] = getLoc(tmpLoc, &LocCity)
  634. tmp2["RecCap"] = tmp["RecCap"]
  635. tmp2["RegCap"] = tmp["RegCap"]
  636. tmp2["IndustryPhy"] = tmp["IndustryPhy"]
  637. tmp2["IndustryPhyName"] = tmp["IndustryPhyName"]
  638. tmp2["RegOrg"] = tmp["RegOrg"]
  639. tmp2["RegOrgName"] = tmp["RegOrgName"]
  640. tmp2["Tel"] = tmp["Tel"]
  641. tmp2["CompForm"] = tmp["CompForm"]
  642. tmp2["CompFormName"] = tmp["CompFormName"]
  643. //增加异常名录标记 Ycml可能是bool也可能是string
  644. Ycmlb, _ := tmp["Ycml"].(bool)
  645. Ycmls, _ := tmp["Ycml"].(string)
  646. if Ycmlb || Ycmls == "1" {
  647. tmp2["Ycml"] = true
  648. } else {
  649. tmp2["Ycml"] = false
  650. }
  651. //增加年报联系信息
  652. if tmp["Nb_email"] != nil {
  653. tmp2["Nb_email"] = tmp["Nb_email"]
  654. }
  655. if tmp["Nb_tel"] != nil {
  656. tmp2["Nb_tel"] = tmp["Nb_tel"]
  657. }
  658. if tmp["Nb_addr"] != nil {
  659. tmp2["Nb_addr"] = tmp["Nb_addr"]
  660. }
  661. s_synopsis := tmp["s_synopsis"]
  662. if s_synopsis == nil {
  663. s_synopsis = ""
  664. }
  665. tmp2["s_synopsis"] = s_synopsis //企业简介
  666. //股东
  667. stock := getStock(tmp["investor"])
  668. tmp2["stock"] = stock
  669. tmp2["LegCerNO"] = tmp["LegCerNO"]
  670. if tmp["s_microwebsite"] != nil {
  671. tmp2["s_microwebsite"] = tmp["s_microwebsite"]
  672. }
  673. tmp2["SourceType"] = tmp["SourceType"] //数据来源
  674. s_servicenames := tmp["s_servicenames"]
  675. if s_servicenames == nil {
  676. s_servicenames = ""
  677. }
  678. tmp2["s_servicenames"] = s_servicenames //服务名称
  679. s_action := tmp["s_action"]
  680. if s_action == nil {
  681. s_action = "N"
  682. }
  683. tmp2["s_action"] = s_action
  684. tmp2["s_persion"] = tmp["s_persion"]
  685. tmp2["s_mobile"] = tmp["s_mobile"]
  686. tmp2["s_enturl"] = tmp["s_enturl"]
  687. tmp2["s_weixin"] = tmp["s_weixin"]
  688. tmp2["s_avatar"] = tmp["s_avatar"]
  689. return tmp2
  690. }
  691. func getStock(obj interface{}) string {
  692. stock := ""
  693. if ns, ok := obj.([]interface{}); ok {
  694. stock = " "
  695. for _, ns1 := range ns {
  696. if nn, ok1 := ns1.(map[string]interface{}); ok1 {
  697. tmp := fmt.Sprintf("%s", nn["Inv"])
  698. if strings.Index(stock, tmp) < 0 {
  699. stock += tmp + " "
  700. }
  701. }
  702. }
  703. }
  704. return stock
  705. }
  706. func BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
  707. client := GetEsConn()
  708. defer DestoryEsConn(client)
  709. if client != nil {
  710. defer func() {
  711. if r := recover(); r != nil {
  712. log.Println("[E]", r)
  713. for skip := 1; ; skip++ {
  714. _, file, line, ok := runtime.Caller(skip)
  715. if !ok {
  716. break
  717. }
  718. go log.Printf("%v,%v\n", file, line)
  719. }
  720. }
  721. }()
  722. req := client.Bulk()
  723. for _, v := range *obj {
  724. if isDelBefore {
  725. req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(fmt.Sprintf("%v", v["_id"])))
  726. }
  727. req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(v))
  728. }
  729. _, err := req.Do()
  730. if err != nil {
  731. log.Println("批量保存到ES出错", err.Error())
  732. }
  733. }
  734. }
  735. func Count(index, itype string, query interface{}) int64 {
  736. client := GetEsConn()
  737. defer DestoryEsConn(client)
  738. if client != nil {
  739. defer func() {
  740. if r := recover(); r != nil {
  741. log.Println("[E]", r)
  742. for skip := 1; ; skip++ {
  743. _, file, line, ok := runtime.Caller(skip)
  744. if !ok {
  745. break
  746. }
  747. go log.Printf("%v,%v\n", file, line)
  748. }
  749. }
  750. }()
  751. var qq es.Query
  752. if qs, ok := query.(string); ok {
  753. temp := es.BoolQuery{
  754. QueryStrings: qs,
  755. }
  756. qq = temp
  757. } else if qi, ok2 := query.(es.Query); ok2 {
  758. qq = qi
  759. }
  760. n, err := client.Count(index).Type(itype).Query(qq).Do()
  761. if err != nil {
  762. log.Println("统计出错", err.Error())
  763. }
  764. return n
  765. }
  766. return 0
  767. }
  768. //ngram精确查询
  769. /*
  770. {
  771. "query": {
  772. "bool": {
  773. "should": [
  774. {
  775. "bool":{
  776. "must":[
  777. { "multi_match": {
  778. "query": "智能",
  779. "type": "phrase",
  780. "fields": [
  781. "title"
  782. ],
  783. "analyzer": "my_ngram"
  784. }
  785. },{
  786. "multi_match": {
  787. "query": "机器",
  788. "type": "phrase",
  789. "fields": [
  790. "title"
  791. ],
  792. "analyzer": "my_ngram"
  793. }
  794. },{
  795. "multi_match": {
  796. "query": "2016",
  797. "type": "phrase",
  798. "fields": [
  799. "title"
  800. ],
  801. "analyzer": "my_ngram"
  802. }
  803. }
  804. ]
  805. }
  806. },
  807. {
  808. "bool":{
  809. "must":[
  810. { "multi_match": {
  811. "query": "河南",
  812. "type": "phrase",
  813. "fields": [
  814. "title"
  815. ],
  816. "analyzer": "my_ngram"
  817. }
  818. },{
  819. "multi_match": {
  820. "query": "工商",
  821. "type": "phrase",
  822. "fields": [
  823. "title"
  824. ],
  825. "analyzer": "my_ngram"
  826. }
  827. },{
  828. "multi_match": {
  829. "query": "2016",
  830. "type": "phrase",
  831. "fields": [
  832. "title"
  833. ],
  834. "analyzer": "my_ngram"
  835. }
  836. }
  837. ]
  838. }
  839. }
  840. ],"minimum_should_match": 1
  841. }
  842. },
  843. "_source": [
  844. "_id",
  845. "title"
  846. ],
  847. "from": 0,
  848. "size": 10,
  849. "sort": [{
  850. "publishtime": "desc"
  851. }]
  852. }
  853. */
  854. //"2016+智能+办公,"河南+工商"
  855. //["2016+智能+办公","河南+工商"]
  856. //QStr = `{"query":{"bool":{should":[$or],"minimum_should_match" : 1}}}`
  857. //{"bool":{"must":[]}}
  858. //{"multi_match": {"query": "$word","type": "phrase", "fields": [$field],"analyzer": "my_ngram"}}
  859. //"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {"detail": {"fragment_size": 1,"number_of_fragments": 1},"title": {"fragment_size": 1,"number_of_fragments": 1}}}
  860. const (
  861. FilterQuery = `{"query": {"filtered": {"filter": {"bool": {"must": [%s]}},%s}}`
  862. NgramStr = `{"query":{"bool":{"must":[%s],"should":[%s],"minimum_should_match" : 1}}}`
  863. NgramMust = `{"bool":{"must":[%s]}}`
  864. minq = `{"multi_match": {"query": "%s","type": "phrase", "fields": [%s],"analyzer": "my_ngram"}}`
  865. HL = `"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {%s}}`
  866. highlightStr = `%s: {"fragment_size": %d,"number_of_fragments": 1}`
  867. FilterQuery_New = `{"query":{"bool":{"must": [%s],"should":[%s%s]}}}`
  868. MatchQueryString = `{"match": {%s: { "query":"%s", "operator": "and"}}}`
  869. HL_New = `"highlight": {"pre_tags": ["<HL>"],"post_tags": ["<HL>"],"fields": {%s}}`
  870. )
  871. func GetNgramQuery(query interface{}, mustquery, findfields string) (qstr string) {
  872. var words []string
  873. if q, ok := query.(string); ok {
  874. if q != "" {
  875. words = strings.Split(q, ",")
  876. }
  877. } else if q, ok := query.([]string); ok {
  878. words = q
  879. } else if q, ok := query.([]interface{}); ok {
  880. words = util.ObjArrToStringArr(q)
  881. }
  882. if words != nil {
  883. new_minq := fmt.Sprintf(minq, "%s", findfields)
  884. musts := []string{}
  885. for _, qs_words := range words {
  886. qws := strings.Split(qs_words, "+")
  887. mq := []string{}
  888. for _, qs_word := range qws {
  889. mq = append(mq, fmt.Sprintf(new_minq, qs_word))
  890. }
  891. musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ",")))
  892. }
  893. qstr = fmt.Sprintf(NgramStr, mustquery, strings.Join(musts, ","))
  894. //log.Println("ngram-query", qstr)
  895. } else {
  896. qstr = fmt.Sprintf(NgramStr, mustquery, "")
  897. }
  898. return
  899. }
  900. func GetNgramQuery_New(querystring, querymust interface{}, must, findfields string) (qstring string) {
  901. querymust_string := ""
  902. var wordsMust []string
  903. if q, ok := querymust.(string); ok {
  904. if q != "" {
  905. wordsMust = strings.Split(q, ",")
  906. }
  907. } else if q, ok := querymust.([]string); ok {
  908. wordsMust = q
  909. } else if q, ok := querymust.([]interface{}); ok {
  910. wordsMust = util.ObjArrToStringArr(q)
  911. }
  912. if wordsMust != nil {
  913. new_minq := fmt.Sprintf(minq, "%s", findfields)
  914. musts := []string{}
  915. for _, qs_wordsMust := range wordsMust {
  916. qws := strings.Split(qs_wordsMust, "+")
  917. mq := []string{}
  918. for _, qs_word := range qws {
  919. mq = append(mq, fmt.Sprintf(new_minq, qs_word))
  920. }
  921. musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ",")))
  922. }
  923. querymust_string = strings.Join(musts, ",")
  924. }
  925. //log.Println("must", must, querymust_string)
  926. //querystring---------------------------------------------
  927. query_string := ""
  928. var querysShold []string
  929. if q, ok := querystring.(string); ok {
  930. if q != "" {
  931. querysShold = strings.Split(q, ",")
  932. }
  933. } else if q, ok := querystring.([]string); ok {
  934. querysShold = q
  935. } else if q, ok := querystring.([]interface{}); ok {
  936. querysShold = util.ObjArrToStringArr(q)
  937. }
  938. if querysShold != nil {
  939. for k, name := range strings.Split(findfields, ",") {
  940. for _, qs_querysShold := range querysShold {
  941. if k > 0 {
  942. query_string = query_string + "," + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold)
  943. } else {
  944. query_string = query_string + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold)
  945. }
  946. }
  947. }
  948. }
  949. //log.Println("querystring", query_string)
  950. if querymust_string == "" {
  951. qstring = fmt.Sprintf(FilterQuery_New, must, query_string, querymust_string)
  952. } else {
  953. qstring = fmt.Sprintf(FilterQuery_New, must, query_string, ","+querymust_string)
  954. }
  955. return
  956. }
  957. func GetByNgram(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int) *[]map[string]interface{} {
  958. return GetByNgramAll(index, itype, query, mustquery, findfields, order, fields, start, limit, false, false)
  959. }
  960. //增加高亮、过滤查询、高亮截取字数
  961. func GetByNgramOther(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool, count int) *[]map[string]interface{} {
  962. defer util.Catch()
  963. qstr := ""
  964. if mustquery != "" && filtermode {
  965. qstr = GetNgramQuery(query, "", findfields)
  966. qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:])
  967. } else {
  968. qstr = GetNgramQuery(query, mustquery, findfields)
  969. }
  970. if qstr != "" {
  971. if highlight {
  972. ws := []string{}
  973. for _, w := range strings.Split(findfields, ",") {
  974. ws = append(ws, fmt.Sprintf(highlightStr, w, count))
  975. }
  976. qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
  977. }
  978. if len(fields) > 0 {
  979. qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
  980. }
  981. if len(order) > 0 {
  982. qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
  983. }
  984. if start > -1 {
  985. qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  986. }
  987. //log.Println("ngram-find", qstr)
  988. return Get(index, itype, qstr)
  989. } else {
  990. return nil
  991. }
  992. }
  993. //增加高亮、过滤查询
  994. func GetByNgramAll(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} {
  995. defer util.Catch()
  996. qstr := ""
  997. if mustquery != "" && filtermode {
  998. qstr = GetNgramQuery(query, "", findfields)
  999. qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:])
  1000. } else {
  1001. qstr = GetNgramQuery(query, mustquery, findfields)
  1002. }
  1003. if qstr != "" {
  1004. if highlight {
  1005. ws := []string{}
  1006. for _, w := range strings.Split(findfields, ",") {
  1007. ws = append(ws, fmt.Sprintf(highlightStr, w, 1))
  1008. }
  1009. qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
  1010. }
  1011. if len(fields) > 0 {
  1012. qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
  1013. }
  1014. if len(order) > 0 {
  1015. qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
  1016. }
  1017. if start > -1 {
  1018. qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  1019. }
  1020. //log.Println("ngram-find", qstr)
  1021. return Get(index, itype, qstr)
  1022. } else {
  1023. return nil
  1024. }
  1025. }
  1026. //增加高亮、过滤查询
  1027. func GetByNgramAll_New(index, itype string, querystring, querymust interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} {
  1028. defer util.Catch()
  1029. qstr := ""
  1030. if filtermode {
  1031. qstr = GetNgramQuery_New(querystring, querymust, mustquery, findfields)
  1032. } else {
  1033. qstr = GetNgramQuery_New(querystring, "", mustquery, findfields)
  1034. }
  1035. if qstr != "" {
  1036. if highlight {
  1037. ws := []string{}
  1038. for _, w := range strings.Split(findfields, ",") {
  1039. ws = append(ws, w+`:{"force_source": true}`)
  1040. }
  1041. qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL_New, strings.Join(ws, ",")) + `}`
  1042. }
  1043. if len(fields) > 0 {
  1044. qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
  1045. }
  1046. if len(order) > 0 {
  1047. qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", ",", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
  1048. }
  1049. if start > -1 {
  1050. qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  1051. }
  1052. //log.Println("ngram-find", order, qstr)
  1053. return Get(index, itype, qstr)
  1054. } else {
  1055. return nil
  1056. }
  1057. }