elasticutil.go 34 KB


  1. package main
  2. import (
  3. "context"
  4. "runtime"
  5. "sync"
  6. "time"
  7. // "bytes"
  8. "encoding/json"
  9. "fmt"
  10. "log"
  11. "qfw/util"
  12. "qfw/util/mongodb"
  13. mongodbutil "qfw/util/mongodbutil"
  14. "reflect"
  15. es "gopkg.in/olivere/elastic.v5"
  16. "strconv"
  17. "strings"
  18. )
  19. //检索库服务地址
  20. var (
  21. addrs []string
  22. LocCity = map[string]string{}
  23. SIZE = 15
  24. pool chan *es.Client
  25. ntimeout int
  26. poolsize = int32(20)
  27. newesLock = &sync.Mutex{}
  28. SR = strings.Replace
  29. )
  30. const (
  31. QStr = `{"query":{"bool":{"must":[$and],"must_not":[],
  32. "should":[$or],"minimum_should_match" : 0}}}`
  33. )
  34. func InitElastic(addr string) {
  35. InitElasticSize(addr, 5)
  36. }
  37. //n倍的池
  38. func InitElasticSize(addr string, size int) {
  39. poolsize = int32(3 * size)
  40. pool = make(chan *es.Client, poolsize)
  41. for _, s := range strings.Split(addr, ",") {
  42. addrs = append(addrs, s)
  43. }
  44. for i := 0; i < size; i++ {
  45. client, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  46. pool <- client
  47. }
  48. }
  49. //关闭连接
  50. func DestoryEsConn(client *es.Client) {
  51. select {
  52. case pool <- client:
  53. break
  54. case <-time.After(time.Second * 1):
  55. if client != nil {
  56. client.Stop()
  57. }
  58. client = nil
  59. }
  60. }
  61. var (
  62. lastTime = int64(0)
  63. lastTimeLock = &sync.Mutex{}
  64. )
  65. //获取连接
  66. func GetEsConn() *es.Client {
  67. select {
  68. case c := <-pool:
  69. if c == nil || !c.IsRunning() {
  70. log.Println("new esclient.", len(pool))
  71. client, err := es.NewClient(es.SetURL(addrs...),
  72. es.SetMaxRetries(2), es.SetSniff(false))
  73. if err == nil && client.IsRunning() {
  74. return client
  75. }
  76. }
  77. return c
  78. case <-time.After(time.Second * 4):
  79. //超时
  80. ntimeout++
  81. lastTimeLock.Lock()
  82. defer lastTimeLock.Unlock()
  83. //12秒后允许创建链接
  84. c := time.Now().Unix() - lastTime
  85. if c > 6 {
  86. lastTime = time.Now().Unix()
  87. log.Println("add client..", len(pool))
  88. c, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  89. go func() {
  90. for i := 0; i < 2; i++ {
  91. client, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  92. pool <- client
  93. }
  94. }()
  95. return c
  96. }
  97. return nil
  98. }
  99. }
  100. //保存对象
  101. func Save(index, itype string, obj interface{}) bool {
  102. client := GetEsConn()
  103. defer DestoryEsConn(client)
  104. defer func() {
  105. if r := recover(); r != nil {
  106. log.Println("[E]", r)
  107. for skip := 1; ; skip++ {
  108. _, file, line, ok := runtime.Caller(skip)
  109. if !ok {
  110. break
  111. }
  112. go log.Printf("%v,%v\n", file, line)
  113. }
  114. }
  115. }()
  116. data := util.ObjToMap(obj)
  117. _id := util.BsonIdToSId((*data)["_id"])
  118. (*data)["id"] = _id
  119. delete((*data), "_id")
  120. _, err := client.Index().Index(index).Type(itype).Id(_id).BodyJson(data).Do(context.TODO())
  121. if err != nil {
  122. log.Println("保存到ES出错", err.Error(), obj)
  123. return false
  124. } else {
  125. return true
  126. }
  127. }
  128. //底层查询方法
  129. func Get(index, itype, query string) *[]map[string]interface{} {
  130. //log.Println("query -- ", query)
  131. client := GetEsConn()
  132. defer DestoryEsConn(client)
  133. var res []map[string]interface{}
  134. if client != nil {
  135. defer func() {
  136. if r := recover(); r != nil {
  137. log.Println("[E]", r)
  138. for skip := 1; ; skip++ {
  139. _, file, line, ok := runtime.Caller(skip)
  140. if !ok {
  141. break
  142. }
  143. go log.Printf("%v,%v\n", file, line)
  144. }
  145. }
  146. }()
  147. searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do(context.TODO())
  148. if err != nil {
  149. log.Println("从ES查询出错", err.Error())
  150. return nil
  151. }
  152. if searchResult.Hits != nil {
  153. resNum := len(searchResult.Hits.Hits)
  154. if resNum < 3000 {
  155. res = make([]map[string]interface{}, resNum)
  156. for i, hit := range searchResult.Hits.Hits {
  157. parseErr := json.Unmarshal(*hit.Source, &res[i])
  158. if res[i] != nil {
  159. res[i]["_id"] = hit.Id
  160. }
  161. if parseErr == nil && hit.Highlight != nil && res[i] != nil {
  162. res[i]["highlight"] = map[string][]string(hit.Highlight)
  163. }
  164. }
  165. } else {
  166. log.Println("查询结果太多,查询到:", resNum, "条")
  167. }
  168. }
  169. }
  170. return &res
  171. }
  172. func GetNoLimit(index, itype, query string) *[]map[string]interface{} {
  173. client := GetEsConn()
  174. defer DestoryEsConn(client)
  175. var res []map[string]interface{}
  176. if client != nil {
  177. defer func() {
  178. if r := recover(); r != nil {
  179. log.Println("[E]", r)
  180. for skip := 1; ; skip++ {
  181. _, file, line, ok := runtime.Caller(skip)
  182. if !ok {
  183. break
  184. }
  185. go log.Printf("%v,%v\n", file, line)
  186. }
  187. }
  188. }()
  189. searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do(context.TODO())
  190. if err != nil {
  191. log.Println("从ES查询出错", err.Error())
  192. return nil
  193. }
  194. if searchResult.Hits != nil {
  195. resNum := len(searchResult.Hits.Hits)
  196. res = make([]map[string]interface{}, resNum)
  197. for i, hit := range searchResult.Hits.Hits {
  198. json.Unmarshal(*hit.Source, &res[i])
  199. if res[i] != nil {
  200. res[i]["_id"] = hit.Id
  201. }
  202. }
  203. }
  204. }
  205. return &res
  206. }
  207. //分页查询
  208. //{"name":"张三","$and":[{"age":{"$gt":10}},{"age":{"$lte":20}}]}
  209. //fields直接是 `"_id","title"`
  210. func GetPage(index, itype, query, order, field string, start, limit int) *[]map[string]interface{} {
  211. return Get(index, itype, MakeQuery(query, order, field, start, limit))
  212. }
  213. func MakeQuery(query, order, fileds string, start, limit int) string {
  214. res := AnalyQuery(query, "", QStr)
  215. if len(res) > 10 {
  216. res = SR(SR(SR(SR(res, ",$and", "", -1), "$and", "", -1), ",$or", "", -1), "$or", "", -1)
  217. if len(fileds) > 0 {
  218. //"_source":["account_number","balance"]
  219. res = res[:len(res)-1] + `,"_source":[` + fileds + "]}"
  220. }
  221. //{"name":-1,"age":1}
  222. if len(order) > 0 {
  223. res = res[:len(res)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
  224. }
  225. if start > -1 {
  226. res = res[:len(res)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  227. }
  228. return res
  229. }
  230. return ""
  231. }
  232. //{"name":"aaa"}
  233. func AnalyQuery(query interface{}, parent string, result string) string {
  234. m := make(map[string]interface{})
  235. if q1, ok := query.(string); ok {
  236. json.Unmarshal([]byte(q1), &m)
  237. } else if q2, ok2 := query.(map[string]interface{}); ok2 {
  238. m = q2
  239. }
  240. if len(parent) == 0 {
  241. for k, v := range m {
  242. if k == "$and" || k == "$or" {
  243. temps := ""
  244. if map1, ok := v.([]interface{}); ok {
  245. for i := 0; i < len(map1); i++ {
  246. temps += "," + AnalyQuery(map1[i], k, "")
  247. }
  248. }
  249. if len(temps) > 0 {
  250. temps = temps[1:]
  251. }
  252. result = SR(result, k, temps+","+k, 1)
  253. } else {
  254. switch reflect.TypeOf(v).String() {
  255. case "string":
  256. if strings.Index(k, "TERM_") == 0 {
  257. result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1)
  258. } else {
  259. result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1)
  260. }
  261. case "int", "int8", "int32", "int64", "float32", "float64":
  262. if strings.Index(k, "TERM_") == 0 {
  263. result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":`+fmt.Sprintf("%v", v)+`}},$and`, 1)
  264. } else {
  265. result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":`+fmt.Sprintf("%v", v)+`}},$and`, 1)
  266. }
  267. default:
  268. result = SR(result, "$and", AnalyQuery(v, k, "")+",$and", 1)
  269. }
  270. }
  271. }
  272. return result
  273. } else {
  274. for k, v := range m {
  275. if k == "$in" {
  276. s := ""
  277. if map1, ok := v.([]interface{}); ok {
  278. for i := 0; i < len(map1); i++ {
  279. s += "," + `"` + fmt.Sprintf("%v", map1[i]) + `"`
  280. }
  281. }
  282. if len(s) > 0 {
  283. s = s[1:]
  284. }
  285. return `{"terms":{"` + parent + `":[` + s + `]}}`
  286. } else if strings.Contains(k, "$lt") || strings.Contains(k, "$gt") {
  287. return `{"range":{"` + parent + `":{"` + SR(k, "$", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}}`
  288. } else {
  289. switch reflect.TypeOf(v).String() {
  290. case "string":
  291. if strings.Index(k, "TERM_") == 0 {
  292. return `{"term":{"` + SR(k, "TERM_", "", 1) + `":"` + fmt.Sprintf("%v", v) + `"}}`
  293. } else {
  294. return `{"query_string":{"default_field":"` + k + `","query":"` + fmt.Sprintf("%v", v) + `"}}`
  295. }
  296. case "int", "int8", "int32", "int64", "float32", "float64":
  297. if strings.Index(k, "TERM_") == 0 {
  298. return `{"term":{"` + SR(k, "TERM_", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}`
  299. } else {
  300. return `{"query_string":{"default_field":"` + k + `","query":` + fmt.Sprintf("%v", v) + `}}`
  301. }
  302. default:
  303. return AnalyQuery(v, k, result)
  304. }
  305. }
  306. }
  307. }
  308. return result
  309. }
  310. func GetByIdField(index, itype, id, fields string) *map[string]interface{} {
  311. client := GetEsConn()
  312. defer DestoryEsConn(client)
  313. if client != nil {
  314. defer func() {
  315. if r := recover(); r != nil {
  316. log.Println("[E]", r)
  317. for skip := 1; ; skip++ {
  318. _, file, line, ok := runtime.Caller(skip)
  319. if !ok {
  320. break
  321. }
  322. go log.Printf("%v,%v\n", file, line)
  323. }
  324. }
  325. }()
  326. query := `{"query":{"term":{"_id":"` + id + `"}}`
  327. if len(fields) > 0 {
  328. query = query + `,"_source":[` + fields + `]`
  329. }
  330. query = query + "}"
  331. searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do(context.TODO())
  332. if err != nil {
  333. log.Println("从ES查询出错", err.Error())
  334. return nil
  335. }
  336. var res map[string]interface{}
  337. if searchResult.Hits != nil {
  338. resNum := len(searchResult.Hits.Hits)
  339. if resNum == 1 {
  340. res = make(map[string]interface{})
  341. for _, hit := range searchResult.Hits.Hits {
  342. json.Unmarshal(*hit.Source, &res)
  343. if res != nil {
  344. res["_id"] = hit.Id
  345. }
  346. }
  347. return &res
  348. } else {
  349. log.Println("查询结果太多,查询到:", resNum, "条")
  350. }
  351. }
  352. }
  353. return nil
  354. }
  355. //根据id来查询文档
  356. func GetById(index, itype string, ids ...string) *[]map[string]interface{} {
  357. client := GetEsConn()
  358. defer DestoryEsConn(client)
  359. var res []map[string]interface{}
  360. if client != nil {
  361. defer func() {
  362. if r := recover(); r != nil {
  363. log.Println("[E]", r)
  364. for skip := 1; ; skip++ {
  365. _, file, line, ok := runtime.Caller(skip)
  366. if !ok {
  367. break
  368. }
  369. go log.Printf("%v,%v\n", file, line)
  370. }
  371. }
  372. }()
  373. query := es.NewIdsQuery().Ids(ids...)
  374. searchResult, err := client.Search().Index(index).Type(itype).Query(query).Do(context.TODO())
  375. if err != nil {
  376. log.Println("从ES查询出错", err.Error())
  377. return nil
  378. }
  379. if searchResult.Hits != nil {
  380. resNum := len(searchResult.Hits.Hits)
  381. if resNum < 5000 {
  382. res = make([]map[string]interface{}, resNum)
  383. for i, hit := range searchResult.Hits.Hits {
  384. json.Unmarshal(*hit.Source, &res[i])
  385. if res[i] != nil {
  386. res[i]["_id"] = hit.Id
  387. }
  388. }
  389. } else {
  390. log.Println("查询结果太多,查询到:", resNum, "条")
  391. }
  392. }
  393. }
  394. return &res
  395. }
  396. //elastic_v5之后查询语句适配
  397. func StringToQuery(q string) es.Query {
  398. defer util.Catch()
  399. m := map[string]interface{}{}
  400. err := json.Unmarshal([]byte(q), &m)
  401. if err == nil {
  402. q, _ := m["query"].(map[string]interface{})
  403. return AQ(q)
  404. }
  405. return nil
  406. }
  407. func AQ(m map[string]interface{}) (bq es.Query) {
  408. for k, v := range m {
  409. if k == "bool" {
  410. bq1 := es.NewBoolQuery()
  411. b1, _ := v.(map[string]interface{})
  412. if b1["must"] != nil {
  413. bm, _ := b1["must"].([]interface{})
  414. for _, tm := range bm {
  415. tm1, _ := tm.(map[string]interface{})
  416. if tm1 != nil {
  417. bq1.Must(AQ(tm1))
  418. }
  419. }
  420. }
  421. if b1["must_not"] != nil {
  422. bm, _ := b1["must_not"].([]interface{})
  423. for _, tm := range bm {
  424. tm1, _ := tm.(map[string]interface{})
  425. if tm1 != nil {
  426. bq1.MustNot(AQ(tm1))
  427. }
  428. }
  429. }
  430. if b1["should"] != nil {
  431. bm, _ := b1["should"].([]interface{})
  432. for _, tm := range bm {
  433. tm1, _ := tm.(map[string]interface{})
  434. if tm1 != nil {
  435. bq1.Should(AQ(tm1))
  436. }
  437. }
  438. }
  439. if b1["minimum_should_match"] != nil {
  440. bq1.MinimumNumberShouldMatch(util.IntAll(b1["minimum_should_match"]))
  441. }
  442. bq = bq1
  443. } else if k == "term" {
  444. b1, _ := v.(map[string]interface{})
  445. for k, v := range b1 {
  446. bq = es.NewTermQuery(k, v)
  447. break
  448. }
  449. } else if k == "terms" {
  450. b1, _ := v.(map[string]interface{})
  451. for k, v := range b1 {
  452. vs, _ := v.([]interface{})
  453. bq = es.NewTermsQuery(k, vs...)
  454. break
  455. }
  456. } else if k == "range" {
  457. b1, _ := v.(map[string]interface{})
  458. for k, v := range b1 {
  459. vm := v.(map[string]interface{})
  460. bq1 := es.NewRangeQuery(k)
  461. if vm["$gt"] != nil {
  462. bq1.Gt(vm["$gt"])
  463. }
  464. if vm["$gte"] != nil {
  465. bq1.Gte(vm["$gte"])
  466. }
  467. if vm["$lt"] != nil {
  468. bq1.Lt(vm["$lt"])
  469. }
  470. if vm["$lte"] != nil {
  471. bq1.Lte(vm["$lte"])
  472. }
  473. break
  474. }
  475. }
  476. }
  477. return
  478. }
  479. //删除某个索引,根据查询
  480. func Del(index, itype string, query interface{}) bool {
  481. client := GetEsConn()
  482. defer DestoryEsConn(client)
  483. b := false
  484. if client != nil {
  485. defer func() {
  486. if r := recover(); r != nil {
  487. log.Println("[E]", r)
  488. for skip := 1; ; skip++ {
  489. _, file, line, ok := runtime.Caller(skip)
  490. if !ok {
  491. break
  492. }
  493. go log.Printf("%v,%v\n", file, line)
  494. }
  495. }
  496. }()
  497. var err error
  498. if qs, ok := query.(string); ok {
  499. //by, e := json.Marshal(qs)
  500. esq := StringToQuery(qs)
  501. if esq != nil {
  502. _, err = client.DeleteByQuery().Index(index).Type(itype).Query(esq).Do(context.TODO())
  503. }
  504. } else if qi, ok2 := query.(es.Query); ok2 {
  505. _, err = client.DeleteByQuery().Index(index).Type(itype).Query(qi).Do(context.TODO())
  506. }
  507. if err != nil {
  508. log.Println("删除索引出错:", err.Error())
  509. } else {
  510. b = true
  511. }
  512. }
  513. return b
  514. }
  515. //根据语句更新对象
  516. func Update(index, itype, id string, updateStr string) bool {
  517. client := GetEsConn()
  518. defer DestoryEsConn(client)
  519. b := false
  520. if client != nil {
  521. defer func() {
  522. if r := recover(); r != nil {
  523. log.Println("[E]", r)
  524. for skip := 1; ; skip++ {
  525. _, file, line, ok := runtime.Caller(skip)
  526. if !ok {
  527. break
  528. }
  529. go log.Printf("%v,%v\n", file, line)
  530. }
  531. }
  532. }()
  533. var err error
  534. esc := es.NewScript(updateStr)
  535. esc.Lang("groovy")
  536. _, err = client.Update().Index(index).Type(itype).Id(id).Script(esc).Do(context.TODO())
  537. if err != nil {
  538. log.Println("更新检索出错:", err.Error())
  539. } else {
  540. b = true
  541. }
  542. }
  543. return b
  544. }
  545. func BulkUpdate(index, itype string, ids []string, updateStr string) {
  546. client := GetEsConn()
  547. defer DestoryEsConn(client)
  548. if client != nil {
  549. defer func() {
  550. if r := recover(); r != nil {
  551. log.Println("[E]", r)
  552. for skip := 1; ; skip++ {
  553. _, file, line, ok := runtime.Caller(skip)
  554. if !ok {
  555. break
  556. }
  557. go log.Printf("%v,%v\n", file, line)
  558. }
  559. }
  560. }()
  561. for _, id := range ids {
  562. esc := es.NewScript(updateStr)
  563. esc.Lang("groovy")
  564. _, err := client.Update().Index(index).Type(itype).Id(id).Script(esc).Do(context.TODO())
  565. if err != nil {
  566. log.Println("更新检索出错:", err.Error())
  567. }
  568. }
  569. }
  570. }
  571. //根据id删除索引对象
  572. func DelById(index, itype, id string) bool {
  573. client := GetEsConn()
  574. defer DestoryEsConn(client)
  575. b := false
  576. if client != nil {
  577. defer func() {
  578. if r := recover(); r != nil {
  579. log.Println("[E]", r)
  580. for skip := 1; ; skip++ {
  581. _, file, line, ok := runtime.Caller(skip)
  582. if !ok {
  583. break
  584. }
  585. go log.Printf("%v,%v\n", file, line)
  586. }
  587. }
  588. }()
  589. var err error
  590. _, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.TODO())
  591. if err != nil {
  592. log.Println("更新检索出错:", err.Error())
  593. } else {
  594. b = true
  595. }
  596. }
  597. return b
  598. }
  599. //先删除后增
  600. func UpdateNewDoc(index, itype string, obj ...interface{}) bool {
  601. client := GetEsConn()
  602. defer DestoryEsConn(client)
  603. b := false
  604. if client != nil {
  605. defer func() {
  606. if r := recover(); r != nil {
  607. log.Println("[E]", r)
  608. for skip := 1; ; skip++ {
  609. _, file, line, ok := runtime.Caller(skip)
  610. if !ok {
  611. break
  612. }
  613. go log.Printf("%v,%v\n", file, line)
  614. }
  615. }
  616. }()
  617. var err error
  618. for _, v := range obj {
  619. tempObj := util.ObjToMap(v)
  620. if tempObj == nil || len(*tempObj) == 0 {
  621. continue
  622. }
  623. id := util.BsonIdToSId((*tempObj)["_id"])
  624. (*tempObj)["id"] = id
  625. delete(*tempObj, "_id")
  626. if id != "" {
  627. client.Delete().Index(index).Type(itype).Id(id).Do(context.TODO())
  628. }
  629. _, err = client.Index().Index(index).Type(itype).Id(id).BodyJson(tempObj).Do(context.TODO())
  630. if err != nil {
  631. log.Println("保存到ES出错", err.Error())
  632. } else {
  633. b = true
  634. }
  635. }
  636. }
  637. return b
  638. }
  639. func UpdateEntDoc(id string) bool {
  640. b := false
  641. map2 := map[string]interface{}{}
  642. util.ReadConfig(&map2)
  643. ent := mongodbutil.FindById("enterprise", map2["entMongodbAlias"].(string), map2["entMongodbName"].(string), id, "")
  644. _ent := mongodb.FindById("enterprise", id, "")
  645. if _ent != nil && len(*_ent) > 0 {
  646. for k, v := range *_ent {
  647. (*ent)[k] = v
  648. }
  649. }
  650. if ent != nil {
  651. b = UpdateNewDoc("enterprise", "enterprise", ConverData(ent))
  652. }
  653. return b
  654. }
  655. //把地市代码转为地市
  656. func getLoc(code string, res *map[string]string) (loc string) {
  657. switch len(code) {
  658. case 6:
  659. loc = (*res)[code[:2]] + " " + (*res)[code[:4]] + " " + (*res)[code]
  660. break
  661. case 4:
  662. loc = (*res)[code[:2]] + " " + (*res)[code]
  663. break
  664. case 2:
  665. loc = (*res)[code]
  666. break
  667. }
  668. return
  669. }
  670. //把地市代码转为地市
  671. func Loop(m interface{}, res *map[string]string) {
  672. m1, ok := m.([]interface{})
  673. if !ok {
  674. m2, _ := m.([]map[string]interface{})
  675. for i := 0; i < len(m2); i++ {
  676. ms := m2[i]
  677. (*res)[fmt.Sprintf("%1.0f", ms["k"])] = fmt.Sprintf("%s", ms["n"])
  678. s := ms["s"]
  679. if nil != s {
  680. mss, _ := s.([]interface{})
  681. if nil != mss {
  682. Loop(mss, res)
  683. }
  684. }
  685. }
  686. } else {
  687. for i := 0; i < len(m1); i++ {
  688. ms, _ := m1[i].(map[string]interface{})
  689. (*res)[fmt.Sprintf("%1.0f", ms["k"])] = fmt.Sprintf("%s", ms["n"])
  690. s := ms["s"]
  691. if nil != s {
  692. mss, _ := s.([]interface{})
  693. if nil != mss {
  694. Loop(mss, res)
  695. }
  696. }
  697. }
  698. }
  699. }
  700. func ConverData(ent *map[string]interface{}) map[string]interface{} {
  701. tmp := *ent
  702. id64, _ := tmp["ID"].(int64)
  703. ids := fmt.Sprintf("%d", id64)
  704. tmp2 := make(map[string]interface{})
  705. tmp2["ID"] = ids
  706. tmp2["_id"] = tmp["_id"]
  707. tmp2["id"] = tmp["_id"]
  708. tmp2["Area"] = tmp["Area"]
  709. tmp2["LeRep"] = tmp["LeRep"]
  710. tmp2["RegNo"] = tmp["RegNo"]
  711. tmp2["EntType"] = tmp["EntType"]
  712. tmp2["EntName"] = tmp["EntName"]
  713. tmp2["EntTypeName"] = tmp["EntTypeName"]
  714. tmp2["Dom"] = tmp["Dom"]
  715. tmp2["EstDate"] = tmp["EstDate"]
  716. tmp2["OpStateName"] = tmp["OpStateName"]
  717. tmp2["OpScope"] = tmp["OpScope"]
  718. tmp2["OpState"] = tmp["OpState"]
  719. tmp2["s_submitid"] = tmp["s_submitid"]
  720. tmp2["l_submittime"] = tmp["l_submittime"]
  721. tmp2["s_submitname"] = tmp["s_submitname"]
  722. tmp2["RegCapCurName"] = tmp["RegCapCurName"]
  723. //增加营业状态排序
  724. if tmp2["OpState"] == "06" {
  725. tmp2["OpSint"] = true
  726. } else {
  727. tmp2["OpSint"] = false
  728. }
  729. tmp2["OpLocDistrict"] = tmp["OpLocDistrict"]
  730. //增加代码转名称
  731. tmpLoc, _ := tmp["OpLocDistrict"].(string)
  732. tmp2["OpLocDistrictName"] = getLoc(tmpLoc, &LocCity)
  733. tmp2["RecCap"] = tmp["RecCap"]
  734. tmp2["RegCap"] = tmp["RegCap"]
  735. tmp2["IndustryPhy"] = tmp["IndustryPhy"]
  736. tmp2["IndustryPhyName"] = tmp["IndustryPhyName"]
  737. tmp2["RegOrg"] = tmp["RegOrg"]
  738. tmp2["RegOrgName"] = tmp["RegOrgName"]
  739. tmp2["Tel"] = tmp["Tel"]
  740. tmp2["CompForm"] = tmp["CompForm"]
  741. tmp2["CompFormName"] = tmp["CompFormName"]
  742. //增加异常名录标记 Ycml可能是bool也可能是string
  743. Ycmlb, _ := tmp["Ycml"].(bool)
  744. Ycmls, _ := tmp["Ycml"].(string)
  745. if Ycmlb || Ycmls == "1" {
  746. tmp2["Ycml"] = true
  747. } else {
  748. tmp2["Ycml"] = false
  749. }
  750. //增加年报联系信息
  751. if tmp["Nb_email"] != nil {
  752. tmp2["Nb_email"] = tmp["Nb_email"]
  753. }
  754. if tmp["Nb_tel"] != nil {
  755. tmp2["Nb_tel"] = tmp["Nb_tel"]
  756. }
  757. if tmp["Nb_addr"] != nil {
  758. tmp2["Nb_addr"] = tmp["Nb_addr"]
  759. }
  760. s_synopsis := tmp["s_synopsis"]
  761. if s_synopsis == nil {
  762. s_synopsis = ""
  763. }
  764. tmp2["s_synopsis"] = s_synopsis //企业简介
  765. //股东
  766. stock := getStock(tmp["investor"])
  767. tmp2["stock"] = stock
  768. tmp2["LegCerNO"] = tmp["LegCerNO"]
  769. if tmp["s_microwebsite"] != nil {
  770. tmp2["s_microwebsite"] = tmp["s_microwebsite"]
  771. }
  772. tmp2["SourceType"] = tmp["SourceType"] //数据来源
  773. s_servicenames := tmp["s_servicenames"]
  774. if s_servicenames == nil {
  775. s_servicenames = ""
  776. }
  777. tmp2["s_servicenames"] = s_servicenames //服务名称
  778. s_action := tmp["s_action"]
  779. if s_action == nil {
  780. s_action = "N"
  781. }
  782. tmp2["s_action"] = s_action
  783. tmp2["s_persion"] = tmp["s_persion"]
  784. tmp2["s_mobile"] = tmp["s_mobile"]
  785. tmp2["s_enturl"] = tmp["s_enturl"]
  786. tmp2["s_weixin"] = tmp["s_weixin"]
  787. tmp2["s_avatar"] = tmp["s_avatar"]
  788. return tmp2
  789. }
  790. func getStock(obj interface{}) string {
  791. stock := ""
  792. if ns, ok := obj.([]interface{}); ok {
  793. stock = " "
  794. for _, ns1 := range ns {
  795. if nn, ok1 := ns1.(map[string]interface{}); ok1 {
  796. tmp := fmt.Sprintf("%s", nn["Inv"])
  797. if strings.Index(stock, tmp) < 0 {
  798. stock += tmp + " "
  799. }
  800. }
  801. }
  802. }
  803. return stock
  804. }
  805. func BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
  806. client := GetEsConn()
  807. defer DestoryEsConn(client)
  808. if client != nil {
  809. defer func() {
  810. if r := recover(); r != nil {
  811. log.Println("[E]", r)
  812. for skip := 1; ; skip++ {
  813. _, file, line, ok := runtime.Caller(skip)
  814. if !ok {
  815. break
  816. }
  817. go log.Printf("%v,%v\n", file, line)
  818. }
  819. }
  820. }()
  821. req := client.Bulk()
  822. for _, v := range *obj {
  823. if v == nil || len(v) == 0 {
  824. continue
  825. }
  826. _id := util.BsonIdToSId(v["_id"])
  827. v["id"] = _id
  828. delete(v, "_id")
  829. if isDelBefore && _id != "" {
  830. req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(_id))
  831. }
  832. req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Id(_id).Doc(v))
  833. }
  834. _, err := req.Do(context.TODO())
  835. if err != nil {
  836. log.Println("批量保存到ES出错", err.Error())
  837. }
  838. }
  839. }
  840. func Count(index, itype string, query interface{}) int64 {
  841. client := GetEsConn()
  842. defer DestoryEsConn(client)
  843. if client != nil {
  844. defer func() {
  845. if r := recover(); r != nil {
  846. log.Println("[E]", r)
  847. for skip := 1; ; skip++ {
  848. _, file, line, ok := runtime.Caller(skip)
  849. if !ok {
  850. break
  851. }
  852. go log.Printf("%v,%v\n", file, line)
  853. }
  854. }
  855. }()
  856. var qq es.Query
  857. if qs, ok := query.(string); ok {
  858. temp := &es.BoolQuery{
  859. QueryStrings: qs,
  860. }
  861. qq = temp
  862. } else if qi, ok2 := query.(es.Query); ok2 {
  863. qq = qi
  864. }
  865. n, err := client.Count(index).Type(itype).Query(qq).Do(context.TODO())
  866. if err != nil {
  867. log.Println("统计出错", err.Error())
  868. }
  869. return n
  870. }
  871. return 0
  872. }
  873. //ngram精确查询
  874. /*
  875. {
  876. "query": {
  877. "bool": {
  878. "should": [
  879. {
  880. "bool":{
  881. "must":[
  882. { "multi_match": {
  883. "query": "智能",
  884. "type": "phrase",
  885. "fields": [
  886. "title"
  887. ],
  888. "analyzer": "my_ngram"
  889. }
  890. },{
  891. "multi_match": {
  892. "query": "机器",
  893. "type": "phrase",
  894. "fields": [
  895. "title"
  896. ],
  897. "analyzer": "my_ngram"
  898. }
  899. },{
  900. "multi_match": {
  901. "query": "2016",
  902. "type": "phrase",
  903. "fields": [
  904. "title"
  905. ],
  906. "analyzer": "my_ngram"
  907. }
  908. }
  909. ]
  910. }
  911. },
  912. {
  913. "bool":{
  914. "must":[
  915. { "multi_match": {
  916. "query": "河南",
  917. "type": "phrase",
  918. "fields": [
  919. "title"
  920. ],
  921. "analyzer": "my_ngram"
  922. }
  923. },{
  924. "multi_match": {
  925. "query": "工商",
  926. "type": "phrase",
  927. "fields": [
  928. "title"
  929. ],
  930. "analyzer": "my_ngram"
  931. }
  932. },{
  933. "multi_match": {
  934. "query": "2016",
  935. "type": "phrase",
  936. "fields": [
  937. "title"
  938. ],
  939. "analyzer": "my_ngram"
  940. }
  941. }
  942. ]
  943. }
  944. }
  945. ],"minimum_should_match": 1
  946. }
  947. },
  948. "_source": [
  949. "_id",
  950. "title"
  951. ],
  952. "from": 0,
  953. "size": 10,
  954. "sort": [{
  955. "publishtime": "desc"
  956. }]
  957. }
  958. */
  959. //"2016+智能+办公,"河南+工商"
  960. //["2016+智能+办公","河南+工商"]
  961. //QStr = `{"query":{"bool":{should":[$or],"minimum_should_match" : 1}}}`
  962. //{"bool":{"must":[]}}
  963. //{"multi_match": {"query": "$word","type": "phrase", "fields": [$field],"analyzer": "my_ngram"}}
  964. //"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {"detail": {"fragment_size": 1,"number_of_fragments": 1},"title": {"fragment_size": 1,"number_of_fragments": 1}}}
  965. const (
  966. //此处最后少一个},正好NgramStr取[1:]多一个}
  967. FilterQuery = `{"query": {"bool": {"filter": [%s]}},%s`
  968. NgramStr = `{"query":{"bool":{"must":[%s],"should":[%s],"minimum_should_match" : 0}}}`
  969. NgramMust = `{"bool":{"must":[%s]}}`
  970. NgramMustAndNot = `{"bool":{"must":[%s],"must_not":[%s]}}`
  971. minq = `{"multi_match": {"query": "%s","type": "phrase", "fields": [%s],"analyzer": "my_ngram"}}`
  972. HL = `"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {%s}}`
  973. highlightStr = `%s: {"fragment_size": %d,"number_of_fragments": 1}`
  974. FilterQuery_New = `{"query":{"bool":{"must": [%s%s%s],"should":[]}}}`
  975. MatchQueryString = `{"match": {%s: { "query":"%s", "operator": "and"}}}`
  976. HL_New = `"highlight": {"pre_tags": ["<HL>"],"post_tags": ["<HL>"],"fields": {%s}}`
  977. )
  978. //替换了"号
  979. func GetNgramQuery(query interface{}, mustquery, findfields string) (qstr string) {
  980. var words []string
  981. if q, ok := query.(string); ok {
  982. if q != "" {
  983. words = strings.Split(q, ",")
  984. }
  985. } else if q, ok := query.([]string); ok {
  986. words = q
  987. } else if q, ok := query.([]interface{}); ok {
  988. words = util.ObjArrToStringArr(q)
  989. }
  990. if words != nil {
  991. new_minq := fmt.Sprintf(minq, "%s", findfields)
  992. musts := []string{}
  993. for _, qs_words := range words {
  994. qws := strings.Split(qs_words, "+")
  995. mq := []string{}
  996. for _, qs_word := range qws {
  997. mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word)))
  998. }
  999. musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ",")))
  1000. }
  1001. qstr = fmt.Sprintf(NgramStr, mustquery, strings.Join(musts, ","))
  1002. //log.Println("ngram-query", qstr)
  1003. } else {
  1004. qstr = fmt.Sprintf(NgramStr, mustquery, "")
  1005. }
  1006. return
  1007. }
  1008. func GetNgramQuery_New(querystring, querymust interface{}, must, findfields string) (qstring string) {
  1009. querymust_string := ""
  1010. var wordsMust []string
  1011. if q, ok := querymust.(string); ok {
  1012. if q != "" {
  1013. wordsMust = strings.Split(q, ",")
  1014. }
  1015. } else if q, ok := querymust.([]string); ok {
  1016. wordsMust = q
  1017. } else if q, ok := querymust.([]interface{}); ok {
  1018. wordsMust = util.ObjArrToStringArr(q)
  1019. }
  1020. if wordsMust != nil {
  1021. new_minq := fmt.Sprintf(minq, "%s", findfields)
  1022. musts := []string{}
  1023. for _, qs_wordsMust := range wordsMust {
  1024. qws := strings.Split(qs_wordsMust, "+")
  1025. mq := []string{}
  1026. for _, qs_word := range qws {
  1027. mq = append(mq, fmt.Sprintf(new_minq, qs_word))
  1028. }
  1029. musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ",")))
  1030. }
  1031. querymust_string = strings.Join(musts, ",")
  1032. }
  1033. //log.Println("must", must, querymust_string)
  1034. //querystring---------------------------------------------
  1035. query_string := ""
  1036. var querysShold []string
  1037. if q, ok := querystring.(string); ok {
  1038. if q != "" {
  1039. querysShold = strings.Split(q, ",")
  1040. }
  1041. } else if q, ok := querystring.([]string); ok {
  1042. querysShold = q
  1043. } else if q, ok := querystring.([]interface{}); ok {
  1044. querysShold = util.ObjArrToStringArr(q)
  1045. }
  1046. if querysShold != nil {
  1047. for k, name := range strings.Split(findfields, ",") {
  1048. for _, qs_querysShold := range querysShold {
  1049. if k > 0 {
  1050. query_string = query_string + "," + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold)
  1051. } else {
  1052. query_string = query_string + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold)
  1053. }
  1054. }
  1055. }
  1056. }
  1057. //log.Println("querystring", query_string)
  1058. if querymust_string == "" {
  1059. qstring = fmt.Sprintf(FilterQuery_New, must, query_string, querymust_string)
  1060. } else {
  1061. qstring = fmt.Sprintf(FilterQuery_New, must, query_string, ","+querymust_string)
  1062. }
  1063. return
  1064. }
  1065. func GetByNgram(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int) *[]map[string]interface{} {
  1066. return GetByNgramAll(index, itype, query, mustquery, findfields, order, fields, start, limit, false, false)
  1067. }
  1068. //增加高亮、过滤查询、高亮截取字数
  1069. func GetByNgramOther(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool, count int) *[]map[string]interface{} {
  1070. defer util.Catch()
  1071. qstr := ""
  1072. if mustquery != "" && filtermode {
  1073. qstr = GetNgramQuery(query, "", findfields)
  1074. qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:])
  1075. } else {
  1076. qstr = GetNgramQuery(query, mustquery, findfields)
  1077. }
  1078. if qstr != "" {
  1079. if highlight {
  1080. ws := []string{}
  1081. for _, w := range strings.Split(findfields, ",") {
  1082. ws = append(ws, fmt.Sprintf(highlightStr, w, count))
  1083. }
  1084. qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
  1085. }
  1086. if len(fields) > 0 {
  1087. qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
  1088. }
  1089. if len(order) > 0 {
  1090. qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
  1091. }
  1092. if start > -1 {
  1093. qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  1094. }
  1095. //log.Println("ngram-find", qstr)
  1096. return Get(index, itype, qstr)
  1097. } else {
  1098. return nil
  1099. }
  1100. }
  1101. //增加高亮、过滤查询
  1102. //替换了"号
  1103. func GetByNgramAll(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} {
  1104. defer util.Catch()
  1105. qstr := ""
  1106. if mustquery != "" && filtermode {
  1107. qstr = GetNgramQuery(query, "", findfields)
  1108. qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:])
  1109. } else {
  1110. qstr = GetNgramQuery(query, mustquery, findfields)
  1111. }
  1112. if qstr != "" {
  1113. if highlight {
  1114. ws := []string{}
  1115. for _, w := range strings.Split(findfields, ",") {
  1116. ws = append(ws, fmt.Sprintf(highlightStr, w, 1))
  1117. }
  1118. qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
  1119. }
  1120. if len(fields) > 0 {
  1121. qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
  1122. }
  1123. if len(order) > 0 {
  1124. qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
  1125. }
  1126. if start > -1 {
  1127. qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  1128. }
  1129. // log.Println("ngram-find", qstr)
  1130. return Get(index, itype, qstr)
  1131. } else {
  1132. return nil
  1133. }
  1134. }
  1135. //增加高亮、过滤查询
  1136. func GetByNgramAll_New(index, itype string, querystring, querymust interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} {
  1137. defer util.Catch()
  1138. qstr := ""
  1139. if filtermode {
  1140. qstr = GetNgramQuery_New(querystring, querymust, mustquery, findfields)
  1141. } else {
  1142. qstr = GetNgramQuery_New(querystring, "", mustquery, findfields)
  1143. }
  1144. if qstr != "" {
  1145. if highlight {
  1146. ws := []string{}
  1147. for _, w := range strings.Split(findfields, ",") {
  1148. ws = append(ws, w+`:{"force_source": true}`)
  1149. }
  1150. qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL_New, strings.Join(ws, ",")) + `}`
  1151. }
  1152. if len(fields) > 0 {
  1153. qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
  1154. }
  1155. if len(order) > 0 {
  1156. qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", ",", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
  1157. }
  1158. if start > -1 {
  1159. qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  1160. }
  1161. //log.Println("ngram-find", order, qstr)
  1162. return Get(index, itype, qstr)
  1163. } else {
  1164. return nil
  1165. }
  1166. }
  1167. type KeyConfig struct {
  1168. Keys []string `json:"key"`
  1169. NotKeys []string `json:"notkey"`
  1170. InfoTypes []string `json:"infotype"`
  1171. Areas []string `json:"area"`
  1172. }
  1173. //替换了"号
  1174. func GetResForJY(index, itype string, keys []KeyConfig, allquery, findfields, SortQuery, fields string, start, limit int) *[]map[string]interface{} {
  1175. if len(keys) > 0 {
  1176. qstr := ""
  1177. new_minq := fmt.Sprintf(minq, "%s", findfields)
  1178. not_new_minq := fmt.Sprintf(minq, "%s", `"title"`) //排除词只查询标题
  1179. musts := []string{}
  1180. for _, qs_words := range keys {
  1181. mq := []string{}
  1182. notmq := []string{}
  1183. for _, qs_word := range qs_words.Keys {
  1184. mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word)))
  1185. }
  1186. for _, qs_word := range qs_words.NotKeys {
  1187. notmq = append(notmq, fmt.Sprintf(not_new_minq, ReplaceYH(qs_word)))
  1188. }
  1189. if len(qs_words.Areas) > 0 {
  1190. mq = append(mq, fmt.Sprintf(`{"terms":{"area":["%s"]}}`, strings.Join(qs_words.Areas, `","`)))
  1191. }
  1192. if len(qs_words.InfoTypes) > 0 {
  1193. mq = append(mq, fmt.Sprintf(`{"terms":{"toptype":["%s"]}}`, strings.Join(qs_words.InfoTypes, `","`)))
  1194. }
  1195. musts = append(musts, fmt.Sprintf(NgramMustAndNot, strings.Join(mq, ","), strings.Join(notmq, ",")))
  1196. }
  1197. qstr = fmt.Sprintf(NgramStr, "", strings.Join(musts, ","))
  1198. qstr = fmt.Sprintf(FilterQuery, allquery, qstr[1:])
  1199. ws := []string{}
  1200. for _, w := range strings.Split(findfields, ",") {
  1201. ws = append(ws, fmt.Sprintf(highlightStr, w, 1))
  1202. }
  1203. qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
  1204. if len(fields) > 0 {
  1205. qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
  1206. }
  1207. if len(SortQuery) > 0 {
  1208. qstr = qstr[:len(qstr)-1] + `,"sort":` + SortQuery + `}`
  1209. }
  1210. if start > -1 {
  1211. qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  1212. }
  1213. //log.Println("jy-ngram-find", qstr)
  1214. return Get(index, itype, qstr)
  1215. } else {
  1216. return nil
  1217. }
  1218. }
  1219. func ReplaceYH(src string) (rpl string) {
  1220. return strings.Replace(src, `"`, `\"`, -1)
  1221. }
  1222. //
  1223. func GetAllByNgram(index, itype, qstr, findfields, order, fields string, start, limit, count int, highlight bool) *[]map[string]interface{} {
  1224. if qstr != "" {
  1225. if highlight {
  1226. ws := []string{}
  1227. for _, w := range strings.Split(findfields, ",") {
  1228. ws = append(ws, fmt.Sprintf(highlightStr, w, count))
  1229. }
  1230. qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
  1231. }
  1232. if len(fields) > 0 {
  1233. qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
  1234. }
  1235. if len(order) > 0 {
  1236. qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
  1237. }
  1238. if start > -1 {
  1239. qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
  1240. }
  1241. //log.Println("GetAllByNgram:", qstr)
  1242. return Get(index, itype, qstr)
  1243. } else {
  1244. return nil
  1245. }
  1246. }