package main import ( "log" "time" ) type updateFusionInfo struct { //更新或新增通道 updatePool chan []map[string]interface{} //数量 saveSize int } var sp_f = make(chan bool, 5) func newUpdateFusionPool() *updateFusionInfo { update:=&updateFusionInfo{make(chan []map[string]interface{}, 50000),100} return update } func (update *updateFusionInfo) updateFusionData() { log.Println("监听--融合更新数据") tmpArr := make([][]map[string]interface{}, update.saveSize) tmpIndex := 0 for { select { case value := <-update.updatePool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == update.saveSize { sp_f <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp_f }() mgo.UpSertBulk(fusion_coll_name, dataArr...) }(tmpArr) tmpArr = make([][]map[string]interface{}, update.saveSize) tmpIndex = 0 } case <-time.After(10 * time.Second)://无反应时每x秒检测一次 if tmpIndex > 0 { sp_f <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp_f }() mgo.UpSertBulk(fusion_coll_name, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([][]map[string]interface{}, update.saveSize) tmpIndex = 0 } } } }