12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- package entity
- import (
- "app.yhyue.com/moapp/jybase/mongodb"
- "fmt"
- "log"
- "time"
- )
- type SaveLogs struct {
- Name string // 日志名称
- CollName string // 保存的coll
- MgoSaveCacheSize int // 缓存通道大小
- SPSize int // 数据库并发数据
- MgoSaveCache chan map[string]interface{}
- SP chan bool
- MgoSave mongodb.MongodbSim //数据保存库连接
- BulkSize int // 每批的数量
- TimeAfter int // 定时保存
- Timeout int // 缓存通道满时 超时丢弃
- }
- func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
- sl := SaveLogs{
- Name: name,
- CollName: saveColl,
- MgoSaveCacheSize: mgoSaveCacheSize,
- SPSize: sPSize,
- MgoSave: mgoSave,
- BulkSize: bulkSize,
- TimeAfter: timeAfter,
- Timeout: timeout,
- }
- // 初始化
- sl.SP = make(chan bool, sl.SPSize)
- sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
- return &sl
- }
- // SendLogs 往通道发送数据
- func (s *SaveLogs) SendLogs(data map[string]interface{}) {
- select {
- case s.MgoSaveCache <- data:
- case <-time.After(time.Duration(s.Timeout) * time.Millisecond):
- log.Println("缓存通道已满,丢弃:", data)
- return
- }
- }
- // SaveMgo 数据存库
- func (s *SaveLogs) SaveMgo() {
- log.Println(fmt.Sprintf("%s Save...", s.Name))
- arr := make([]map[string]interface{}, s.BulkSize)
- index := 0
- for {
- select {
- case v := <-s.MgoSaveCache:
- arr[index] = v
- index++
- if index == s.BulkSize {
- s.SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-s.SP
- }()
- s.MgoSave.SaveBulk(s.CollName, arru...)
- }(arr)
- arr = make([]map[string]interface{}, s.BulkSize)
- index = 0
- }
- case <-time.After(time.Duration(s.TimeAfter) * time.Millisecond):
- if index > 0 {
- s.SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-s.SP
- }()
- s.MgoSave.SaveBulk(s.CollName, arru...)
- }(arr[:index])
- arr = make([]map[string]interface{}, s.BulkSize)
- index = 0
- }
- }
- }
- }
|