package main import ( "log" "time" ) var sp = make(chan bool, 5) type updateInfo struct { //更新或新增通道 updatePool chan []map[string]interface{} saveSize int } func newUpdatePool() *updateInfo { update := &updateInfo{make(chan []map[string]interface{}, 50000), 200} return update } // 临时~新增组 type addGroupInfo struct { pool chan map[string]interface{} saveSize int } func newAddGroupPool() *addGroupInfo { info := &addGroupInfo{make(chan map[string]interface{}, 50000), 200} return info } // 监听更新 func (update *updateInfo) updateData() { log.Println("开始不断监听--待更新数据") tmpArr := make([][]map[string]interface{}, update.saveSize) tmpIndex := 0 for { select { case value := <-update.updatePool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == update.saveSize { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() data_mgo.UpSertBulk(extract, dataArr...) }(tmpArr) tmpArr = make([][]map[string]interface{}, update.saveSize) tmpIndex = 0 } case <-time.After(5 * time.Second): //无反应时每x秒检测一次 if tmpIndex > 0 { sp <- true go func(dataArr [][]map[string]interface{}) { defer func() { <-sp }() data_mgo.UpSertBulk(extract, dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([][]map[string]interface{}, update.saveSize) tmpIndex = 0 } } } } // 监听新增 func (info *addGroupInfo) addGroupData() { tmpArr := make([]map[string]interface{}, info.saveSize) tmpIndex := 0 for { select { case value := <-info.pool: tmpArr[tmpIndex] = value tmpIndex++ if tmpIndex == info.saveSize { sp <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp }() data_mgo.SaveBulk("zktes_full_repeat", dataArr...) }(tmpArr) tmpArr = make([]map[string]interface{}, info.saveSize) tmpIndex = 0 } case <-time.After(7 * time.Second): //无反应时每x秒检测一次 if tmpIndex > 0 { sp <- true go func(dataArr []map[string]interface{}) { defer func() { <-sp }() data_mgo.SaveBulk("zktes_full_repeat", dataArr...) }(tmpArr[:tmpIndex]) tmpArr = make([]map[string]interface{}, info.saveSize) tmpIndex = 0 } } } }