1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- package main
- import (
- "log"
- "time"
- )
// updateFusionInfo holds the queue and batch size used when writing fusion
// records in bulk.
type updateFusionInfo struct {
	// updatePool is the channel of records to update or insert; every message
	// is a slice of maps.
	updatePool chan []map[string]interface{}
	// saveSize is the number of messages gathered into one batch.
	saveSize int
}
// sp_f is used as a counting semaphore with 5 slots: a value is sent before a
// bulk-write goroutine is started and received again when that goroutine ends.
var sp_f = make(chan bool, 5)
- func newUpdateFusionPool() *updateFusionInfo {
- update:=&updateFusionInfo{make(chan []map[string]interface{}, 50000),100}
- return update
- }
- func (update *updateFusionInfo) updateFusionData() {
- log.Println("监听--融合更新数据")
- tmpArr := make([][]map[string]interface{}, update.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-update.updatePool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == update.saveSize {
- sp_f <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp_f
- }()
- mgo.UpSertBulk(fusion_coll_name, dataArr...)
- }(tmpArr)
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp_f <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp_f
- }()
- mgo.UpSertBulk(fusion_coll_name, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
|