elasticSim.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package elastic
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. es "gopkg.in/olivere/elastic.v2"
  6. "log"
  7. "runtime"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. type Elastic struct {
  13. S_esurl string
  14. I_size int
  15. Addrs []string
  16. Pool chan *es.Client
  17. lastTime int64
  18. lastTimeLock sync.Mutex
  19. ntimeout int
  20. }
  21. func (e *Elastic) InitElasticSize() {
  22. e.Pool = make(chan *es.Client, e.I_size)
  23. for _, s := range strings.Split(e.S_esurl, ",") {
  24. e.Addrs = append(e.Addrs, s)
  25. }
  26. for i := 0; i < e.I_size; i++ {
  27. client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  28. e.Pool <- client
  29. }
  30. }
  31. //关闭连接
  32. func (e *Elastic) DestoryEsConn(client *es.Client) {
  33. select {
  34. case e.Pool <- client:
  35. break
  36. case <-time.After(time.Second * 1):
  37. if client != nil {
  38. client.Stop()
  39. }
  40. client = nil
  41. }
  42. }
  43. func (e *Elastic) GetEsConn() *es.Client {
  44. select {
  45. case c := <-e.Pool:
  46. if c == nil || !c.IsRunning() {
  47. log.Println("new esclient.", len(e.Pool))
  48. client, err := es.NewClient(es.SetURL(e.Addrs...),
  49. es.SetMaxRetries(2), es.SetSniff(false))
  50. if err == nil && client.IsRunning() {
  51. return client
  52. }
  53. }
  54. return c
  55. case <-time.After(time.Second * 4):
  56. //超时
  57. e.ntimeout++
  58. e.lastTimeLock.Lock()
  59. defer e.lastTimeLock.Unlock()
  60. //12秒后允许创建链接
  61. c := time.Now().Unix() - e.lastTime
  62. if c > 12 {
  63. e.lastTime = time.Now().Unix()
  64. log.Println("add client..", len(e.Pool))
  65. c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  66. go func() {
  67. for i := 0; i < 2; i++ {
  68. client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetMaxRetries(2), es.SetSniff(false))
  69. e.Pool <- client
  70. }
  71. }()
  72. return c
  73. }
  74. return nil
  75. }
  76. }
  77. func (e *Elastic) Get(index, itype, query string) *[]map[string]interface{} {
  78. client := e.GetEsConn()
  79. defer func() {
  80. go e.DestoryEsConn(client)
  81. }()
  82. var res []map[string]interface{}
  83. if client != nil {
  84. defer func() {
  85. if r := recover(); r != nil {
  86. log.Println("[E]", r)
  87. for skip := 1; ; skip++ {
  88. _, file, line, ok := runtime.Caller(skip)
  89. if !ok {
  90. break
  91. }
  92. go log.Printf("%v,%v\n", file, line)
  93. }
  94. }
  95. }()
  96. searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
  97. if err != nil {
  98. log.Println("从ES查询出错", err.Error())
  99. return nil
  100. }
  101. if searchResult.Hits != nil {
  102. resNum := len(searchResult.Hits.Hits)
  103. if resNum < 5000 {
  104. res = make([]map[string]interface{}, resNum)
  105. for i, hit := range searchResult.Hits.Hits {
  106. parseErr := json.Unmarshal(*hit.Source, &res[i])
  107. if parseErr == nil && hit.Highlight != nil && res[i] != nil {
  108. res[i]["highlight"] = map[string][]string(hit.Highlight)
  109. }
  110. }
  111. } else {
  112. log.Println("查询结果太多,查询到:", resNum, "条")
  113. }
  114. }
  115. }
  116. return &res
  117. }
  118. //关闭elastic
  119. func (e *Elastic) Close() {
  120. for i := 0; i < e.I_size; i++ {
  121. cli := <-e.Pool
  122. cli.Stop()
  123. cli = nil
  124. }
  125. e.Pool = nil
  126. e = nil
  127. }
