|
@@ -0,0 +1,86 @@
|
|
|
|
+package entity
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "app.yhyue.com/moapp/jybase/mongodb"
|
|
|
|
+ "fmt"
|
|
|
|
+ "log"
|
|
|
|
+ "time"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+type SaveLogs struct {
|
|
|
|
+ Name string // 日志名称
|
|
|
|
+ CollName string // 保存的coll
|
|
|
|
+ MgoSaveCacheSize int // 缓存通道大小
|
|
|
|
+ SPSize int // 数据库并发数据
|
|
|
|
+ MgoSaveCache chan map[string]interface{}
|
|
|
|
+ SP chan bool
|
|
|
|
+ MgoSave mongodb.MongodbSim //数据保存库连接
|
|
|
|
+ BulkSize int // 每批的数量
|
|
|
|
+ TimeAfter int // 定时保存
|
|
|
|
+ Timeout int // 缓存通道满时 超时丢弃
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
|
|
|
|
+ sl := SaveLogs{
|
|
|
|
+ Name: name,
|
|
|
|
+ CollName: saveColl,
|
|
|
|
+ MgoSaveCacheSize: mgoSaveCacheSize,
|
|
|
|
+ SPSize: sPSize,
|
|
|
|
+ MgoSave: mgoSave,
|
|
|
|
+ BulkSize: bulkSize,
|
|
|
|
+ TimeAfter: timeAfter,
|
|
|
|
+ Timeout: timeout,
|
|
|
|
+ }
|
|
|
|
+ // 初始化
|
|
|
|
+ sl.SP = make(chan bool, sl.SPSize)
|
|
|
|
+ sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
|
|
|
|
+ return &sl
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// SendLogs 往通道发送数据
|
|
|
|
+func (s *SaveLogs) SendLogs(data map[string]interface{}) {
|
|
|
|
+ timer := time.NewTicker(time.Duration(s.Timeout) * time.Millisecond)
|
|
|
|
+ select {
|
|
|
|
+ case s.MgoSaveCache <- data:
|
|
|
|
+ case <-timer.C:
|
|
|
|
+ log.Println("缓存通道已满,丢弃:", data)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// SaveMgo 数据存库
|
|
|
|
+func (s *SaveLogs) SaveMgo() {
|
|
|
|
+ log.Println(fmt.Sprintf("%s Save...", s.Name))
|
|
|
|
+ arr := make([]map[string]interface{}, s.BulkSize)
|
|
|
|
+ index := 0
|
|
|
|
+ for {
|
|
|
|
+ select {
|
|
|
|
+ case v := <-s.MgoSaveCache:
|
|
|
|
+ arr[index] = v
|
|
|
|
+ index++
|
|
|
|
+ if index == s.BulkSize {
|
|
|
|
+ s.SP <- true
|
|
|
|
+ go func(arru []map[string]interface{}) {
|
|
|
|
+ defer func() {
|
|
|
|
+ <-s.SP
|
|
|
|
+ }()
|
|
|
|
+ s.MgoSave.SaveBulk(s.CollName, arru...)
|
|
|
|
+ }(arr)
|
|
|
|
+ arr = make([]map[string]interface{}, s.BulkSize)
|
|
|
|
+ index = 0
|
|
|
|
+ }
|
|
|
|
+ case <-time.After(time.Duration(s.TimeAfter) * time.Millisecond):
|
|
|
|
+ if index > 0 {
|
|
|
|
+ s.SP <- true
|
|
|
|
+ go func(arru []map[string]interface{}) {
|
|
|
|
+ defer func() {
|
|
|
|
+ <-s.SP
|
|
|
|
+ }()
|
|
|
|
+ s.MgoSave.SaveBulk(s.CollName, arru...)
|
|
|
|
+ }(arr[:index])
|
|
|
|
+ arr = make([]map[string]interface{}, s.BulkSize)
|
|
|
|
+ index = 0
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|