logs.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package util
  2. import (
  3. "app.yhyue.com/moapp/jybase/mongodb"
  4. "app.yhyue.com/moapp/message/config"
  5. "fmt"
  6. "log"
  7. "time"
  8. )
  9. var NsqLog *SaveLogs
  10. func LogInit(sl *SaveLogs, c config.SaveLogConfig, mgo mongodb.MongodbSim) {
  11. sl = NewSaveLog(c.Name, c.CollName, c.MgoSaveCacheSize, c.SPSize, c.BulkSize, c.TimeAfter, c.Timeout, mgo)
  12. go sl.SaveMgo()
  13. }
  14. type SaveLogs struct {
  15. Name string // 日志名称
  16. CollName string // 保存的coll
  17. MgoSaveCacheSize int // 缓存通道大小
  18. SPSize int // 数据库并发数据
  19. MgoSaveCache chan map[string]interface{}
  20. SP chan bool
  21. MgoSave mongodb.MongodbSim //数据保存库连接
  22. BulkSize int // 每批的数量
  23. TimeAfter int // 定时保存
  24. Timeout int // 缓存通道满时 超时丢弃
  25. }
  26. func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
  27. sl := SaveLogs{
  28. Name: name,
  29. CollName: saveColl,
  30. MgoSaveCacheSize: mgoSaveCacheSize,
  31. SPSize: sPSize,
  32. MgoSave: mgoSave,
  33. BulkSize: bulkSize,
  34. TimeAfter: timeAfter,
  35. Timeout: timeout,
  36. }
  37. // 初始化
  38. sl.SP = make(chan bool, sl.SPSize)
  39. sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
  40. return &sl
  41. }
  42. // SendLogs 往通道发送数据
  43. func (s *SaveLogs) SendLogs(data map[string]interface{}) {
  44. timer := time.NewTicker(time.Duration(s.Timeout) * time.Millisecond)
  45. defer timer.Stop()
  46. select {
  47. case s.MgoSaveCache <- data:
  48. case <-timer.C:
  49. log.Println("缓存通道已满,丢弃:", s.Name, data)
  50. return
  51. }
  52. }
  53. // SaveMgo 数据存库
  54. func (s *SaveLogs) SaveMgo() {
  55. log.Println(fmt.Sprintf("%s Save...", s.Name))
  56. arr := make([]map[string]interface{}, s.BulkSize)
  57. index := 0
  58. timer := time.NewTicker(time.Duration(s.TimeAfter) * time.Millisecond)
  59. defer timer.Stop()
  60. for {
  61. select {
  62. case v := <-s.MgoSaveCache:
  63. arr[index] = v
  64. index++
  65. if index == s.BulkSize {
  66. s.SP <- true
  67. go func(arru []map[string]interface{}) {
  68. defer func() {
  69. <-s.SP
  70. }()
  71. s.MgoSave.SaveBulk(s.CollName, arru...)
  72. }(arr)
  73. arr = make([]map[string]interface{}, s.BulkSize)
  74. index = 0
  75. }
  76. case <-timer.C:
  77. if index > 0 {
  78. s.SP <- true
  79. go func(arru []map[string]interface{}) {
  80. defer func() {
  81. <-s.SP
  82. }()
  83. s.MgoSave.SaveBulk(s.CollName, arru...)
  84. }(arr[:index])
  85. arr = make([]map[string]interface{}, s.BulkSize)
  86. index = 0
  87. }
  88. }
  89. }
  90. }