package main import ( "fmt" "go.mongodb.org/mongo-driver/bson" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "strings" "time" ) var ( //更新es updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 3) //保存协程 // 更新mongo updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 5) saveSize = 50 //更新es Es *es.Elastic esIndex = "qyxy" Mgo *mongodb.MongodbSim MgoQY *mongodb.MongodbSim ) func initEs() { Es = &es.Elastic{ //S_esurl: "http://127.0.0.1:19908", S_esurl: "http://172.17.4.184:19908", I_size: 10, Username: "jybid", Password: "Top2023_JEB01i@31", } Es.InitElasticSize() } // initMgo initMgo func initMgo() { //181 凭安库 //MgoQY = &mongodb.MongodbSim{ // MongodbAddr: "172.17.4.181:27001", // //MongodbAddr: "127.0.0.1:27001", // DbName: "mixdata", // Size: 10, // UserName: "", // Password: "", // //Direct: true, //} //MgoQY.InitPool() //qyxy_std Mgo = &mongodb.MongodbSim{ MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "mixdata", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } Mgo.InitPool() } func main() { go updateEsMethod() go updateMethod() initEs() initMgo() readXlsx() //dealCapitalData() //dealCompanyTypeInt() dealCompanyType() select {} } // dealCapitalData 处理数据 注册资金 func dealCapitalData() { defer util.Catch() sess := MgoQY.GetMgoConn() defer MgoQY.DestoryMongoConn(sess) it := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Sort("_id").Iter() fmt.Println("taskRun 开始") count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%1000 == 0 { log.Println("current:", count, tmp["company_name"]) } where := map[string]interface{}{ "company_name": tmp["company_name"], } update := make(map[string]interface{}) std, _ := Mgo.FindOne("qyxy_std", where) companyType := util.ObjToString(tmp["company_type"]) if companyType == "事业单位" { special, _ := MgoQY.FindOne("special_enterprise", where) //1.事业单位数据,注册资金错误 if util.ObjToString((*special)["capital"]) != "" { text := util.ObjToString((*special)["capital"]) capital := ObjToMoney(text) capital = capital / 10000 update["capital"] = capital } } else { //2.企业的capital =nil,需要更新为0 if util.ObjToString(tmp["capital"]) == "" { update["capital"] = float64(0) } } if _, ok := (*std)["capital"]; ok { if len(update) > 0 { //Mgo.Update(GF.Mongo.Coll, where, map[string]interface{}{"$set": update}, true, false) Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false) ////更新es //err := Es.UpdateDocument(esIndex, util.ObjToString(tmp["company_id"]), update) //if err != nil { // log.Println("err", err, update, where) //} updateEsPool <- []map[string]interface{}{ {"_id": util.ObjToString(tmp["company_id"])}, update, } } } } log.Println("数据迭代结束") } // dealCompanyTypeInt 修复company_type_int func dealCompanyTypeInt() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) // 构建查询条件 where := bson.M{ "$or": []bson.M{ {"company_type": "其他"}, // company_type 等于 "其他" {"company_type": bson.M{"$exists": false}}, // company_type 不存在 }, } it := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Iter() fmt.Println("taskRun 开始") count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%1000 == 0 { log.Println("current:", count, tmp["company_name"], tmp["_id"]) } company_name := util.ObjToString(tmp["company_name"]) if strings.HasSuffix(company_name, "公司") || strings.HasSuffix(company_name, "集团") { update := make(map[string]interface{}) update["company_type_int"] = 12 updateEsPool <- []map[string]interface{}{ {"_id": util.ObjToString(tmp["_id"])}, update, } } } log.Println("数据处理完毕,总数是", count) } func dealCompanyType() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) it := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(nil).Iter() fmt.Println("dealCompanyType 开始处理 企业数据") count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%1000 == 0 { log.Println("current:", count, tmp["company_name"], tmp["_id"]) } company_name := util.ObjToString(tmp["company_name"]) company_type := util.ObjToString(tmp["company_type"]) if company_name == "" || company_type == "" { continue } ss := getCompanyType(company_name, company_type) if ss != "" { update := make(map[string]interface{}) update["ownership_type"] = ss updateEsPool <- []map[string]interface{}{ {"_id": util.ObjToString(tmp["_id"])}, update, } //更新MongoDB updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": update}, } } } log.Println("企业数据 处理完毕,总数是", count) } // updateEsMethod 更新es func updateEsMethod() { arru := make([][]map[string]interface{}, 200) //200条一组更新es indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk(esIndex, arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk(esIndex, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } } // updateMethod 更新MongoDB func updateMethod() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpdateBulk("qyxy_std", arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpdateBulk("qyxy_std", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } }