package main import ( es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "time" ) var ( MgoB *mongodb.MongodbSim MgoQy *mongodb.MongodbSim MgoP *mongodb.MongodbSim Es *es.Elastic updatePool = make(chan []map[string]interface{}, 5000) updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 5) //保存协程 ) func InitMgo() { //MgoB = &mongodb.MongodbSim{ // MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080", // //MongodbAddr: "127.0.0.1:27083", // Size: 10, // DbName: "qfw", // UserName: "SJZY_RWbid_ES",a // Password: "SJZY@B4i4D5e6S", // //Direct: true, //} //MgoB.InitPool() MgoQy = &mongodb.MongodbSim{ MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "mixdata", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } MgoQy.InitPool() MgoP = &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27080", MongodbAddr: "172.17.4.85:27080", DbName: "qfw", Size: 10, //Direct: true, } MgoP.InitPool() // 本地数据库 //MgoB = &mongodb.MongodbSim{ // //MongodbAddr: "172.17.189.140:27080", // MongodbAddr: "127.0.0.1:27017", // Size: 10, // DbName: "wcc", // //UserName: "SJZY_RWbid_ES", // //Password: "SJZY@B4i4D5e6S", // //Direct: true, //} //MgoB.InitPool() // 测试环境 //MgoB = &mongodb.MongodbSim{ // MongodbAddr: "192.168.3.206:27002", // //MongodbAddr: "127.0.0.1:27017", // Size: 10, // DbName: "qfw_data", // UserName: "root", // Password: "root", // //Direct: true, //} //MgoB.InitPool() } func InitEs() { Es = &es.Elastic{ //S_esurl: "http://127.0.0.1:19908", S_esurl: "http://172.17.4.184:19908", I_size: 5, Username: "jybid", Password: "Top2023_JEB01i@31", } Es.InitElasticSize() } func main() { InitMgo() InitEs() go updateEsMethod() fixQyxy() log.Println("11111111111") select {} } // updateEsMethod 更新es func updateEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("projectset", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("projectset", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }