// Command main bundles one-off maintenance tasks that copy selected fields
// between MongoDB collections and Elasticsearch indexes. Exactly which task
// runs is selected by (un)commenting calls in main.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/wcc4869/common_utils/log"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo/options"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

const (
	// esBatchSize is the number of queued ES updates sent in one bulk call.
	esBatchSize = 200
	// flushIdleTimeout is how long a partially filled batch may wait for new
	// items before it is flushed anyway.
	flushIdleTimeout = 1000 * time.Millisecond
)

var (
	Mgo  *mongodb.MongodbSim
	MgoT *mongodb.MongodbSim // test-environment connection
	MgoR *mongodb.MongodbSim

	// saveSize is the number of queued MongoDB updates sent in one bulk call.
	saveSize = 50

	Es    *elastic.Elastic
	EsNew *elastic.Elastic
	EsT   *elastic.Elastic

	// MongoDB update queue and the semaphore bounding concurrent bulk writes.
	updatePool = make(chan []map[string]interface{}, 5000)
	updateSp   = make(chan bool, 5)

	// ES update queue and the semaphore bounding concurrent bulk writes.
	updateEsPool = make(chan []map[string]interface{}, 5000)
	updateEsSp   = make(chan bool, 5)

	// contractFields are the contract-period fields copied by dealData,
	// dealResult and dealDataTest.
	contractFields = []string{"signaturedate", "contractperiod", "expiredate"}
)

// main initialises the connections needed by the currently enabled task,
// starts the matching background flusher and runs the task. It never returns:
// after the task finishes it blocks forever so queued updates can still be
// flushed by the background goroutine.
//
// NOTE(review): hosts and credentials are hard-coded here (including in the
// commented-out alternatives); they should be moved to configuration or
// environment variables and rotated.
func main() {
	// MongoDB 163
	//Mgo = &mongodb.MongodbSim{
	//	MongodbAddr: "172.17.189.140:27080",
	//	//MongodbAddr: "127.0.0.1:27083",
	//	DbName:   "qfw",
	//	Size:     10,
	//	UserName: "SJZY_RWbid_ES",
	//	Password: "SJZY@B4i4D5e6S",
	//	//Direct: true,
	//}
	//Mgo.InitPool()

	// 85
	//MgoR = &mongodb.MongodbSim{
	//	//MongodbAddr: "127.0.0.1:27080",
	//	MongodbAddr: "172.17.4.85:27080",
	//	DbName:      "qfw",
	//	Size:        10,
	//	//Direct: true,
	//}
	//MgoR.InitPool()

	// Test-environment MongoDB
	//MgoT = &mongodb.MongodbSim{
	//	//MongodbAddr: "172.17.189.140:27080",
	//	MongodbAddr: "192.168.3.206:27002",
	//	DbName:      "qfw_data",
	//	Size:        10,
	//	UserName:    "root",
	//	Password:    "root",
	//	//Direct: true,
	//}
	//MgoT.InitPool()

	// Test-environment ES
	//Es = &elastic.Elastic{
	//	S_esurl: "http://192.168.3.149:9201",
	//	//S_esurl: "http://172.17.4.184:19805",
	//	I_size:   5,
	//	Username: "",
	//	Password: "",
	//}
	//Es.InitElasticSize()

	// ES
	Es = &elastic.Elastic{
		//S_esurl: "http://127.0.0.1:19908",
		S_esurl:  "http://172.17.4.184:19908",
		I_size:   5,
		Username: "jybid",
		Password: "Top2023_JEB01i@31",
	}
	Es.InitElasticSize()

	// ES, new cluster
	//EsNew = &elastic.Elastic{
	//	//S_esurl: "http://127.0.0.1:19905",
	//	S_esurl:  "http://172.17.4.184:19905",
	//	I_size:   5,
	//	Username: "jybid",
	//	Password: "Top2023_JEB01i@31",
	//}
	//EsNew.InitElasticSize()

	//go updateMethod()   // flush MongoDB updates
	//go updateEsMethod() // flush ES updates (bidding index)
	go updateProjectEsMethod()

	//taskRunProject()
	//taskRunBidding()
	//dealDataTest() // test-environment data processing
	updateProject()

	fmt.Println("over")
	// Block forever; nothing ever sends on c.
	c := make(chan bool, 1)
	<-c
}

// runBatchFlusher drains pool forever, grouping received items into batches
// of at most size entries. A batch is handed to flush when it is full, or
// when no new item arrived for flushIdleTimeout while items are pending.
// Each flush runs in its own goroutine; sem bounds how many run at once
// (acquiring a slot blocks the loop when all slots are busy).
func runBatchFlusher(pool <-chan []map[string]interface{}, sem chan bool, size int, flush func(batch [][]map[string]interface{})) {
	batch := make([][]map[string]interface{}, 0, size)

	// dispatch hands the current batch to a flush goroutine and starts a new,
	// independent batch so the goroutine never shares a backing array with
	// later appends.
	dispatch := func() {
		pending := batch
		batch = make([][]map[string]interface{}, 0, size)
		sem <- true
		go func() {
			defer func() { <-sem }()
			flush(pending)
		}()
	}

	for {
		select {
		case item := <-pool:
			batch = append(batch, item)
			if len(batch) >= size {
				dispatch()
			}
		case <-time.After(flushIdleTimeout):
			if len(batch) > 0 {
				dispatch()
			}
		}
	}
}

