package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"reflect"
	"strings"
	"time"

	es7 "github.com/olivere/elastic/v7"
	"github.com/wcc4869/common_utils/log"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// Shared connections, work queues and field metadata used by the one-off
// data-repair tasks in this file.
var (
	Mgo    *mongodb.MongodbSim
	MgoB   *mongodb.MongodbSim
	MgoBAi *mongodb.MongodbSim
	MgoT   *mongodb.MongodbSim // test-environment connection
	MgoR   *mongodb.MongodbSim

	// saveSize is the number of MongoDB updates batched per bulk write.
	saveSize = 50

	Es    *elastic.Elastic // cluster on port 19908
	EsNew *elastic.Elastic // cluster on port 19905

	// MongoDB update queue; updateSp caps concurrent bulk-write goroutines.
	updatePool = make(chan []map[string]interface{}, 5000)
	updateSp   = make(chan bool, 5)

	// Elasticsearch (bidding) update queue; updateEsSp caps concurrent
	// bulk-write goroutines.
	updateEsPool = make(chan []map[string]interface{}, 5000)
	updateEsSp   = make(chan bool, 5)

	// Elasticsearch (projectset) update queue; updateProjectEsSp caps
	// concurrent bulk-write goroutines.
	updateProjectEsPool = make(chan []map[string]interface{}, 5000)
	updateProjectEsSp   = make(chan bool, 5)

	// BiddingField maps each level-1 (outermost) field listed in
	// bidding_processing_field to its type name.
	BiddingField = make(map[string]string, 200)
	// BiddingLevelField maps a parent field to its level-2 child fields and
	// their type names.
	BiddingLevelField = make(map[string]map[string]string)
)

// Init opens the connections that the currently enabled task needs.
//
// Only MgoR, Es and EsNew are initialised here. Mgo, MgoB, MgoBAi and MgoT
// stay nil, so any task that uses them must have its connection set up first.
//
// NOTE(review): addresses and credentials are hard-coded in source; consider
// moving them to configuration or environment variables.
func Init() {
	MgoR = &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.85:27080",
		DbName:      "qfw",
		Size:        10,
	}
	MgoR.InitPool()

	Es = &elastic.Elastic{
		S_esurl:  "http://172.17.4.184:19908",
		I_size:   5,
		Username: "jybid",
		Password: "Top2023_JEB01i@31",
	}
	Es.InitElasticSize()

	// New cluster.
	EsNew = &elastic.Elastic{
		S_esurl:  "http://172.17.4.184:19905",
		I_size:   5,
		Username: "jybid",
		Password: "Top2023_JEB01i@31",
	}
	EsNew.InitElasticSize()
}

// main initialises the connections, runs the single enabled task and then
// blocks forever so that background goroutines (if any were started) keep
// running.
func main() {
	Init()

	// Disabled tasks; enable the ones needed for a given run.
	//InitEsBiddingField()
	//go updateMethod()          // flush queued MongoDB updates
	//go updateEsMethod()        // flush queued ES updates
	//go updateEsHrefMethod()    // flush queued ES href updates
	//go updateProjectEsMethod() // flush queued projectset ES updates
	//taskRunProject()
	//taskRunBidding()
	//dealBidding()             // production bidding data
	//dealBiddingAi()           // production qfw_ai bidding data
	//dealBiddingTest()         // test-environment data
	//dealBiddingEsHref()       // update the ES href field from a temporary table
	//dealBiddingNiJian()       // set buyer = owner on proposed-project data
	//updateBiddingBidamount()
	//updateProject()

	fixBiddingEs()
	log.Info("over")

	// Block forever; the process is stopped manually.
	select {}
}

// fixBiddingEs repairs the bidding index: for every record of
// qfw.result_20220219 in a fixed _id range that is flagged repeat == 1 with a
// repeat_reason containing "采集源重复", the matching document is deleted from
// the "bidding" index on both Elasticsearch clusters.
func fixBiddingEs() {
	defer util.Catch()
	sess := MgoR.GetMgoConn()
	defer MgoR.DestoryMongoConn(sess)

	idRange := map[string]interface{}{
		"$gte": mongodb.StringTOBsonId("6847fc265f834436f08ef4fe"),
		"$lte": mongodb.StringTOBsonId("6848e42b5f834436f092f645"),
	}
	where := map[string]interface{}{"_id": idRange}

	it := sess.DB("qfw").C("result_20220219").Find(where).Select(nil).Iter()
	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%1000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		if util.IntAll(tmp["repeat"]) != 1 {
			continue
		}
		if !strings.Contains(util.ObjToString(tmp["repeat_reason"]), "采集源重复") {
			continue
		}

		biddingID := mongodb.BsonIdToSId(tmp["_id"])
		Es.DeleteByID("bidding", biddingID)
		log.Info("fixBiddingEs", zap.String("biddingID", biddingID))
		EsNew.DeleteByID("bidding", biddingID)
	}
}

