logs.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package entity
  2. import (
  3. "app.yhyue.com/moapp/jybase/mongodb"
  4. "fmt"
  5. "log"
  6. "time"
  7. )
  8. type SaveLogs struct {
  9. Name string // 日志名称
  10. CollName string // 保存的coll
  11. MgoSaveCacheSize int // 缓存通道大小
  12. SPSize int // 数据库并发数据
  13. MgoSaveCache chan map[string]interface{}
  14. SP chan bool
  15. MgoSave mongodb.MongodbSim //数据保存库连接
  16. BulkSize int // 每批的数量
  17. TimeAfter int // 定时保存
  18. Timeout int // 缓存通道满时 超时丢弃
  19. }
  20. func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
  21. sl := SaveLogs{
  22. Name: name,
  23. CollName: saveColl,
  24. MgoSaveCacheSize: mgoSaveCacheSize,
  25. SPSize: sPSize,
  26. MgoSave: mgoSave,
  27. BulkSize: bulkSize,
  28. TimeAfter: timeAfter,
  29. Timeout: timeout,
  30. }
  31. // 初始化
  32. sl.SP = make(chan bool, sl.SPSize)
  33. sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
  34. return &sl
  35. }
  36. // SendLogs 往通道发送数据
  37. func (s *SaveLogs) SendLogs(data map[string]interface{}) {
  38. select {
  39. case s.MgoSaveCache <- data:
  40. case <-time.After(time.Duration(s.Timeout) * time.Millisecond):
  41. log.Println("缓存通道已满,丢弃:", data)
  42. return
  43. }
  44. }
  45. // SaveMgo 数据存库
  46. func (s *SaveLogs) SaveMgo() {
  47. log.Println(fmt.Sprintf("%s Save...", s.Name))
  48. arr := make([]map[string]interface{}, s.BulkSize)
  49. index := 0
  50. for {
  51. select {
  52. case v := <-s.MgoSaveCache:
  53. arr[index] = v
  54. index++
  55. if index == s.BulkSize {
  56. s.SP <- true
  57. go func(arru []map[string]interface{}) {
  58. defer func() {
  59. <-s.SP
  60. }()
  61. s.MgoSave.SaveBulk(s.CollName, arru...)
  62. }(arr)
  63. arr = make([]map[string]interface{}, s.BulkSize)
  64. index = 0
  65. }
  66. case <-time.After(time.Duration(s.TimeAfter) * time.Millisecond):
  67. if index > 0 {
  68. s.SP <- true
  69. go func(arru []map[string]interface{}) {
  70. defer func() {
  71. <-s.SP
  72. }()
  73. s.MgoSave.SaveBulk(s.CollName, arru...)
  74. }(arr[:index])
  75. arr = make([]map[string]interface{}, s.BulkSize)
  76. index = 0
  77. }
  78. }
  79. }
  80. }