// pickContractFields returns a new map holding those contractFields whose
// value in src is non-nil. The result is empty when src has none of them.
func pickContractFields(src map[string]interface{}) map[string]interface{} {
	picked := make(map[string]interface{}, len(contractFields)+1)
	for _, key := range contractFields {
		if v := src[key]; v != nil {
			picked[key] = v
		}
	}
	return picked
}

// isDuplicateRecord reports whether doc carries extracttype == -1, which the
// tasks below treat as "duplicate record, do not index". A nil doc is not
// considered a duplicate.
func isDuplicateRecord(doc *map[string]interface{}) bool {
	return doc != nil && util.IntAll((*doc)["extracttype"]) == -1
}

// taskRunBidding scans qfw.zktest_0520_id in _id order and, for every
// document that has a non-empty "id" and a positive "bidamount", queues an ES
// update setting bidamount on the document with that id.
func taskRunBidding() {
	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
	it := sess.DB("qfw").C("zktest_0520_id").Find(nil).Select(selected).Sort("_id").Iter()
	fmt.Println("taskRun 开始")

	count := 0
	for ; ; count++ {
		// A fresh map per document: the iterator may decode into the existing
		// map without clearing it, which would leak fields between documents.
		tmp := make(map[string]interface{})
		if !it.Next(&tmp) {
			break
		}
		if count%1000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		biddingID := util.ObjToString(tmp["id"])
		bidamount := util.Float64All(tmp["bidamount"])
		if _, ok := tmp["bidamount"]; !ok || bidamount <= 0 || biddingID == "" {
			continue
		}

		// Queue the ES update: [selector, fields].
		updateEsPool <- []map[string]interface{}{
			{"_id": biddingID},
			{"bidamount": bidamount},
		}
	}

	log.Info("Run Over...Count:", log.Int("count", count))
}

// taskRunProject copies area/city/district from each document of
// qfw.zktest_0423_info_new onto the project (projectset_20230904) whose "ids"
// contains that document's _id, in both MongoDB and the "projectset" ES index.
// A field that is missing or nil in the source is written as "".
func taskRunProject() {
	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	// Project data.
	MgoP := &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.85:27080",
		//MongodbAddr: "127.0.0.1:27080",
		Size:   10,
		DbName: "qfw",
		//Direct: true,
	}
	MgoP.InitPool()

	selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
	it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
	fmt.Println("taskRun 开始")

	count := 0
	for ; ; count++ {
		// Fresh map per document so area/city/district of a previous document
		// can never be carried over.
		tmp := make(map[string]interface{})
		if !it.Next(&tmp) {
			break
		}
		if count%10000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		biddingID := mongodb.BsonIdToSId(tmp["_id"])

		// Find the project that references this bidding document.
		p, _ := MgoP.FindOne("projectset_20230904", map[string]interface{}{"ids": biddingID})
		if p == nil || len(*p) == 0 {
			// Previously this dereferenced p unconditionally, so one missing
			// project aborted the whole run via the deferred Catch.
			log.Info("project not found", log.Any("biddingID", biddingID))
			continue
		}
		projectId := mongodb.BsonIdToSId((*p)["_id"])
		if projectId == "" {
			continue
		}

		update := make(map[string]interface{}, 3)
		for _, key := range []string{"area", "city", "district"} {
			if v := tmp[key]; v != nil {
				update[key] = v
			} else {
				update[key] = ""
			}
		}

		// 1. MongoDB, 2. ES project index.
		MgoP.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
		if err := Es.UpdateDocument("projectset", projectId, update); err != nil {
			log.Info("es update err", log.Any("err", err), log.Any("projectId", projectId))
		}
	}

	log.Info("Run Over...Count:", log.Int("count", count))
}

// dealData (production) copies the contract-period fields from
// qfw.zktest_quanliang_0210_fbs onto the document with the same _id in
// "bidding" (ids greater than 5a862e7040d2d9bbe88e3b1f) or "bidding_back"
// (all others), then queues the same fields for ES unless the target is
// marked as a duplicate (extracttype == -1).
func dealData() {
	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
	it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()

	count, realNum := 0, 0
	for ; ; count++ {
		tmp := make(map[string]interface{})
		if !it.Next(&tmp) {
			break
		}
		if count%1000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		idStr := mongodb.BsonIdToSId(tmp["_id"])
		update := pickContractFields(tmp)
		if len(update) == 0 {
			continue
		}

		// Hex ObjectIDs of equal length compare lexicographically.
		collection := "bidding_back"
		if idStr > "5a862e7040d2d9bbe88e3b1f" {
			collection = "bidding"
		}

		bidding, _ := Mgo.FindById(collection, idStr, map[string]interface{}{"extracttype": 1})
		Mgo.UpdateById(collection, idStr, map[string]interface{}{"$set": update})

		// Existing duplicate records are not indexed.
		if isDuplicateRecord(bidding) {
			continue
		}
		realNum++

		// ES update; "id" is added only after the MongoDB $set was issued.
		update["id"] = idStr
		updateEsPool <- []map[string]interface{}{
			{"_id": idStr},
			update,
		}
	}

	log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
}

// dealResult handles records missed by dealData: it reads "合同" (contract)
// documents in a fixed _id range from qfw.result_20220219 via MgoR, copies
// their contract-period fields onto "bidding" via Mgo and queues the same
// fields for ES unless the target is marked as a duplicate.
func dealResult() {
	defer util.Catch()
	sess := MgoR.GetMgoConn()
	defer MgoR.DestoryMongoConn(sess)

	where := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
			"$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
		},
		"subtype": "合同",
	}
	selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
	it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()

	count := 0
	for ; ; count++ {
		tmp := make(map[string]interface{})
		if !it.Next(&tmp) {
			break
		}
		if count%1000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		idStr := mongodb.BsonIdToSId(tmp["_id"])
		update := pickContractFields(tmp)
		if len(update) == 0 {
			continue
		}

		bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
		Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})

		// Existing duplicate records are not indexed.
		if isDuplicateRecord(bidding) {
			continue
		}

		// ES update; "id" is added only after the MongoDB $set was issued.
		update["id"] = idStr
		updateEsPool <- []map[string]interface{}{
			{"_id": idStr},
			update,
		}
	}

	log.Info("Run Over...Count:", log.Int("count", count))
}

// dealDataTest processes the test environment: for each qfw_data.bidding
// document in a fixed _id range (newest first) it looks up the same _id in
// zktest_quanliang_0210_fbs via Mgo, writes the contract-period fields found
// there to the test "bidding" collection via MgoT and queues them for ES.
func dealDataTest() {
	defer util.Catch()
	sess := MgoT.GetMgoConn()
	defer MgoT.DestoryMongoConn(sess)

	where := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gte": mongodb.StringTOBsonId("635051528aea8786d196e24a"),
			"$lte": mongodb.StringTOBsonId("639356ca8aea8786d1995c4b"),
		},
	}

	ctx := context.Background()
	coll := sess.M.C.Database("qfw_data").Collection("bidding")
	find := options.Find().
		SetBatchSize(1000).
		SetSort(bson.D{{Key: "_id", Value: -1}}).
		SetProjection(bson.M{"_id": 1, "title": 1, "subtype": 1})
	cur, err := coll.Find(ctx, where, find)
	if err != nil {
		// Without a cursor there is nothing to iterate; continuing would
		// dereference a nil cursor.
		fmt.Println(err)
		return
	}
	defer cur.Close(ctx)

	selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}

	count, realNum := 0, 0
	for ; cur.Next(ctx); count++ {
		// Cursor.Decode does not clear an existing map, so use a fresh one.
		tmp := make(map[string]interface{})
		if err := cur.Decode(&tmp); err != nil {
			log.Info("decode err", log.Any("err", err))
			continue
		}
		if count%1000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		idStr := mongodb.BsonIdToSId(tmp["_id"])
		data, _ := Mgo.FindById("zktest_quanliang_0210_fbs", idStr, selected)
		if data == nil || len(*data) == 0 {
			continue
		}

		update := pickContractFields(*data)
		if len(update) == 0 {
			continue
		}

		fmt.Println(idStr)
		MgoT.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})
		realNum++

		// ES update; "id" is added only after the MongoDB $set was issued.
		update["id"] = idStr
		updateEsPool <- []map[string]interface{}{
			{"_id": idStr},
			update,
		}
	}
	if err := cur.Err(); err != nil {
		log.Info("cursor err", log.Any("err", err))
	}

	log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
}

// updateMethod flushes updatePool to the MongoDB "bidding" collection in
// bulks of saveSize. It never returns; run it in its own goroutine.
func updateMethod() {
	runBatchFlusher(updatePool, updateSp, saveSize, func(batch [][]map[string]interface{}) {
		Mgo.UpdateBulk("bidding", batch...)
	})
}

// updateEsMethod flushes updateEsPool to the "bidding" index of Es and, when
// it has been initialised, of EsNew. It never returns; run it in its own
// goroutine.
func updateEsMethod() {
	runBatchFlusher(updateEsPool, updateEsSp, esBatchSize, func(batch [][]map[string]interface{}) {
		Es.UpdateBulk("bidding", batch...)
		// EsNew is optional: its initialisation in main may be commented out.
		if EsNew != nil {
			EsNew.UpdateBulk("bidding", batch...)
		}
	})
}

// updateProjectEsMethod flushes updateEsPool to the "projectset" index of Es.
// It never returns; run it in its own goroutine.
func updateProjectEsMethod() {
	runBatchFlusher(updateEsPool, updateEsSp, esBatchSize, func(batch [][]map[string]interface{}) {
		Es.UpdateBulk("projectset", batch...)
	})
}