1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- package main
- import (
- "log"
- "time"
- )
- type updateRecordInfo struct {
- //更新或新增通道
- updatePool chan []map[string]interface{}
- //数量
- saveSize int
- }
- var sp_r = make(chan bool, 5)
- func newUpdateRecordPool() *updateRecordInfo {
- update:=&updateRecordInfo{make(chan []map[string]interface{}, 50000),500}
- return update
- }
- func (update *updateRecordInfo) updateRecordData() {
- log.Println("开始不断监听--待更新数据")
- tmpArr := make([][]map[string]interface{}, update.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-update.updatePool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == update.saveSize {
- sp_r <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp_r
- }()
- mgo.UpSertBulk(record_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp_r <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp_r
- }()
- mgo.UpSertBulk(record_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
|