logs.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package entity
  2. import (
  3. "app.yhyue.com/moapp/jybase/mongodb"
  4. "fmt"
  5. "log"
  6. "time"
  7. )
  8. type SaveLogs struct {
  9. Name string // 日志名称
  10. CollName string // 保存的coll
  11. MgoSaveCacheSize int // 缓存通道大小
  12. SPSize int // 数据库并发数据
  13. MgoSaveCache chan map[string]interface{}
  14. SP chan bool
  15. MgoSave mongodb.MongodbSim //数据保存库连接
  16. BulkSize int // 每批的数量
  17. TimeAfter int // 定时保存
  18. Timeout int // 缓存通道满时 超时丢弃
  19. }
  20. func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
  21. sl := SaveLogs{
  22. Name: name,
  23. CollName: saveColl,
  24. MgoSaveCacheSize: mgoSaveCacheSize,
  25. SPSize: sPSize,
  26. MgoSave: mgoSave,
  27. BulkSize: bulkSize,
  28. TimeAfter: timeAfter,
  29. Timeout: timeout,
  30. }
  31. // 初始化
  32. sl.SP = make(chan bool, sl.SPSize)
  33. sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
  34. return &sl
  35. }
  36. // SendLogs 往通道发送数据
  37. func (s *SaveLogs) SendLogs(data map[string]interface{}) {
  38. timer := time.NewTicker(time.Duration(s.Timeout) * time.Millisecond)
  39. defer timer.Stop()
  40. select {
  41. case s.MgoSaveCache <- data:
  42. case <-timer.C:
  43. log.Println("缓存通道已满,丢弃:", data)
  44. return
  45. }
  46. }
  47. // SaveMgo 数据存库
  48. func (s *SaveLogs) SaveMgo() {
  49. log.Println(fmt.Sprintf("%s Save...", s.Name))
  50. arr := make([]map[string]interface{}, s.BulkSize)
  51. index := 0
  52. timer := time.NewTicker(time.Duration(s.TimeAfter) * time.Millisecond)
  53. defer timer.Stop()
  54. for {
  55. select {
  56. case v := <-s.MgoSaveCache:
  57. arr[index] = v
  58. index++
  59. if index == s.BulkSize {
  60. s.SP <- true
  61. go func(arru []map[string]interface{}) {
  62. defer func() {
  63. <-s.SP
  64. }()
  65. s.MgoSave.SaveBulk(s.CollName, arru...)
  66. }(arr)
  67. arr = make([]map[string]interface{}, s.BulkSize)
  68. index = 0
  69. }
  70. case <-timer.C:
  71. if index > 0 {
  72. s.SP <- true
  73. go func(arru []map[string]interface{}) {
  74. defer func() {
  75. <-s.SP
  76. }()
  77. s.MgoSave.SaveBulk(s.CollName, arru...)
  78. }(arr[:index])
  79. arr = make([]map[string]interface{}, s.BulkSize)
  80. index = 0
  81. }
  82. }
  83. }
  84. }