1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- package main
- import (
- "log"
- "time"
- )
// updateInfo bundles the queue of pending update/insert payloads with the
// number of queued entries that are grouped into a single bulk write.
type updateInfo struct {
	// updatePool is the channel on which entries to update or insert arrive;
	// each entry is a slice of documents.
	updatePool chan []map[string]interface{}
	// saveSize is the number of entries collected before a batch is written.
	saveSize int
}
// sp limits how many bulk-write goroutines run at once: updateData sends a
// token before starting a write and the write goroutine receives it back when
// it finishes, so at most cap(sp) writes are in flight.
var sp = make(chan bool, 5)
- func newUpdatePool() *updateInfo {
- update:=&updateInfo{make(chan []map[string]interface{}, 50000),500}
- return update
- }
- func (update *updateInfo) updateData() {
- log.Println("开始不断监听--待更新数据")
- tmpArr := make([][]map[string]interface{}, update.saveSize)
- tmpIndex := 0
- for {
- select {
- case value := <-update.updatePool:
- tmpArr[tmpIndex] = value
- tmpIndex++
- if tmpIndex == update.saveSize {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- mgo.UpSertBulk(extract, dataArr...)
- }(tmpArr)
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- case <-time.After(10 * time.Second)://无反应时每x秒检测一次
- if tmpIndex > 0 {
- sp <- true
- go func(dataArr [][]map[string]interface{}) {
- defer func() {
- <-sp
- }()
- mgo.UpSertBulk(extract, dataArr...)
- }(tmpArr[:tmpIndex])
- tmpArr = make([][]map[string]interface{}, update.saveSize)
- tmpIndex = 0
- }
- }
- }
- }
|