package main import ( "fmt" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "time" ) var ( //更新es updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 3) //保存协程 Es *es.Elastic esIndex = "qyxy" Mgo *mongodb.MongodbSim MgoQY *mongodb.MongodbSim ) func initEs() { Es = &es.Elastic{ //S_esurl: "http://127.0.0.1:19908", S_esurl: "http://172.17.4.184:19908", I_size: 10, Username: "jybid", Password: "Top2023_JEB01i@31", } Es.InitElasticSize() } // initMgo initMgo func initMgo() { //181 凭安库 MgoQY = &mongodb.MongodbSim{ MongodbAddr: "172.17.4.181:27001", //MongodbAddr: "127.0.0.1:27001", DbName: "mixdata", Size: 10, UserName: "", Password: "", //Direct: true, } MgoQY.InitPool() //qyxy_std Mgo = &mongodb.MongodbSim{ MongodbAddr: "172.17.189.140:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "mixdata", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } Mgo.InitPool() } func main() { go updateEsMethod() initEs() initMgo() dealCapitalData() select {} } // dealCapitalData 处理数据 注册资金 func dealCapitalData() { defer util.Catch() sess := MgoQY.GetMgoConn() defer MgoQY.DestoryMongoConn(sess) it := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Sort("_id").Iter() fmt.Println("taskRun 开始") count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%1000 == 0 { log.Println("current:", count, tmp["company_name"]) } where := map[string]interface{}{ "company_name": tmp["company_name"], } update := make(map[string]interface{}) std, _ := Mgo.FindOne("qyxy_std", where) companyType := util.ObjToString(tmp["company_type"]) if companyType == "事业单位" { special, _ := MgoQY.FindOne("special_enterprise", where) //1.事业单位数据,注册资金错误 if util.ObjToString((*special)["capital"]) != "" { text := util.ObjToString((*special)["capital"]) capital := ObjToMoney(text) capital = capital / 10000 update["capital"] = capital } } else { //2.企业的capital =nil,需要更新为0 if util.ObjToString(tmp["capital"]) == "" { update["capital"] = float64(0) } } if _, ok := (*std)["capital"]; ok { if len(update) > 0 { //Mgo.Update(GF.Mongo.Coll, where, map[string]interface{}{"$set": update}, true, false) Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false) ////更新es //err := Es.UpdateDocument(esIndex, util.ObjToString(tmp["company_id"]), update) //if err != nil { // log.Println("err", err, update, where) //} updateEsPool <- []map[string]interface{}{ {"_id": util.ObjToString(tmp["company_id"])}, update, } } } } log.Println("数据迭代结束") } // updateEsMethod 更新es func updateEsMethod() { arru := make([][]map[string]interface{}, 200) //200条一组更新es indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk(esIndex, arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk(esIndex, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }