package main

import (
	"time"

	"mongodb"
	es "qfw/util/elastic"
)

// batchFlushIdle is how long a batching loop waits without receiving a new
// item before it flushes whatever it has collected so far.
const batchFlushIdle = 1000 * time.Millisecond

var (
	// MongoTool is the shared MongoDB handle configured in init.
	MongoTool *mongodb.MongodbSim
	// Es is the shared Elasticsearch handle configured in init.
	Es *es.Elastic

	// updatePool/updateSp feed updateMethod ("project_forecast_yece_tmp").
	updatePool chan []map[string]interface{}
	updateSp   chan bool
	// updatePool1/updateSp1 feed updateMethod1 ("project_forecast").
	updatePool1 chan []map[string]interface{}
	updateSp1   chan bool

	// saveSize is the number of items that triggers an immediate bulk write.
	saveSize int

	// savePool/saveSp feed saveMethod ("project_forecast_yece_tmp").
	savePool chan map[string]interface{}
	saveSp   chan bool
	// savePool1/saveSp1 feed saveMethod1 ("project_forecast").
	savePool1 chan map[string]interface{}
	saveSp1   chan bool
)

// init connects the MongoDB and Elasticsearch helpers and allocates the
// queues and semaphores used by the batching goroutines.
func init() {
	// NOTE(review): the address and credentials below are hard-coded in
	// source; consider loading them from configuration or the environment.
	MongoTool = &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083", // 172.17.4.187:27082,172.17.145.163:27083
		Size:        10,
		DbName:      "mixdata",
		UserName:    "SJZY_RWESBid_Other",
		Password:    "SJZY@O17t8herB3B",
	}
	MongoTool.InitPool()

	Es = &es.Elastic{
		S_esurl: "http://172.17.145.170:9800", //http://172.17.145.170:9800
		I_size:  10,
	}
	Es.InitElasticSize()

	saveSize = 200

	// Each queue buffers up to 5000 pending items; each semaphore allows at
	// most 5 bulk writes in flight for its queue.
	updatePool = make(chan []map[string]interface{}, 5000)
	updateSp = make(chan bool, 5)
	updatePool1 = make(chan []map[string]interface{}, 5000)
	updateSp1 = make(chan bool, 5)
	savePool = make(chan map[string]interface{}, 5000)
	saveSp = make(chan bool, 5)
	savePool1 = make(chan map[string]interface{}, 5000)
	saveSp1 = make(chan bool, 5)
}

// main starts the background batching goroutines and the timed task, then
// blocks forever so those goroutines keep running.
func main() {
	go saveMethod()
	go saveMethod1()
	go updateMethod()
	//go updateMethod1()
	//go findEs()
	//go fcResult()
	go TimeTask()

	// Park the main goroutine; nothing ever wakes it.
	select {}
}

// saveMethod batches documents from savePool and bulk-saves them into
// "project_forecast_yece_tmp". It never returns.
func saveMethod() {
	runSaveBatcher(savePool, saveSp, "project_forecast_yece_tmp")
}

// saveMethod1 batches documents from savePool1 and bulk-saves them into
// "project_forecast". It never returns.
func saveMethod1() {
	runSaveBatcher(savePool1, saveSp1, "project_forecast")
}

// updateMethod batches entries from updatePool and bulk-upserts them into
// "project_forecast_yece_tmp". It never returns.
func updateMethod() {
	runUpsertBatcher(updatePool, updateSp, "project_forecast_yece_tmp")
}

// updateMethod1 batches entries from updatePool1 and bulk-upserts them into
// "project_forecast". It never returns.
func updateMethod1() {
	runUpsertBatcher(updatePool1, updateSp1, "project_forecast")
}

// rearmTimer restarts t so that it fires d from now. If the timer had already
// fired and its tick was not consumed, the stale tick is discarded first so
// the next receive from t.C reflects the new deadline.
func rearmTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}

// runSaveBatcher collects documents from pool and writes them to collection
// coll with MongoTool.SaveBulk. A batch is flushed as soon as it holds
// saveSize documents, or when no document has arrived for batchFlushIdle and
// the batch is non-empty. sem bounds the number of concurrent bulk writes:
// the loop blocks on it before starting each write. The function never
// returns.
func runSaveBatcher(pool <-chan map[string]interface{}, sem chan bool, coll string) {
	batch := make([]map[string]interface{}, 0, saveSize)

	// flush hands the current batch to a background writer and starts a new
	// one. The writer owns the old slice; the loop never touches it again.
	flush := func() {
		sem <- true
		go func(docs []map[string]interface{}) {
			defer func() { <-sem }()
			// Any result from SaveBulk is discarded, as in the original code.
			MongoTool.SaveBulk(coll, docs...)
		}(batch)
		batch = make([]map[string]interface{}, 0, saveSize)
	}

	// One reusable timer instead of a new time.After per iteration: the
	// latter allocates a timer for every received document.
	idle := time.NewTimer(batchFlushIdle)
	for {
		rearmTimer(idle, batchFlushIdle)
		select {
		case doc := <-pool:
			batch = append(batch, doc)
			if len(batch) >= saveSize {
				flush()
			}
		case <-idle.C:
			if len(batch) > 0 {
				flush()
			}
		}
	}
}

// runUpsertBatcher is the upsert counterpart of runSaveBatcher: it collects
// entries from pool and writes them to collection coll with
// MongoTool.UpSertBulk, flushing on saveSize entries or after batchFlushIdle
// without new input. sem bounds the number of concurrent bulk writes. The
// function never returns.
//
// NOTE(review): each entry is presumably a [selector, update] pair as
// expected by UpSertBulk — confirm against the mongodb helper.
func runUpsertBatcher(pool <-chan []map[string]interface{}, sem chan bool, coll string) {
	batch := make([][]map[string]interface{}, 0, saveSize)

	// flush hands the current batch to a background writer and starts a new
	// one. The writer owns the old slice; the loop never touches it again.
	flush := func() {
		sem <- true
		go func(ops [][]map[string]interface{}) {
			defer func() { <-sem }()
			// Any result from UpSertBulk is discarded, as in the original code.
			MongoTool.UpSertBulk(coll, ops...)
		}(batch)
		batch = make([][]map[string]interface{}, 0, saveSize)
	}

	// One reusable timer instead of a new time.After per iteration.
	idle := time.NewTimer(batchFlushIdle)
	for {
		rearmTimer(idle, batchFlushIdle)
		select {
		case op := <-pool:
			batch = append(batch, op)
			if len(batch) >= saveSize {
				flush()
			}
		case <-idle.C:
			if len(batch) > 0 {
				flush()
			}
		}
	}
}