package main import ( "fmt" "github.com/wcc4869/common_utils/log" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "time" ) var ( Mgo *mongodb.MongodbSim MgoT *mongodb.MongodbSim //测试环境链接 MgoR *mongodb.MongodbSim saveSize = 50 Es *elastic.Elastic EsNew *elastic.Elastic EsT *elastic.Elastic // 更新mongo updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 5) //更新es updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 5) //保存协程 ) func main() { //mongodb 163 //Mgo = &mongodb.MongodbSim{ // //MongodbAddr: "172.17.189.140:27080", // MongodbAddr: "127.0.0.1:27083", // DbName: "qfw", // Size: 10, // UserName: "SJZY_RWbid_ES", // Password: "SJZY@B4i4D5e6S", // Direct: true, //} //Mgo.InitPool() //85 //MgoR = &mongodb.MongodbSim{ // //MongodbAddr: "127.0.0.1:27080", // MongodbAddr: "172.17.4.85:27080", // DbName: "qfw", // Size: 10, // //Direct: true, //} //MgoR.InitPool() ////测试环境MongoDB MgoT = &mongodb.MongodbSim{ //MongodbAddr: "172.17.189.140:27080", MongodbAddr: "192.168.3.206:27002", DbName: "qfw_data", Size: 10, UserName: "root", Password: "root", //Direct: true, } MgoT.InitPool() // 测试环境es Es = &elastic.Elastic{ S_esurl: "http://192.168.3.149:9201", //S_esurl: "http://172.17.4.184:19805", I_size: 5, Username: "", Password: "", } Es.InitElasticSize() //es //Es = &elastic.Elastic{ // S_esurl: "http://127.0.0.1:19908", // //S_esurl: "http://172.17.4.184:19908", // I_size: 5, // Username: "jybid", // Password: "Top2023_JEB01i@31", //} //Es.InitElasticSize() // es 新集群 //EsNew = &elastic.Elastic{ // S_esurl: "http://127.0.0.1:19905", // //S_esurl: "http://172.17.4.184:19905", // I_size: 5, // Username: "jybid", // Password: "Top2023_JEB01i@31", //} //EsNew.InitElasticSize() //go updateMethod() //更新mongodb go updateEsMethod() //更新es //go updateProjectEsMethod() //taskRunProject() //taskRunBidding() dealBiddingTest() // 测试环境数据处理 //updateProject() fmt.Println("over") c := make(chan bool, 1) <-c } // taskRun 更新es 省市区三个字段 func taskRunBidding() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) //查询条件 //q := map[string]interface{}{ // //"_id": map[string]interface{}{ // // //"$gt": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cea"), // // //"$lte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"), // // // // //"$gte": mongodb.StringTOBsonId("65d73ba366cf0db42aca6e2f"), // // "$lte": mongodb.StringTOBsonId("661e347d66cf0db42aa1a52f"), // //}, // //"comeintime": map[string]interface{}{ // // "$gt": 1669824000, // // //"$lte": 1669864950, // // "$lte": 1702265941, // //}, // //"site": "国家能源e购", // "toptype": map[string]interface{}{"$exists": 0}, //} //selected := map[string]interface{}{"contenthtml": 0, "detail": 0} it := sess.DB("qfw").C("zktest_bidding_0619_compare").Find(nil).Select(nil).Sort("_id").Iter() fmt.Println("taskRun 开始") count := 0 //realNum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%1000 == 0 { log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"])) } update := map[string]interface{}{} // 1.更新省市区 //if area, ok := tmp["area"]; ok && area != nil { // update["area"] = area //} else { // update["area"] = "" //} // //if city, ok := tmp["city"]; ok && city != nil { // update["city"] = city //} else { // update["city"] = "" //} // //if district, ok := tmp["district"]; ok && district != nil { // update["district"] = district //} else { // update["district"] = "" //} // 2.更新中标单位,中标金额 if winner, ok := tmp["winner"]; ok && winner != nil { update["winner"] = winner } else { update["winner"] = "" } if s_winner, ok := tmp["s_winner"]; ok && s_winner != nil { update["s_winner"] = s_winner } else { update["s_winner"] = "" } if bidamount, ok := tmp["bidamount"]; ok && bidamount != nil { update["bidamount"] = bidamount } else { update["bidamount"] = nil } //biddingID := util.ObjToString(tmp["id"]) biddingID := mongodb.BsonIdToSId(tmp["_id"]) if len(update) > 0 { //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update}) //2.es 项目 更新字段 err := Es.UpdateDocument("bidding", biddingID, update) err = EsNew.UpdateDocument("bidding", biddingID, update) if err != nil && err.Error() != "Document not updated: noop" { log.Info("bidding es update err", err, biddingID) } } } log.Info("Run Over...Count:", log.Int("count", count)) } // taskRunProject 更新项目表 省市区 func taskRunProject() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) // 项目数据 MgoP := &mongodb.MongodbSim{ MongodbAddr: "172.17.4.85:27080", //MongodbAddr: "127.0.0.1:27080", Size: 10, DbName: "qfw", //Direct: true, } MgoP.InitPool() selected := map[string]interface{}{"contenthtml": 0, "detail": 0} it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter() fmt.Println("taskRun 开始") count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"])) } biddingID := mongodb.BsonIdToSId(tmp["_id"]) where := map[string]interface{}{ "ids": biddingID, } // 找到对应项目数据 p, _ := MgoP.FindOne("projectset_20230904", where) projectId := mongodb.BsonIdToSId((*p)["_id"]) //1.更新MongoDB update := map[string]interface{}{} if area, ok := tmp["area"]; ok && area != nil { update["area"] = area } else { update["area"] = "" } if city, ok := tmp["city"]; ok && city != nil { update["city"] = city } else { update["city"] = "" } if district, ok := tmp["district"]; ok && district != nil { update["district"] = district } else { update["district"] = "" } if len(update) > 0 { MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update}) //2.es 项目 更新字段 err := Es.UpdateDocument("projectset", projectId, update) if err != nil { log.Info("es update err", err, projectId) } } //2.es 项目 更新字段 //if len(update) > 0 { // // 更新es // //updateEsPool <- []map[string]interface{}{ // // {"_id": projectId}, // // update, // //} //} } log.Info("Run Over...Count:", log.Int("count", count)) } // dealData 正式环境,同步合同期限 func dealData() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) //where := map[string]interface{}{ // "_id": mongodb.StringTOBsonId("65c5a36a66cf0db42ab9c1ef"), //} selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1} it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter() count := 0 realNum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%1000 == 0 { log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"])) } idStr := mongodb.BsonIdToSId(tmp["_id"]) update := make(map[string]interface{}) if tmp["signaturedate"] != nil { update["signaturedate"] = tmp["signaturedate"] } if tmp["contractperiod"] != nil { update["contractperiod"] = tmp["contractperiod"] } if tmp["expiredate"] != nil { update["expiredate"] = tmp["expiredate"] } if len(update) == 0 { continue } //bidding 表 if idStr > "5a862e7040d2d9bbe88e3b1f" { bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1}) data := *bidding Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update}) // 针对存量数据,重复数据不进索引 if util.IntAll(data["extracttype"]) == -1 { tmp = make(map[string]interface{}) continue } } else { //bidding_back bidding, _ := Mgo.FindById("bidding_back", idStr, map[string]interface{}{"extracttype": 1}) data := *bidding Mgo.UpdateById("bidding_back", idStr, map[string]interface{}{"$set": update}) // 针对存量数据,重复数据不进索引 if util.IntAll(data["extracttype"]) == -1 { tmp = make(map[string]interface{}) continue } } realNum++ //2.es 更新字段 esUpdate := update esUpdate["id"] = idStr if len(esUpdate) > 0 { // 更新es updateEsPool <- []map[string]interface{}{ {"_id": mongodb.BsonIdToSId(tmp["_id"])}, esUpdate, } } tmp = make(map[string]interface{}) } log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum)) } // dealResult 查询抽取表,更新合同周期字段;是dealData的后面遗漏数据 func dealResult() { defer util.Catch() sess := MgoR.GetMgoConn() defer MgoR.DestoryMongoConn(sess) where := map[string]interface{}{ "_id": map[string]interface{}{ "$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"), "$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"), }, "subtype": "合同", } selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1} it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter() count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%1000 == 0 { log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"])) } idStr := mongodb.BsonIdToSId(tmp["_id"]) update := make(map[string]interface{}) if tmp["signaturedate"] != nil { update["signaturedate"] = tmp["signaturedate"] } if tmp["contractperiod"] != nil { update["contractperiod"] = tmp["contractperiod"] } if tmp["expiredate"] != nil { update["expiredate"] = tmp["expiredate"] } if len(update) == 0 { continue } bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1}) data := *bidding Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update}) // 针对存量数据,重复数据不进索引 if util.IntAll(data["extracttype"]) == -1 { tmp = make(map[string]interface{}) continue } //2.es 更新字段 esUpdate := update esUpdate["id"] = idStr if len(esUpdate) > 0 { // 更新es updateEsPool <- []map[string]interface{}{ {"_id": mongodb.BsonIdToSId(tmp["_id"])}, esUpdate, } } tmp = make(map[string]interface{}) } log.Info("Run Over...Count:", log.Int("count", count)) } // dealBiddingTest 处理测试环境数据 func dealBiddingTest() { defer util.Catch() sess := MgoT.GetMgoConn() defer MgoT.DestoryMongoConn(sess) it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter() fmt.Println("taskRun 开始") count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"])) } update := map[string]interface{}{} // 2.更新中标单位,中标金额 if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil { update["tag_topinformation"] = tag_topinformation } if property_form, ok := tmp["property_form"]; ok && property_form != nil { update["property_form"] = property_form } biddingID := mongodb.BsonIdToSId(tmp["_id"]) if len(update) > 0 { //Mgo.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update}) //2.es 项目 更新字段 //err := Es.UpdateDocument("bidding", biddingID, update) //if err != nil && err.Error() != "Document not updated: noop" { // log.Info("bidding es update err", err, biddingID) //} // 更新es updateEsPool <- []map[string]interface{}{ {"_id": biddingID}, update, } } } log.Info("Run Over...Count:", log.Int("count", count)) } // updateMethod 更新MongoDB func updateMethod() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpdateBulk("bidding", arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpdateBulk("bidding", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } } // updateEsMethod 更新es func updateEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", arru...) //EsNew.UpdateBulk("bidding", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", arru...) //EsNew.UpdateBulk("bidding", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } } // updateProjectEsMethod 更新项目索引 func updateProjectEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("projectset", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("projectset", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }