123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- package main
- import (
- "log"
- "time"
- )
// updateRecordInfo groups the buffered channels that feed the batch workers
// together with the batch size at which those workers flush.
//
// The constructors in this file fill the fields positionally, so the field
// order is part of the type's contract.
type updateRecordInfo struct {
	// add_pool queues single records for bulk insert. It is nil in pools
	// created by newUpdateRecordPool.
	add_pool chan map[string]interface{}

	// update_pool queues update items for bulk upsert; every item is a slice
	// of maps (presumably a selector/update pair; confirm against
	// mgo.UpSertBulk). It is nil in pools created by newAddRecordPool.
	update_pool chan []map[string]interface{}

	// saveSize is the number of queued items that makes a worker flush a
	// batch.
	saveSize int
}
// sp_r is a counting semaphore shared by the add and update workers: a slot is
// taken before each background bulk write starts and released when that write
// finishes, so at most cap(sp_r) bulk writes run at the same time.
var sp_r = make(chan bool, 10)
- //批量更新对象
- func newUpdateRecordPool() *updateRecordInfo {
- update:=&updateRecordInfo{nil,make(chan []map[string]interface{}, 50000),200}
- return update
- }
- //批量新增对象
- func newAddRecordPool() *updateRecordInfo {
- update:=&updateRecordInfo{make(chan map[string]interface{}, 50000),nil,200}
- return update
- }
- //新增池
- func (update *updateRecordInfo) addRecordData() {
- log.Println("监听日志......新增数据")
- tmpArr := make([]map[string]interface{}, update.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-update.add_pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == update.saveSize {
- sp_r <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp_r
- }()
- //批量新增
- mgo.SaveBulk(record_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- //log.Println("10秒检测",tmpIndex)
- if tmpIndex > 0 {
- sp_r <- true
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-sp_r
- }()
- //批量新增
- mgo.SaveBulk(record_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
- //更新池
- func (update *updateRecordInfo) updateRecordData() {
- log.Println("监听日志......更新数据")
- tmpArr := make([][]map[string]interface{}, update.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-update.update_pool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == update.saveSize {
- sp_r <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp_r
- }()
- //批量更新
- mgo.UpSertBulk(record_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp_r <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp_r
- }()
- //批量更新
- mgo.UpSertBulk(record_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
|