package entity

import (
	"fmt"
	"log"
	"time"

	"app.yhyue.com/moapp/jybase/mongodb"
)

// SaveLogs buffers log documents in a channel and writes them to MongoDB in
// batches. Producers call SendLogs; a single consumer goroutine runs SaveMgo.
type SaveLogs struct {
	Name             string                      // Log name, printed when SaveMgo starts.
	CollName         string                      // Collection the batches are saved to.
	MgoSaveCacheSize int                         // Capacity of the MgoSaveCache channel.
	SPSize           int                         // Capacity of SP, i.e. max concurrent bulk saves.
	MgoSaveCache     chan map[string]interface{} // Buffered documents waiting to be saved.
	SP               chan bool                   // Semaphore limiting concurrent bulk saves.
	// MgoSave is the database handle used for SaveBulk.
	// NOTE(review): it is stored by value; if MongodbSim holds locks or other
	// no-copy state, confirm that copying it here is intended.
	MgoSave   mongodb.MongodbSim
	BulkSize  int // Number of documents per batch.
	TimeAfter int // Periodic flush interval, in milliseconds.
	Timeout   int // How long SendLogs waits on a full buffer before dropping, in milliseconds.
}

// NewSaveLog builds a SaveLogs and allocates its channels.
//
// Negative mgoSaveCacheSize or sPSize values are treated as 0, because making a
// channel with a negative capacity panics. The remaining numeric settings are
// stored as given and validated where they are used.
func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
	if mgoSaveCacheSize < 0 {
		mgoSaveCacheSize = 0
	}
	if sPSize < 0 {
		sPSize = 0
	}
	sl := SaveLogs{
		Name:             name,
		CollName:         saveColl,
		MgoSaveCacheSize: mgoSaveCacheSize,
		SPSize:           sPSize,
		MgoSave:          mgoSave,
		BulkSize:         bulkSize,
		TimeAfter:        timeAfter,
		Timeout:          timeout,
	}
	// Allocate the semaphore and the document buffer.
	sl.SP = make(chan bool, sl.SPSize)
	sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
	return &sl
}

// SendLogs queues data for saving. If the buffer is full it waits up to
// Timeout milliseconds for room and then logs and drops the document. A
// non-positive Timeout drops immediately when the buffer is full.
func (s *SaveLogs) SendLogs(data map[string]interface{}) {
	// Fast path: no timer is needed when the channel can take the value now.
	select {
	case s.MgoSaveCache <- data:
		return
	default:
	}

	if s.Timeout <= 0 {
		log.Println("缓存通道已满,丢弃:", data)
		return
	}

	// A one-shot timer is enough here; a ticker would keep firing and panics
	// on a non-positive interval.
	timer := time.NewTimer(time.Duration(s.Timeout) * time.Millisecond)
	defer timer.Stop()

	select {
	case s.MgoSaveCache <- data:
	case <-timer.C:
		log.Println("缓存通道已满,丢弃:", data)
	}
}

// SaveMgo consumes MgoSaveCache and saves the documents in batches. A batch is
// flushed when it reaches BulkSize documents, or on every TimeAfter tick if it
// is non-empty. It blocks until MgoSaveCache is closed, at which point the
// pending partial batch is flushed and the method returns (saves started
// earlier may still be running in the background).
//
// A BulkSize below 1 is treated as 1, and a non-positive TimeAfter falls back
// to a one-second flush interval, so bad configuration does not panic.
func (s *SaveLogs) SaveMgo() {
	log.Println(fmt.Sprintf("%s Save...", s.Name))

	// Used only when TimeAfter is not a positive number of milliseconds.
	const fallbackFlushInterval = time.Second

	bulkSize := s.BulkSize
	if bulkSize < 1 {
		bulkSize = 1
	}
	interval := time.Duration(s.TimeAfter) * time.Millisecond
	if interval <= 0 {
		interval = fallbackFlushInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]map[string]interface{}, 0, bulkSize)
	for {
		select {
		case doc, ok := <-s.MgoSaveCache:
			if !ok {
				// Channel closed: flush what is left instead of spinning on
				// zero values forever.
				if len(batch) > 0 {
					s.saveBatch(batch)
				}
				return
			}
			batch = append(batch, doc)
			if len(batch) < bulkSize {
				continue
			}
		case <-ticker.C:
			if len(batch) == 0 {
				continue
			}
		}

		s.saveBatch(batch)
		// Start a fresh slice: the previous one is now owned by the saver.
		batch = make([]map[string]interface{}, 0, bulkSize)
	}
}

// saveBatch writes one batch with SaveBulk, running it in a goroutine bounded
// by the SP semaphore. When SP has no capacity (unbuffered or nil) the save
// runs synchronously, because acquiring such a semaphore would block forever.
func (s *SaveLogs) saveBatch(batch []map[string]interface{}) {
	if cap(s.SP) == 0 {
		s.MgoSave.SaveBulk(s.CollName, batch...)
		return
	}

	s.SP <- true // Acquire a slot; blocks while SPSize saves are in flight.
	go func() {
		defer func() { <-s.SP }() // Release the slot once the save finishes.
		s.MgoSave.SaveBulk(s.CollName, batch...)
	}()
}