updateElastic.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package main
  2. import (
  3. "qfw/util/elastic"
  4. "log"
  5. "time"
  6. )
  7. type updateEsInfo struct {
  8. //更新或新增通道
  9. update_pool chan map[string]string
  10. //数量
  11. saveSize int
  12. }
  13. var sp_es = make(chan bool, 10)
  14. //批量更新对象
  15. func newUpdateEsPool() *updateEsInfo {
  16. update:=&updateEsInfo{make(chan map[string]string, 50000),200}
  17. return update
  18. }
  19. //更新池
  20. func (update *updateEsInfo) updateEsData() {
  21. log.Println("监听Es......更新数据")
  22. tmpArr := make([]map[string]string, update.saveSize)
  23. tmpIndex := 0
  24. for {
  25. select {
  26. case value := <-update.update_pool:
  27. tmpArr[tmpIndex] = value
  28. tmpIndex++
  29. if tmpIndex == update.saveSize {
  30. sp_es <- true
  31. go func(dataArr []map[string]string) {
  32. defer func() {
  33. <-sp_es
  34. }()
  35. //批量更新
  36. elastic.BulkUpdateArr(esIndex,esType,dataArr)
  37. }(tmpArr)
  38. tmpArr = make([]map[string]string, update.saveSize)
  39. tmpIndex = 0
  40. }
  41. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  42. if tmpIndex > 0 {
  43. sp_es <- true
  44. go func(dataArr []map[string]string) {
  45. defer func() {
  46. <-sp_es
  47. }()
  48. //批量更新
  49. elastic.BulkUpdateArr(esIndex,esType,dataArr)
  50. }(tmpArr[:tmpIndex])
  51. tmpArr = make([]map[string]string, update.saveSize)
  52. tmpIndex = 0
  53. }
  54. }
  55. }
  56. }