// Get a connection. (Earlier GetEsConn implementation, kept commented out below.)
  129. //func (e *Elastic) GetEsConn() (c *es.Client) {
  130. // defer util.Catch()
  131. // select {
  132. // case c = <-e.Pool:
  133. // if c == nil || !c.IsRunning() {
  134. // client, err := es.NewClient(es.SetURL(addrs...),
  135. // es.SetMaxRetries(2), es.SetSniff(false))
  136. // if err == nil && client.IsRunning() {
  137. // return client
  138. // }
  139. // return nil
  140. // }
  141. // return
  142. // case <-time.After(time.Second * 7):
  143. // //超时
  144. // ntimeout++
  145. // log.Println("timeout times:", ntimeout)
  146. // return nil
  147. // }
  148. //}
  149. func (e *Elastic) BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
  150. client := e.GetEsConn()
  151. defer e.DestoryEsConn(client)
  152. if client != nil {
  153. req := client.Bulk()
  154. for _, v := range *obj {
  155. if isDelBefore {
  156. req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(fmt.Sprintf("%v", v["_id"])))
  157. }
  158. req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Doc(v))
  159. }
  160. _, err := req.Do()
  161. if err != nil {
  162. log.Println("批量保存到ES出错", err.Error())
  163. }
  164. }
  165. }
  166. //根据id删除索引对象
  167. func (e *Elastic) DelById(index, itype, id string) bool {
  168. client := e.GetEsConn()
  169. defer e.DestoryEsConn(client)
  170. b := false
  171. if client != nil {
  172. var err error
  173. _, err = client.Delete().Index(index).Type(itype).Id(id).Do()
  174. if err != nil {
  175. log.Println("更新检索出错:", err.Error())
  176. } else {
  177. b = true
  178. }
  179. }
  180. return b
  181. }
  182. func (e *Elastic) GetNoLimit(index, itype, query string) *[]map[string]interface{} {
  183. //log.Println("query -- ", query)
  184. client := e.GetEsConn()
  185. defer e.DestoryEsConn(client)
  186. var res []map[string]interface{}
  187. if client != nil {
  188. defer func() {
  189. if r := recover(); r != nil {
  190. log.Println("[E]", r)
  191. for skip := 1; ; skip++ {
  192. _, file, line, ok := runtime.Caller(skip)
  193. if !ok {
  194. break
  195. }
  196. go log.Printf("%v,%v\n", file, line)
  197. }
  198. }
  199. }()
  200. searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
  201. if err != nil {
  202. log.Println("从ES查询出错", err.Error())
  203. return nil
  204. }
  205. if searchResult.Hits != nil {
  206. resNum := len(searchResult.Hits.Hits)
  207. res = make([]map[string]interface{}, resNum)
  208. for i, hit := range searchResult.Hits.Hits {
  209. json.Unmarshal(*hit.Source, &res[i])
  210. }
  211. }
  212. }
  213. return &res
  214. }
  215. func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
  216. client := e.GetEsConn()
  217. defer e.DestoryEsConn(client)
  218. if client != nil {
  219. defer func() {
  220. if r := recover(); r != nil {
  221. log.Println("[E]", r)
  222. for skip := 1; ; skip++ {
  223. _, file, line, ok := runtime.Caller(skip)
  224. if !ok {
  225. break
  226. }
  227. go log.Printf("%v,%v\n", file, line)
  228. }
  229. }
  230. }()
  231. query := `{"query":{"term":{"_id":"` + id + `"}}`
  232. if len(fields) > 0 {
  233. query = query + `,"_source":[` + fields + `]`
  234. }
  235. query = query + "}"
  236. searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
  237. if err != nil {
  238. log.Println("从ES查询出错", err.Error())
  239. return nil
  240. }
  241. var res map[string]interface{}
  242. if searchResult.Hits != nil {
  243. resNum := len(searchResult.Hits.Hits)
  244. if resNum == 1 {
  245. res = make(map[string]interface{})
  246. for _, hit := range searchResult.Hits.Hits {
  247. json.Unmarshal(*hit.Source, &res)
  248. }
  249. return &res
  250. }
  251. }
  252. }
  253. return nil
  254. }
  255. func (e *Elastic) Count(index, itype string, query interface{}) int64 {
  256. client := e.GetEsConn()
  257. defer e.DestoryEsConn(client)
  258. if client != nil {
  259. defer func() {
  260. if r := recover(); r != nil {
  261. log.Println("[E]", r)
  262. for skip := 1; ; skip++ {
  263. _, file, line, ok := runtime.Caller(skip)
  264. if !ok {
  265. break
  266. }
  267. go log.Printf("%v,%v\n", file, line)
  268. }
  269. }
  270. }()
  271. var qq es.Query
  272. if qi, ok2 := query.(es.Query); ok2 {
  273. qq = qi
  274. }
  275. n, err := client.Count(index).Type(itype).Query(qq).Do()
  276. if err != nil {
  277. log.Println("统计出错", err.Error())
  278. }
  279. return n
  280. }
  281. return 0
  282. }
  283. //更新一个字段
  284. func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
  285. client := e.GetEsConn()
  286. defer e.DestoryEsConn(client)
  287. if client != nil {
  288. defer func() {
  289. if r := recover(); r != nil {
  290. log.Println("[E]", r)
  291. for skip := 1; ; skip++ {
  292. _, file, line, ok := runtime.Caller(skip)
  293. if !ok {
  294. break
  295. }
  296. go log.Printf("%v,%v\n", file, line)
  297. }
  298. }
  299. }()
  300. for _, data := range update {
  301. id := data["id"]
  302. updateStr := data["updateStr"]
  303. if id != "" && updateStr != "" {
  304. _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
  305. if err != nil {
  306. log.Println("更新检索出错:", err.Error())
  307. }
  308. } else {
  309. log.Println("数据错误")
  310. }
  311. }
  312. }
  313. }
  314. //更新多个字段
  315. func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
  316. client := e.GetEsConn()
  317. defer e.DestoryEsConn(client)
  318. if client != nil {
  319. defer func() {
  320. if r := recover(); r != nil {
  321. log.Println("[E]", r)
  322. for skip := 1; ; skip++ {
  323. _, file, line, ok := runtime.Caller(skip)
  324. if !ok {
  325. break
  326. }
  327. go log.Printf("%v,%v\n", file, line)
  328. }
  329. }
  330. }()
  331. for _, arr := range arrs {
  332. id := arr[0]["id"].(string)
  333. update := arr[1]["update"].([]string)
  334. for _, str := range update {
  335. _, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
  336. if err != nil {
  337. log.Println("更新检索出错:", err.Error())
  338. }
  339. }
  340. }
  341. }
  342. }