package main

import (
	"log"
	"time"
)

// updateRecordInfo buffers record documents received on a channel so they can
// be written in batches by updateRecordData.
type updateRecordInfo struct {
	// updatePool is the channel on which documents to update or insert arrive.
	updatePool chan []map[string]interface{}
	// saveSize is the number of received items that triggers a batch write.
	saveSize int
}

// sp_r limits how many batch-write goroutines run at once: a token is sent
// before each goroutine starts and received again when it finishes, so at
// most cap(sp_r) == 5 writes are in flight.
var sp_r = make(chan bool, 5)

// newUpdateRecordPool returns an updateRecordInfo whose channel has room for
// 50000 pending items and whose batch size is 500.
func newUpdateRecordPool() *updateRecordInfo {
	return &updateRecordInfo{
		updatePool: make(chan []map[string]interface{}, 50000),
		saveSize:   500,
	}
}

// updateRecordData receives items from updatePool and hands them to
// mgo.UpSertBulk in batches. A batch is written when it reaches saveSize
// items, or when nothing has been received for 10 seconds and at least one
// item is pending. Each write runs in its own goroutine, throttled by sp_r.
//
// The method blocks until updatePool is closed; on close it writes whatever is
// still pending and returns. It is meant to be run in its own goroutine.
//
// NOTE(review): mgo and record_coll_name are declared outside this view; the
// result of UpSertBulk (if any) is discarded here, as in the original code.
func (update *updateRecordInfo) updateRecordData() {
	// idleFlushInterval is how long the loop waits without input before it
	// writes out a partially filled batch.
	const idleFlushInterval = 10 * time.Second

	log.Println("开始不断监听--待更新数据")

	// Guard against a non-positive batch size, which would otherwise panic in
	// make or never trigger a size-based flush.
	size := update.saveSize
	if size < 1 {
		size = 1
	}
	batch := make([][]map[string]interface{}, 0, size)

	// flush hands the pending batch to a writer goroutine and starts a fresh
	// one. Sending on sp_r blocks while the maximum number of writers is
	// already running, which applies back-pressure to this loop.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		sp_r <- true
		go func(dataArr [][]map[string]interface{}) {
			defer func() { <-sp_r }()
			mgo.UpSertBulk(record_coll_name, dataArr...)
		}(batch)
		// The goroutine now owns the old slice; never reuse its backing array.
		batch = make([][]map[string]interface{}, 0, size)
	}

	// One reusable timer instead of time.After per iteration, which would
	// allocate a new timer for every received item.
	idle := time.NewTimer(idleFlushInterval)
	defer idle.Stop()

	for {
		select {
		case value, ok := <-update.updatePool:
			if !ok {
				// Channel closed: without this check the receive would keep
				// returning nil immediately and spin forever.
				flush()
				return
			}
			batch = append(batch, value)
			if len(batch) >= size {
				flush()
			}
			// Restart the idle countdown. Drain a tick that fired in the
			// meantime so Reset does not leave a stale value in the channel.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(idleFlushInterval)
		case <-idle.C:
			// Nothing arrived for a full interval: write any partial batch.
			flush()
			idle.Reset(idleFlushInterval)
		}
	}
}