// InitEsBiddingField loads the field definitions with stype "bidding" from the
// bidding_processing_field collection into BiddingField (level 1) and
// BiddingLevelField (level 2), then logs the counts and the elapsed time.
func InitEsBiddingField() {
	now := time.Now()

	info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
	// Guard against a nil result so a failed query cannot panic here.
	if info != nil {
		for _, m := range *info {
			field := util.ObjToString(m["field"])
			ftype := util.ObjToString(m["ftype"])
			switch util.IntAll(m["level"]) {
			case 1:
				BiddingField[field] = ftype
			case 2:
				pfield := util.ObjToString(m["pfield"])
				if BiddingLevelField[pfield] == nil {
					BiddingLevelField[pfield] = make(map[string]string)
				}
				BiddingLevelField[pfield][field] = ftype
			}
		}
	}

	log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
	log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
	log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
}

// taskRunBidding walks every document of qfw.bidding and re-applies its buyer,
// agency, s_winner and winner fields (those that are present) to the MongoDB
// bidding collection and to the "bidding" index on both Elasticsearch
// clusters. The record is addressed by the document's "id" field.
func taskRunBidding() {
	defer util.Catch()
	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
	fmt.Println("taskRun 开始")

	syncedFields := []string{"buyer", "agency", "s_winner", "winner"}
	clusters := []*elastic.Elastic{Es, EsNew}

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%100 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		update := map[string]interface{}{}
		for _, key := range syncedFields {
			if v, ok := tmp[key]; ok {
				update[key] = v
			}
		}
		if len(update) == 0 {
			continue
		}

		biddingID := util.ObjToString(tmp["id"])
		MgoB.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})

		// Check each cluster on its own: previously the error from the first
		// cluster was overwritten by the second call and never reported.
		for _, cluster := range clusters {
			err := cluster.UpdateDocument("bidding", biddingID, update)
			if err != nil && err.Error() != "Document not updated: noop" {
				log.Info("bidding es update err", err, biddingID)
			}
		}
	}
	log.Info("Run Over...Count:", log.Int("count", count))
}

// taskRunProject copies area, city and district from each record of
// qfw.zktest_0423_info_new onto the project in projectset_20230904 whose
// "ids" contains that record's id, in MongoDB and in the "projectset" index.
// Missing or nil location fields are written as empty strings.
func taskRunProject() {
	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	// Connection holding the project data.
	projectDB := &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.85:27080",
		Size:        10,
		DbName:      "qfw",
	}
	projectDB.InitPool()

	selected := map[string]interface{}{"contenthtml": 0, "detail": 0}
	it := sess.DB("qfw").C("zktest_0423_info_new").Find(nil).Select(selected).Sort("_id").Iter()
	fmt.Println("taskRun 开始")

	locationFields := []string{"area", "city", "district"}

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%10000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		biddingID := mongodb.BsonIdToSId(tmp["_id"])
		where := map[string]interface{}{"ids": biddingID}

		// Find the project that references this bidding record; skip the
		// record when there is none instead of updating an empty project id.
		p, _ := projectDB.FindOne("projectset_20230904", where)
		if p == nil || len(*p) == 0 {
			continue
		}
		projectId := mongodb.BsonIdToSId((*p)["_id"])

		update := make(map[string]interface{}, len(locationFields))
		for _, key := range locationFields {
			if v := tmp[key]; v != nil {
				update[key] = v
			} else {
				update[key] = ""
			}
		}

		projectDB.UpdateById("projectset_20230904", projectId, map[string]interface{}{"$set": update})
		if err := Es.UpdateDocument("projectset", projectId, update); err != nil {
			log.Info("es update err", err, projectId)
		}
	}
	log.Info("Run Over...Count:", log.Int("count", count))
}

// copyNonNilFields returns a new map holding the entries of src for the given
// keys whose values are non-nil.
func copyNonNilFields(src map[string]interface{}, keys ...string) map[string]interface{} {
	out := make(map[string]interface{})
	for _, key := range keys {
		if v := src[key]; v != nil {
			out[key] = v
		}
	}
	return out
}

