package main import ( "time" ) //新增组 type addGroupInfo struct { pool chan map[string]interface{} saveSize int } //更新组 type updateGroupInfo struct { pool chan []map[string]interface{} saveSize int } //新增融合 type addFusionInfo struct { pool chan map[string]interface{} saveSize int } //更新融合 type updateFusionInfo struct { pool chan []map[string]interface{} saveSize int } //新增日志 type addRecordInfo struct { pool chan map[string]interface{} saveSize int } //更新日志 type updateRecordInfo struct { pool chan []map[string]interface{} saveSize int } var sp = make(chan bool,5) func newAddGroupPool() *addGroupInfo { info:=&addGroupInfo{make(chan map[string]interface{}, 50000),200} return info } func newUpdateGroupPool() *updateGroupInfo { info:=&updateGroupInfo{make(chan []map[string]interface{}, 50000),200} return info } func newAddFusionPool() *addFusionInfo { info:=&addFusionInfo{make(chan map[string]interface{}, 50000),200} return info } func newupdateFusionPool() *updateFusionInfo { info:=&updateFusionInfo{make(chan []map[string]interface{}, 50000),200} return info } func newaddRecordPool() *addRecordInfo { info:=&addRecordInfo{make(chan map[string]interface{}, 50000),200} return info } func newupdateRecordPool() *updateRecordInfo { info:=&updateRecordInfo{make(chan []map[string]interface{}, 50000),200} return info } //新增-组数据 func (info *addGroupInfo) addGroupData() { tmpArr := make([]map[string]interface{}, info.saveSize) tmpIndex := 0 for { select { case value := <-info.pool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == info.saveSize { sp <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp }() mgo.SaveBulk(group_coll_name, dataArr...) }(tmpArr) tmpArr = make([]map[string]interface{}, info.saveSize) tmpIndex = 0 } case <-time.After(10 * time.Second)://无反应时每x秒检测一次 if tmpIndex > 0 { sp <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp }() mgo.SaveBulk(group_coll_name, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([]map[string]interface{}, info.saveSize) tmpIndex = 0 } } } } //更新-组数据 func (info *updateGroupInfo) updateGroupData() { tmpArr := make([][]map[string]interface{}, info.saveSize) tmpIndex := 0 for { select { case value := <-info.pool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == info.saveSize { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() //批量更新 mgo.UpSertBulk(group_coll_name, dataArr...) }(tmpArr) tmpArr = make([][]map[string]interface{}, info.saveSize) tmpIndex = 0 } case <-time.After(10 * time.Second)://无反应时每x秒检测一次 if tmpIndex > 0 { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() //批量更新 mgo.UpSertBulk(group_coll_name, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([][]map[string]interface{}, info.saveSize) tmpIndex = 0 } } } } //新增融合数据 func (info *addFusionInfo) addFusionData() { tmpArr := make([]map[string]interface{}, info.saveSize) tmpIndex := 0 for { select { case value := <-info.pool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == info.saveSize { sp <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp }() mgo.SaveBulk(fusion_coll_name, dataArr...) }(tmpArr) tmpArr = make([]map[string]interface{}, info.saveSize) tmpIndex = 0 } case <-time.After(10 * time.Second)://无反应时每x秒检测一次 if tmpIndex > 0 { sp <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp }() mgo.SaveBulk(fusion_coll_name, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([]map[string]interface{}, info.saveSize) tmpIndex = 0 } } } } //更新融合数据 func (info *updateFusionInfo) updateFusionData() { tmpArr := make([][]map[string]interface{}, info.saveSize) tmpIndex := 0 for { select { case value := <-info.pool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == info.saveSize { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() //批量更新 mgo.UpSertBulk(fusion_coll_name, dataArr...) }(tmpArr) tmpArr = make([][]map[string]interface{}, info.saveSize) tmpIndex = 0 } case <-time.After(10 * time.Second)://无反应时每x秒检测一次 if tmpIndex > 0 { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() //批量更新 mgo.UpSertBulk(fusion_coll_name, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([][]map[string]interface{}, info.saveSize) tmpIndex = 0 } } } } //新增日志数据 func (info *addRecordInfo) addRecordData() { tmpArr := make([]map[string]interface{}, info.saveSize) tmpIndex := 0 for { select { case value := <-info.pool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == info.saveSize { sp <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp }() mgo.SaveBulk(record_coll_name, dataArr...) }(tmpArr) tmpArr = make([]map[string]interface{}, info.saveSize) tmpIndex = 0 } case <-time.After(10 * time.Second)://无反应时每x秒检测一次 if tmpIndex > 0 { sp <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp }() mgo.SaveBulk(record_coll_name, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([]map[string]interface{}, info.saveSize) tmpIndex = 0 } } } } //更新日志数据 func (info *updateRecordInfo) updateRecordData() { tmpArr := make([][]map[string]interface{}, info.saveSize) tmpIndex := 0 for { select { case value := <-info.pool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == info.saveSize { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() //批量更新 mgo.UpSertBulk(record_coll_name, dataArr...) }(tmpArr) tmpArr = make([][]map[string]interface{}, info.saveSize) tmpIndex = 0 } case <-time.After(10 * time.Second)://无反应时每x秒检测一次 if tmpIndex > 0 { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() //批量更新 mgo.UpSertBulk(record_coll_name, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([][]map[string]interface{}, info.saveSize) tmpIndex = 0 } } } }