package main import ( "log" "time" ) type updateRecordInfo struct { //新增通道 add_pool chan map[string]interface{} //更新通道 update_pool chan []map[string]interface{} //数量 saveSize int } var sp_r = make(chan bool,10) //批量更新对象 func newUpdateRecordPool() *updateRecordInfo { update:=&updateRecordInfo{nil,make(chan []map[string]interface{}, 50000),200} return update } //批量新增对象 func newAddRecordPool() *updateRecordInfo { update:=&updateRecordInfo{make(chan map[string]interface{}, 50000),nil,200} return update } //新增池 func (update *updateRecordInfo) addRecordData() { log.Println("监听日志......新增数据") tmpArr := make([]map[string]interface{}, update.saveSize) tmpIndex := 0 for { select { case value := <-update.add_pool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == update.saveSize { sp_r <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp_r }() //批量新增 mgo.SaveBulk(record_coll_name, dataArr...) }(tmpArr) tmpArr = make([]map[string]interface{}, update.saveSize) tmpIndex = 0 } case <-time.After(10 * time.Second)://无反应时每x秒检测一次 //log.Println("10秒检测",tmpIndex) if tmpIndex > 0 { sp_r <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp_r }() //批量新增 mgo.SaveBulk(record_coll_name, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([]map[string]interface{}, update.saveSize) tmpIndex = 0 } } } } //更新池 func (update *updateRecordInfo) updateRecordData() { log.Println("监听日志......更新数据") tmpArr := make([][]map[string]interface{}, update.saveSize) tmpIndex := 0 for { select { case value := <-update.update_pool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == update.saveSize { sp_r <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp_r }() //批量更新 mgo.UpSertBulk(record_coll_name, dataArr...) }(tmpArr) tmpArr = make([][]map[string]interface{}, update.saveSize) tmpIndex = 0 } case <-time.After(10 * time.Second)://无反应时每x秒检测一次 if tmpIndex > 0 { sp_r <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp_r }() //批量更新 mgo.UpSertBulk(record_coll_name, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([][]map[string]interface{}, update.saveSize) tmpIndex = 0 } } } }