// dealData syncs the contract-period fields (signaturedate, contractperiod,
// expiredate) from qfw.zktest_quanliang_0210_fbs to MongoDB and queues the
// same change for Elasticsearch on updateEsPool.
//
// Ids greater than "5a862e7040d2d9bbe88e3b1f" are written to "bidding", the
// rest to "bidding_back". Records whose stored extracttype is -1 (duplicates)
// are updated in MongoDB but not queued for the index. A consumer such as
// updateEsMethod must be draining updateEsPool, otherwise this blocks once the
// queue is full.
func dealData() {
	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
	it := sess.DB("qfw").C("zktest_quanliang_0210_fbs").Find(nil).Select(&selected).Iter()

	count := 0
	realNum := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%1000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		idStr := mongodb.BsonIdToSId(tmp["_id"])
		update := copyNonNilFields(tmp, "signaturedate", "contractperiod", "expiredate")
		if len(update) == 0 {
			// Reset here too, matching every other exit from this iteration.
			tmp = make(map[string]interface{})
			continue
		}

		coll := "bidding_back"
		if idStr > "5a862e7040d2d9bbe88e3b1f" {
			coll = "bidding"
		}
		bidding, _ := Mgo.FindById(coll, idStr, map[string]interface{}{"extracttype": 1})
		Mgo.UpdateById(coll, idStr, map[string]interface{}{"$set": update})

		// Historical duplicates are kept out of the index.
		if bidding != nil && util.IntAll((*bidding)["extracttype"]) == -1 {
			tmp = make(map[string]interface{})
			continue
		}
		realNum++

		// Queue the Elasticsearch update; the payload also carries the id.
		update["id"] = idStr
		updateEsPool <- []map[string]interface{}{
			{"_id": idStr},
			update,
		}
		tmp = make(map[string]interface{})
	}
	log.Info("Run Over...Count:", log.Int("count", count), log.Int("realNum", realNum))
}

// dealResult back-fills the contract-period fields that dealData missed. It
// reads "合同" records of qfw.result_20220219 in a fixed _id range, writes
// signaturedate, contractperiod and expiredate to the MongoDB bidding
// collection and queues the same change on updateEsPool. Records whose stored
// extracttype is -1 are not queued for the index.
func dealResult() {
	defer util.Catch()
	sess := MgoR.GetMgoConn()
	defer MgoR.DestoryMongoConn(sess)

	where := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gte": mongodb.StringTOBsonId("5a4909cf40d2d9bbe8ab329c"),
			"$lte": mongodb.StringTOBsonId("5a4ad94d40d2d9bbe8ae0183"),
		},
		"subtype": "合同",
	}
	selected := map[string]interface{}{"signaturedate": 1, "contractperiod": 1, "expiredate": 1}
	it := sess.DB("qfw").C("result_20220219").Find(where).Select(&selected).Iter()

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%1000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		idStr := mongodb.BsonIdToSId(tmp["_id"])
		update := copyNonNilFields(tmp, "signaturedate", "contractperiod", "expiredate")
		if len(update) == 0 {
			// Reset here too, matching every other exit from this iteration.
			tmp = make(map[string]interface{})
			continue
		}

		bidding, _ := Mgo.FindById("bidding", idStr, map[string]interface{}{"extracttype": 1})
		Mgo.UpdateById("bidding", idStr, map[string]interface{}{"$set": update})

		// Historical duplicates are kept out of the index.
		if bidding != nil && util.IntAll((*bidding)["extracttype"]) == -1 {
			tmp = make(map[string]interface{})
			continue
		}

		// Queue the Elasticsearch update; the payload also carries the id.
		update["id"] = idStr
		updateEsPool <- []map[string]interface{}{
			{"_id": idStr},
			update,
		}
		tmp = make(map[string]interface{})
	}
	log.Info("Run Over...Count:", log.Int("count", count))
}

