logs.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package util
  2. import (
  3. "app.yhyue.com/moapp/jybase/mongodb"
  4. "fmt"
  5. "log"
  6. "time"
  7. )
  8. var NsqLog *SaveLogs
  9. type SaveLogs struct {
  10. Name string // 日志名称
  11. CollName string // 保存的coll
  12. MgoSaveCacheSize int // 缓存通道大小
  13. SPSize int // 数据库并发数据
  14. MgoSaveCache chan map[string]interface{}
  15. SP chan bool
  16. MgoSave mongodb.MongodbSim //数据保存库连接
  17. BulkSize int // 每批的数量
  18. TimeAfter int // 定时保存
  19. Timeout int // 缓存通道满时 超时丢弃
  20. }
  21. func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
  22. sl := SaveLogs{
  23. Name: name,
  24. CollName: saveColl,
  25. MgoSaveCacheSize: mgoSaveCacheSize,
  26. SPSize: sPSize,
  27. MgoSave: mgoSave,
  28. BulkSize: bulkSize,
  29. TimeAfter: timeAfter,
  30. Timeout: timeout,
  31. }
  32. // 初始化
  33. sl.SP = make(chan bool, sl.SPSize)
  34. sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
  35. return &sl
  36. }
  37. // SendLogs 往通道发送数据
  38. func (s *SaveLogs) SendLogs(data map[string]interface{}) {
  39. timer := time.NewTicker(time.Duration(s.Timeout) * time.Millisecond)
  40. defer timer.Stop()
  41. select {
  42. case s.MgoSaveCache <- data:
  43. case <-timer.C:
  44. log.Println("缓存通道已满,丢弃:", s.Name, data)
  45. return
  46. }
  47. }
  48. // SaveMgo 数据存库
  49. func (s *SaveLogs) SaveMgo() {
  50. log.Println(fmt.Sprintf("%s Save...", s.Name))
  51. arr := make([]map[string]interface{}, s.BulkSize)
  52. index := 0
  53. timer := time.NewTicker(time.Duration(s.TimeAfter) * time.Millisecond)
  54. defer timer.Stop()
  55. for {
  56. select {
  57. case v := <-s.MgoSaveCache:
  58. arr[index] = v
  59. index++
  60. if index == s.BulkSize {
  61. s.SP <- true
  62. go func(arru []map[string]interface{}) {
  63. defer func() {
  64. <-s.SP
  65. }()
  66. s.MgoSave.SaveBulk(s.CollName, arru...)
  67. }(arr)
  68. arr = make([]map[string]interface{}, s.BulkSize)
  69. index = 0
  70. }
  71. case <-timer.C:
  72. if index > 0 {
  73. s.SP <- true
  74. go func(arru []map[string]interface{}) {
  75. defer func() {
  76. <-s.SP
  77. }()
  78. s.MgoSave.SaveBulk(s.CollName, arru...)
  79. }(arr[:index])
  80. arr = make([]map[string]interface{}, s.BulkSize)
  81. index = 0
  82. }
  83. }
  84. }
  85. }