// Command main collects documents from in-process channels, groups them into
// batches and writes each batch to the "project_forecast" MongoDB collection.
package main

import (
	"time"

	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// MongoTool is the shared MongoDB client, configured in init.
	MongoTool *mongodb.MongodbSim
	// Es is the shared Elasticsearch client, configured in init.
	Es *elastic.Elastic

	// updatePool and updateSp are declared here but not used in this file.
	updatePool chan []map[string]interface{}
	updateSp   chan bool

	// updatePool1 feeds updateMethod; every element is forwarded unchanged to
	// MongoTool.UpSertBulk.
	updatePool1 chan []map[string]interface{}
	// updateSp1 is a semaphore: its capacity bounds the number of concurrent
	// UpSertBulk goroutines started by updateMethod.
	updateSp1 chan bool

	// saveSize is the number of items that triggers an immediate bulk write.
	saveSize int
	// savePool feeds saveMethod with documents to insert.
	savePool chan map[string]interface{}
	// saveSp is a semaphore: its capacity bounds the number of concurrent
	// SaveBulk goroutines started by saveMethod.
	saveSp chan bool
)

// init configures the MongoDB and Elasticsearch clients and creates the
// channels consumed by saveMethod and updateMethod.
func init() {
	// NOTE(review): hosts and credentials are hard-coded in source. They should
	// be loaded from configuration or the environment and rotated; the values
	// are left untouched here so behaviour does not change.
	MongoTool = &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
		Size:        10,
		DbName:      "mixdata",
		UserName:    "SJZY_RWESBid_Other",
		Password:    "SJZY@O17t8herB3B",
	}
	MongoTool.InitPool()

	Es = &elastic.Elastic{
		S_esurl:  "http://172.17.162.27:19908", //http://172.17.4.184:19800
		I_size:   10,
		Username: "dataGr_appli",
		Password: "L2ds90Ha4e5#",
	}
	Es.InitElasticSize()

	saveSize = 200
	savePool = make(chan map[string]interface{}, 5000)
	saveSp = make(chan bool, 5)

	// updateMethod reads updatePool1 and sends on updateSp1, but nothing in
	// this file creates them. Operations on a nil channel block forever, so
	// create them here with the same sizes as the save channels. The nil
	// guards keep any value already assigned by another file's init.
	// NOTE(review): confirm whether another file initialises these channels.
	if updatePool1 == nil {
		updatePool1 = make(chan []map[string]interface{}, 5000)
	}
	if updateSp1 == nil {
		updateSp1 = make(chan bool, 5)
	}
}

// main starts the background workers and then blocks forever.
func main() {
	go saveMethod()
	go updateMethod()
	//go findEs()
	go TimeTask()

	// Park the main goroutine; the workers never return.
	select {}
}

// saveMethod drains savePool and inserts the documents into
// "project_forecast" in bulk. A batch is written as soon as it holds saveSize
// documents, or when no document has arrived for one second and the batch is
// non-empty. Writes run in their own goroutines; saveSp limits how many are in
// flight, and acquiring a slot blocks this loop when the limit is reached.
func saveMethod() {
	const idleFlush = 1000 * time.Millisecond

	batch := make([]map[string]interface{}, 0, saveSize)

	// flush hands the current batch to a writer goroutine and starts a fresh
	// one, so the writer never shares a backing array with later appends.
	flush := func() {
		docs := batch
		batch = make([]map[string]interface{}, 0, saveSize)

		saveSp <- true
		go func() {
			defer func() { <-saveSp }()
			MongoTool.SaveBulk("project_forecast", docs...)
		}()
	}

	// One timer is reused for the idle timeout instead of allocating a new
	// one with time.After on every received document.
	idle := time.NewTimer(idleFlush)
	defer idle.Stop()

	for {
		select {
		case doc := <-savePool:
			batch = append(batch, doc)
			if len(batch) >= saveSize {
				flush()
			}
			// Restart the idle window. If the timer already fired, drain the
			// pending tick so it cannot trigger a spurious timeout.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(idleFlush)
		case <-idle.C:
			if len(batch) > 0 {
				flush()
			}
			idle.Reset(idleFlush)
		}
	}
}

// updateMethod drains updatePool1 and passes the collected items to
// MongoTool.UpSertBulk for "project_forecast". A batch is written as soon as
// it holds saveSize items, or when no item has arrived for one second and the
// batch is non-empty. Writes run in their own goroutines; updateSp1 limits how
// many are in flight.
// NOTE(review): each item is presumably a [selector, update] pair — verify
// against UpSertBulk and the producers.
func updateMethod() {
	const idleFlush = 1000 * time.Millisecond

	batch := make([][]map[string]interface{}, 0, saveSize)

	// flush hands the current batch to a writer goroutine and starts a fresh
	// one, so the writer never shares a backing array with later appends.
	flush := func() {
		ops := batch
		batch = make([][]map[string]interface{}, 0, saveSize)

		updateSp1 <- true
		go func() {
			defer func() { <-updateSp1 }()
			MongoTool.UpSertBulk("project_forecast", ops...)
		}()
	}

	// One timer is reused for the idle timeout instead of allocating a new
	// one with time.After on every received item.
	idle := time.NewTimer(idleFlush)
	defer idle.Stop()

	for {
		select {
		case op := <-updatePool1:
			batch = append(batch, op)
			if len(batch) >= saveSize {
				flush()
			}
			// Restart the idle window. If the timer already fired, drain the
			// pending tick so it cannot trigger a spurious timeout.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(idleFlush)
		case <-idle.C:
			if len(batch) > 0 {
				flush()
			}
			idle.Reset(idleFlush)
		}
	}
}