logs.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package util
  2. import (
  3. "app.yhyue.com/moapp/jybase/mongodb"
  4. "app.yhyue.com/moapp/message/config"
  5. "fmt"
  6. "log"
  7. "time"
  8. )
// NsqLog is a package-level *SaveLogs handle. Nothing in the code visible here
// assigns it, so it stays nil until some caller sets it.
// NOTE(review): presumably the saver used for NSQ message logs — confirm with callers.
var NsqLog *SaveLogs
  10. func LogInit(sl *SaveLogs, c config.SaveLogConfig, mgo mongodb.MongodbSim) {
  11. sl = NewSaveLog(c.Name, c.CollName, c.MgoSaveCacheSize, c.SPSize, c.BulkSize, c.TimeAfter, c.Timeout, mgo)
  12. go sl.SaveMgo()
  13. }
// SaveLogs buffers log records in a channel and writes them to MongoDB in
// batches. Records are queued with SendLogs and drained by SaveMgo.
type SaveLogs struct {
	Name             string // log name (printed in this type's own log output)
	CollName         string // collection the records are saved into
	MgoSaveCacheSize int    // capacity of the MgoSaveCache channel
	SPSize           int    // capacity of SP, i.e. how many bulk saves may run at once
	MgoSaveCache     chan map[string]interface{} // queue of records waiting to be saved
	SP               chan bool                   // slots limiting concurrent bulk saves
	MgoSave          mongodb.MongodbSim // connection to the database the records are saved to
	BulkSize         int                // number of records per batch
	TimeAfter        int                // timed save: flush interval, used as milliseconds
	Timeout          int                // when the cache channel is full, drop the record after this many milliseconds
}
  26. func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
  27. sl := SaveLogs{
  28. Name: name,
  29. CollName: saveColl,
  30. MgoSaveCacheSize: mgoSaveCacheSize,
  31. SPSize: sPSize,
  32. MgoSave: mgoSave,
  33. BulkSize: bulkSize,
  34. TimeAfter: timeAfter,
  35. Timeout: timeout,
  36. }
  37. // 初始化
  38. sl.SP = make(chan bool, sl.SPSize)
  39. sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
  40. return &sl
  41. }
  42. // SendLogs 往通道发送数据
  43. func (s *SaveLogs) SendLogs(data map[string]interface{}) {
  44. timer := time.NewTimer(time.Duration(s.Timeout) * time.Millisecond)
  45. defer timer.Stop()
  46. select {
  47. case s.MgoSaveCache <- data:
  48. case <-timer.C:
  49. log.Println("缓存通道已满,丢弃:", s.Name, data)
  50. return
  51. }
  52. }
  53. // SaveMgo 数据存库
  54. func (s *SaveLogs) SaveMgo() {
  55. log.Println(fmt.Sprintf("%s Save...", s.Name))
  56. arr := make([]map[string]interface{}, s.BulkSize)
  57. index := 0
  58. timer := time.NewTimer(time.Duration(s.TimeAfter) * time.Millisecond)
  59. defer timer.Stop()
  60. for {
  61. timer.Reset(time.Duration(s.TimeAfter) * time.Millisecond)
  62. select {
  63. case v := <-s.MgoSaveCache:
  64. arr[index] = v
  65. index++
  66. if index == s.BulkSize {
  67. s.SP <- true
  68. go func(arru []map[string]interface{}) {
  69. defer func() {
  70. <-s.SP
  71. }()
  72. s.MgoSave.SaveBulk(s.CollName, arru...)
  73. }(arr)
  74. arr = make([]map[string]interface{}, s.BulkSize)
  75. index = 0
  76. }
  77. case <-timer.C:
  78. if index > 0 {
  79. s.SP <- true
  80. go func(arru []map[string]interface{}) {
  81. defer func() {
  82. <-s.SP
  83. }()
  84. s.MgoSave.SaveBulk(s.CollName, arru...)
  85. }(arr[:index])
  86. arr = make([]map[string]interface{}, s.BulkSize)
  87. index = 0
  88. }
  89. }
  90. }
  91. }