package src5 import ( "log" "time" ) type updateInfo struct { //更新或新增通道 updatePool chan []map[string]interface{} //数量 saveSize int } var sp = make(chan bool, 5) func newUpdatePool() *updateInfo { update:=&updateInfo{make(chan []map[string]interface{}, 50000),200} return update } func (update *updateInfo) updateData() { log.Println("开始不断监听--待更新数据") tmpArr := make([][]map[string]interface{}, update.saveSize) tmpIndex := 0 for { select { case value := <-update.updatePool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == update.saveSize { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() mgo.UpSertBulk(extract, dataArr...) }(tmpArr) tmpArr = make([][]map[string]interface{}, update.saveSize) tmpIndex = 0 } case <-time.After(5 * time.Second)://无反应时每x秒检测一次 if tmpIndex > 0 { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() mgo.UpSertBulk(extract, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([][]map[string]interface{}, update.saveSize) tmpIndex = 0 } } } }