updateMethod.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package main
  2. import (
  3. "log"
  4. "time"
  5. )
  6. var sp = make(chan bool, 5)
  7. type updateInfo struct { //更新或新增通道
  8. updatePool chan []map[string]interface{}
  9. saveSize int
  10. }
  11. func newUpdatePool() *updateInfo {
  12. update := &updateInfo{make(chan []map[string]interface{}, 50000), 200}
  13. return update
  14. }
  15. // 临时~新增组
  16. type addGroupInfo struct {
  17. pool chan map[string]interface{}
  18. saveSize int
  19. }
  20. func newAddGroupPool() *addGroupInfo {
  21. info := &addGroupInfo{make(chan map[string]interface{}, 50000), 200}
  22. return info
  23. }
  24. // 监听更新
  25. func (update *updateInfo) updateData() {
  26. log.Println("开始不断监听--待更新数据")
  27. tmpArr := make([][]map[string]interface{}, update.saveSize)
  28. tmpIndex := 0
  29. for {
  30. select {
  31. case value := <-update.updatePool:
  32. tmpArr[tmpIndex] = value
  33. tmpIndex++
  34. if tmpIndex == update.saveSize {
  35. sp <- true
  36. go func(dataArr [][]map[string]interface{}) {
  37. defer func() {
  38. <-sp
  39. }()
  40. data_mgo.UpSertBulk(extract, dataArr...)
  41. }(tmpArr)
  42. tmpArr = make([][]map[string]interface{}, update.saveSize)
  43. tmpIndex = 0
  44. }
  45. case <-time.After(5 * time.Second): //无反应时每x秒检测一次
  46. if tmpIndex > 0 {
  47. sp <- true
  48. go func(dataArr [][]map[string]interface{}) {
  49. defer func() {
  50. <-sp
  51. }()
  52. data_mgo.UpSertBulk(extract, dataArr...)
  53. }(tmpArr[:tmpIndex])
  54. tmpArr = make([][]map[string]interface{}, update.saveSize)
  55. tmpIndex = 0
  56. }
  57. }
  58. }
  59. }
  60. // 监听新增
  61. func (info *addGroupInfo) addGroupData() {
  62. tmpArr := make([]map[string]interface{}, info.saveSize)
  63. tmpIndex := 0
  64. for {
  65. select {
  66. case value := <-info.pool:
  67. tmpArr[tmpIndex] = value
  68. tmpIndex++
  69. if tmpIndex == info.saveSize {
  70. sp <- true
  71. go func(dataArr []map[string]interface{}) {
  72. defer func() {
  73. <-sp
  74. }()
  75. data_mgo.SaveBulk("zktes_full_repeat", dataArr...)
  76. }(tmpArr)
  77. tmpArr = make([]map[string]interface{}, info.saveSize)
  78. tmpIndex = 0
  79. }
  80. case <-time.After(7 * time.Second): //无反应时每x秒检测一次
  81. if tmpIndex > 0 {
  82. sp <- true
  83. go func(dataArr []map[string]interface{}) {
  84. defer func() {
  85. <-sp
  86. }()
  87. data_mgo.SaveBulk("zktes_full_repeat", dataArr...)
  88. }(tmpArr[:tmpIndex])
  89. tmpArr = make([]map[string]interface{}, info.saveSize)
  90. tmpIndex = 0
  91. }
  92. }
  93. }
  94. }