// dealBidding walks qfw.bidding and queues an Elasticsearch update of the
// "purchasing" field for every document that has a non-empty value and whose
// extracttype is not -1. Updates are sent on updateEsPool, so a consumer must
// be running.
func dealBidding() {
	defer util.Catch()
	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	it := sess.DB("qfw").C("bidding").Find(nil).Select(nil).Iter()
	fmt.Println("taskRun 开始")

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%10000 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		if util.IntAll(tmp["extracttype"]) == -1 {
			continue
		}
		if util.ObjToString(tmp["purchasing"]) == "" {
			continue
		}

		biddingID := mongodb.BsonIdToSId(tmp["_id"])
		updateEsPool <- []map[string]interface{}{
			{"_id": biddingID},
			{"purchasing": tmp["purchasing"]},
		}
	}
	log.Info("Run Over...Count:", log.Int("count", count))
}

// dealBiddingAi walks qfw_ai.zxl_20240926 and writes each record's bidamount
// (0.0 when missing or nil) to the qfw_ai "bidding" collection and to the
// "bidding_ai" index.
func dealBiddingAi() {
	defer util.Catch()
	sess := MgoBAi.GetMgoConn()
	defer MgoBAi.DestoryMongoConn(sess)

	it := sess.DB("qfw_ai").C("zxl_20240926").Find(nil).Select(nil).Iter()
	fmt.Println("taskRun 开始")

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%1000 == 0 {
			fmt.Println("current:", count)
		}

		var bidamount interface{} = 0.0
		if v := tmp["bidamount"]; v != nil {
			bidamount = v
		}
		update := map[string]interface{}{"bidamount": bidamount}

		biddingID := mongodb.BsonIdToSId(tmp["_id"])
		MgoBAi.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})

		err := Es.UpdateDocument("bidding_ai", biddingID, update)
		if err != nil && err.Error() != "Document not updated: noop" {
			log.Info("bidding es update err", err, biddingID)
		}
	}
	fmt.Println("over ----------- over ")
}

// dealBiddingByEs scrolls through the "bidding" index for documents with
// comeintime > 1718812800 that have no s_topscopeclass, decoding each hit and
// printing running totals. The decoded documents are currently not stored
// anywhere; the insert step is disabled.
func dealBiddingByEs() {
	url := "http://127.0.0.1:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"
	index := "bidding" // index name

	client, err := es7.NewClient(
		es7.SetURL(url),
		es7.SetBasicAuth(username, password),
		es7.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}
	// Stop the client's background goroutines when the task finishes.
	defer client.Stop()

	query := es7.NewBoolQuery()
	query.Must(es7.NewRangeQuery("comeintime").Gt(1718812800))
	query.MustNot(es7.NewExistsQuery("s_topscopeclass"))

	ctx := context.Background()
	scroll := "10m"
	searchSource := es7.NewSearchSource().
		Query(query).
		Size(10000).
		Sort("_doc", true) // ascending

	res, err := client.Scroll(index).
		Size(10000).
		Scroll(scroll).
		SearchSource(searchSource).
		Do(ctx)
	if err != nil {
		if err == io.EOF {
			// Nothing matched. Return here: res may be nil, and the code
			// below would dereference it.
			fmt.Println("没有数据")
			return
		}
		panic(err)
	}
	fmt.Println("总数是:", res.TotalHits())

	// Remember the scroll id up front so it can always be cleared at the end.
	scrollID := res.ScrollId
	total := 0
	for len(res.Hits.Hits) > 0 {
		for _, hit := range res.Hits.Hits {
			var doc map[string]interface{}
			if decodeErr := json.Unmarshal(hit.Source, &doc); decodeErr != nil {
				fmt.Printf("解析文档失败:%s", decodeErr)
				continue
			}
			// Storing doc in a MongoDB collection is currently disabled.
		}
		total += len(res.Hits.Hits)

		scrollID = res.ScrollId
		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
		fmt.Println("current count:", total)
		if err != nil {
			if err == io.EOF {
				// Last page consumed.
				break
			}
			fmt.Println("滚动搜索失败:", err, res)
			break
		}
	}

	// Release the server-side scroll context.
	if _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
		fmt.Printf("清理滚动搜索失败:%s", err)
	}
	fmt.Println("结束~~~~~~~~~~~~~~~")
}

