updateRecord.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package main
  2. import (
  3. "log"
  4. "time"
  5. )
  // updateRecordInfo bundles the buffered queues and the batch size used by the
  // background workers that write records in bulk. The constructors in this
  // file allocate only one of the two channels and leave the other nil.
  //
  // Field order is significant: existing code builds this struct with
  // positional literals.
  type updateRecordInfo struct {
  	// add_pool queues single documents waiting to be bulk-inserted.
  	add_pool chan map[string]interface{}
  	// update_pool queues update entries (each a slice of maps) waiting to be
  	// bulk-upserted.
  	update_pool chan []map[string]interface{}
  	// saveSize is the number of queued entries that triggers a bulk write.
  	saveSize int
  }
  // sp_r is a counting semaphore shared by the bulk-write workers: a worker
  // sends a token before starting a write goroutine and the goroutine receives
  // one back when the write finishes, so at most ten writes run concurrently.
  var sp_r = make(chan bool, 10)
  15. //批量更新对象
  16. func newUpdateRecordPool() *updateRecordInfo {
  17. update:=&updateRecordInfo{nil,make(chan []map[string]interface{}, 50000),200}
  18. return update
  19. }
  20. //批量新增对象
  21. func newAddRecordPool() *updateRecordInfo {
  22. update:=&updateRecordInfo{make(chan map[string]interface{}, 50000),nil,200}
  23. return update
  24. }
  25. //新增池
  26. func (update *updateRecordInfo) addRecordData() {
  27. log.Println("监听日志......新增数据")
  28. tmpArr := make([]map[string]interface{}, update.saveSize)
  29. tmpIndex := 0
  30. for {
  31. select {
  32. case value := <-update.add_pool:
  33. tmpArr[tmpIndex] = value
  34. tmpIndex++
  35. if tmpIndex == update.saveSize {
  36. sp_r <- true
  37. go func(dataArr []map[string]interface{}) {
  38. defer func() {
  39. <-sp_r
  40. }()
  41. //批量新增
  42. mgo.SaveBulk(record_coll_name, dataArr...)
  43. }(tmpArr)
  44. tmpArr = make([]map[string]interface{}, update.saveSize)
  45. tmpIndex = 0
  46. }
  47. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  48. //log.Println("10秒检测",tmpIndex)
  49. if tmpIndex > 0 {
  50. sp_r <- true
  51. go func(dataArr []map[string]interface{}) {
  52. defer func() {
  53. <-sp_r
  54. }()
  55. //批量新增
  56. mgo.SaveBulk(record_coll_name, dataArr...)
  57. }(tmpArr[:tmpIndex])
  58. tmpArr = make([]map[string]interface{}, update.saveSize)
  59. tmpIndex = 0
  60. }
  61. }
  62. }
  63. }
  64. //更新池
  65. func (update *updateRecordInfo) updateRecordData() {
  66. log.Println("监听日志......更新数据")
  67. tmpArr := make([][]map[string]interface{}, update.saveSize)
  68. tmpIndex := 0
  69. for {
  70. select {
  71. case value := <-update.update_pool:
  72. tmpArr[tmpIndex] = value
  73. tmpIndex++
  74. if tmpIndex == update.saveSize {
  75. sp_r <- true
  76. go func(dataArr [][]map[string]interface{}) {
  77. defer func() {
  78. <-sp_r
  79. }()
  80. //批量更新
  81. mgo.UpSertBulk(record_coll_name, dataArr...)
  82. }(tmpArr)
  83. tmpArr = make([][]map[string]interface{}, update.saveSize)
  84. tmpIndex = 0
  85. }
  86. case <-time.After(10 * time.Second)://无反应时每x秒检测一次
  87. if tmpIndex > 0 {
  88. sp_r <- true
  89. go func(dataArr [][]map[string]interface{}) {
  90. defer func() {
  91. <-sp_r
  92. }()
  93. //批量更新
  94. mgo.UpSertBulk(record_coll_name, dataArr...)
  95. }(tmpArr[:tmpIndex])
  96. tmpArr = make([][]map[string]interface{}, update.saveSize)
  97. tmpIndex = 0
  98. }
  99. }
  100. }
  101. }