// dealBiddingTest processes bidding data in the test environment.
func dealBiddingTest() { defer util.Catch() sess := MgoT.GetMgoConn() defer MgoT.DestoryMongoConn(sess) it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Iter() fmt.Println("taskRun 开始") count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"])) } update := map[string]interface{}{} // 2.更新中标单位,中标金额 //if tag_topinformation, ok := tmp["tag_topinformation"]; ok && tag_topinformation != nil { // update["tag_topinformation"] = tag_topinformation //} // //if property_form, ok := tmp["property_form"]; ok && property_form != nil { // update["property_form"] = property_form //} biddingID := mongodb.BsonIdToSId(tmp["_id"]) /** "s_subscopeclass" : "其它", "s_topscopeclass" : "其它", "subscopeclass" : [ "其它" ], "topscopeclass" : [ "其它" ], */ // 行业分类默认值 if topscopeclass, ok := tmp["topscopeclass"]; !ok && topscopeclass == nil { update["topscopeclass"] = []string{"其它"} update["s_topscopeclass"] = "其它" } if subscopeclass, ok := tmp["subscopeclass"]; !ok && subscopeclass == nil { update["subscopeclass"] = []string{"其它"} update["s_subscopeclass"] = "其它" } if util.ObjToString(tmp["s_topscopeclass"]) == "其它" { update["topscopeclass"] = []string{"其它"} update["s_topscopeclass"] = "其它" } if util.ObjToString(tmp["s_subscopeclass"]) == "其它" { update["subscopeclass"] = []string{"其它"} update["s_subscopeclass"] = "其它" } //procurementlist 处理预计采购时间 if procurementlist, ok := tmp["procurementlist"]; ok && procurementlist != nil { for field, _ := range BiddingField { if tmp[field] != nil { if field == "procurementlist" { if tmp["procurementlist"] != nil { var arr []interface{} plist := tmp["procurementlist"].([]interface{}) for _, p := range plist { p1 := p.(map[string]interface{}) p2 := make(map[string]interface{}) for k, v := range BiddingLevelField[field] { if k == "projectname" && util.ObjToString(p1[k]) == "" { p2[k] = util.ObjToString(tmp["projectname"]) } else if k == 
"buyer" && util.ObjToString(p1[k]) == "" && util.ObjToString(tmp["buyer"]) != "" { p2[k] = util.ObjToString(tmp["buyer"]) } else if k == "expurasingtime" && util.ObjToString(p1[k]) != "" { res := getMethod(util.ObjToString(p1[k])) if res != 0 { p2[k] = res } } else if p1[k] != nil && reflect.TypeOf(p1[k]).String() == v { p2[k] = p1[k] } } arr = append(arr, p2) } if len(arr) > 0 { update[field] = arr } } } } } } if len(update) > 0 { fmt.Println("aaaaa", biddingID) //更新mongo //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update}) //更新MongoDB //updatePool <- []map[string]interface{}{ // {"_id": tmp["_id"]}, // {"$set": update}, //} //2.es 项目 更新字段 //err := Es.UpdateDocument("bidding", biddingID, update) //if err != nil && err.Error() != "Document not updated: noop" { // log.Info("bidding es update err", err, biddingID) //} // 更新es //updateEsPool <- []map[string]interface{}{ // {"_id": biddingID}, // update, //} } } log.Info("Run Over...Count:", log.Int("count", count)) } // updateMethod 更新MongoDB func updateMethod() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk("bidding", arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk("bidding", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } } // updateEsMethod 更新es func updateEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", arru...) 
EsNew.UpdateBulk("bidding", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", arru...) EsNew.UpdateBulk("bidding", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } } // updateEsMethod 更新es href 字段 func updateEsHrefMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", arru...) Es.UpdateBulk("bidding_ai", arru...) Es.UpdateBulk("bidding_temporary", arru...) EsNew.UpdateBulk("bidding", arru...) EsNew.UpdateBulk("bidding_customer", arru...) EsNew.UpdateBulk("bidding_free", arru...) EsNew.UpdateBulk("bidding_year", arru...) EsNew.UpdateBulk("bidding_all", arru...) EsNew.UpdateBulk("bidding_temporary", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", arru...) Es.UpdateBulk("bidding_ai", arru...) Es.UpdateBulk("bidding_temporary", arru...) EsNew.UpdateBulk("bidding", arru...) EsNew.UpdateBulk("bidding_customer", arru...) EsNew.UpdateBulk("bidding_free", arru...) EsNew.UpdateBulk("bidding_year", arru...) EsNew.UpdateBulk("bidding_all", arru...) EsNew.UpdateBulk("bidding_temporary", arru...) 
}(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } } // updateProjectEsMethod 更新项目索引 func updateProjectEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateProjectEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateProjectEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateProjectEsSp }() Es.UpdateBulk("projectset", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateProjectEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateProjectEsSp }() Es.UpdateBulk("projectset", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }