package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"

	"github.com/olivere/elastic/v7"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// newQyxyMongo returns an initialised connection pool for the "mixdata"
// database that holds qyxy_std and the wcc_* working collections.
//
// NOTE(review): the address and credentials are hard-coded; they should be
// moved to configuration or environment variables.
func newQyxyMongo() *mongodb.MongodbSim {
	db := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083", // alternative address left by the original author
		Size:     10,
		DbName:   "mixdata",
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	db.InitPool()
	return db
}

// fixQyxy repairs enterprise data. For every qyxy_std record with
// use_flag = 10 it:
//
//  1. looks up a record with the same company_name and use_flag = 0 and, if
//     one exists, takes its _id as the replacement id;
//  2. rewrites the entidlist of every "projectset" document in Elasticsearch
//     that references the old id (see syncProjectEntIDs);
//  3. deletes the old record from qyxy_std and stores a copy of it in
//     wcc_qyxy_std_delete.
//
// Records whose company_name is "无" or "|" are skipped. If the scroll over
// Elasticsearch fails part-way, the MongoDB record is kept so the company can
// be processed again on a later run.
func fixQyxy() {
	// NOTE(review): endpoint and credentials are hard-coded; they should be
	// moved to configuration or environment variables.
	//url := "http://172.17.4.184:19908"
	url := "http://127.0.0.1:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"

	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}

	sess := MgoQy.GetMgoConn()
	defer MgoQy.DestoryMongoConn(sess)

	ctx := context.Background()
	where := map[string]interface{}{
		"use_flag": 10,
	}
	// NOTE(review): Limit(2) restricts the run to two records; it looks like
	// a leftover from testing - confirm before running against the full set.
	queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Limit(2).Iter()

	count := 0
	// A fresh map is handed to Next for every record so that fields of one
	// document can never leak into the next one.
	for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count, tmp = count+1, make(map[string]interface{}) {
		if count%1000 == 0 {
			log.Println("current:", count, tmp["_id"])
		}

		id := util.ObjToString(tmp["_id"])
		companyName := util.ObjToString(tmp["company_name"])
		if companyName == "无" || companyName == "|" {
			continue
		}

		// The replacement id comes from the use_flag = 0 record that was
		// found, not from the record that is about to be removed.
		std, _ := MgoQy.FindOne("qyxy_std", map[string]interface{}{
			"company_name": companyName,
			"use_flag":     0,
		})
		var newID string
		if std != nil && len(*std) > 0 {
			newID = util.ObjToString((*std)["_id"])
		}

		// 1. Update the Elasticsearch documents.
		if err := syncProjectEntIDs(ctx, client, id, newID); err != nil {
			// Keep the MongoDB record: not every project document was
			// rewritten, so this company has to be retried.
			log.Println("滚动搜索失败:", err)
			continue
		}

		// 2. Remove the old record from MongoDB and keep a copy of it.
		MgoQy.Delete("qyxy_std", map[string]interface{}{
			"_id": id,
		})
		MgoQy.Save("wcc_qyxy_std_delete", tmp)
	}
	log.Println("数据处理完毕")
}

// syncProjectEntIDs scrolls over every document of the "projectset" index
// whose entidlist contains id and queues, on updateEsPool, an update that
// replaces its entidlist with the list produced by rebuildEntIDList.
//
// It panics if the initial search fails with anything other than io.EOF and
// returns a non-nil error if a follow-up scroll request fails. The scroll
// context is released before returning whenever a scroll id was obtained.
func syncProjectEntIDs(ctx context.Context, client *elastic.Client, id, newID string) error {
	const scroll = "10m"

	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("entidlist", id))
	searchSource := elastic.NewSearchSource().
		Query(query).
		Size(10000).
		Sort("_doc", true) // ascending
	res, err := client.Scroll("projectset").
		Size(10000).
		Scroll(scroll).
		SearchSource(searchSource).
		Do(ctx)
	if err != nil {
		if err != io.EOF {
			panic(err)
		}
		// io.EOF means there is nothing to scroll over.
		fmt.Println("没有数据")
	}

	// Remember the scroll id right away so it is released even when the very
	// first page is empty and the loop below never runs.
	scrollID := ""
	if res != nil {
		scrollID = res.ScrollId
	}
	defer func() {
		if scrollID == "" {
			return
		}
		if _, cerr := client.ClearScroll().ScrollId(scrollID).Do(ctx); cerr != nil {
			log.Printf("清理滚动搜索失败:%s", cerr)
		}
	}()

	var totalHits int64
	if res != nil {
		totalHits = res.TotalHits()
	}
	fmt.Println("总数是:", totalHits)

	total := 0
	// res (or res.Hits) may be nil after io.EOF, so guard before reading hits.
	for res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
		for _, hit := range res.Hits.Hits {
			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				log.Printf("解析文档失败:%s", err)
				continue
			}
			entidlist, ok := doc["entidlist"].([]interface{})
			if !ok || len(entidlist) == 0 {
				continue
			}
			// Queue the update: first element selects the document, second
			// element carries the fields to set.
			updateEsPool <- []map[string]interface{}{
				{"_id": util.ObjToString(doc["id"])},
				{"entidlist": rebuildEntIDList(entidlist, id, newID)},
			}
		}

		total += len(res.Hits.Hits)
		scrollID = res.ScrollId
		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
		log.Println("current count:", total)
		if err == io.EOF {
			// Past the last batch.
			break
		}
		if err != nil {
			return err
		}
	}
	return nil
}

// rebuildEntIDList returns entidlist converted to strings with oldID and the
// placeholder "-" removed. When newID is non-empty it is appended exactly
// once, even if the list already contained it. The result is never nil, so it
// serialises as a JSON array rather than null.
func rebuildEntIDList(entidlist []interface{}, oldID, newID string) []string {
	out := make([]string, 0, len(entidlist)+1)
	for _, v := range entidlist {
		listID := util.ObjToString(v)
		if listID == oldID || listID == "-" {
			continue
		}
		if newID != "" && listID == newID {
			continue // appended once below
		}
		out = append(out, listID)
	}
	if newID != "" {
		out = append(out, newID)
	}
	return out
}

// findData scans qyxy_std and, for every record whose company_type is not
// "个体工商户", counts the qyxy_std records sharing its company_name. Records
// whose name occurs more than once are saved to the temporary collection
// wcc_qyxy_20240311.
func findData() {
	db := newQyxyMongo()
	sess := db.GetMgoConn()
	defer db.DestoryMongoConn(sess)

	fields := map[string]interface{}{"company_name": 1, "company_type": 1, "company_status": 1, "use_flag": 1}
	query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(fields).Iter()

	count := 0
	// The map is replaced in the post statement so that it is also reset on
	// the early-continue path.
	for tmp := make(map[string]interface{}); query.Next(tmp); count, tmp = count+1, make(map[string]interface{}) {
		if count%10000 == 0 {
			log.Println("current:", count)
		}

		if util.ObjToString(tmp["company_type"]) == "个体工商户" {
			continue
		}

		sameName := map[string]interface{}{
			"company_name": util.ObjToString(tmp["company_name"]),
		}
		if db.Count("qyxy_std", sameName) > 1 {
			db.Save("wcc_qyxy_20240311", tmp)
		}
	}
	log.Println("结束")
}

// getCompanyName finds, in wcc_qyxy_20240311, the company names that have at
// least one record with use_flag = 0 and at least one with use_flag = 10, and
// saves each such name once to wcc_qyxy_name_0325.
func getCompanyName() {
	db := newQyxyMongo()
	sess := db.GetMgoConn()
	defer db.DestoryMongoConn(sess)

	seen := make(map[string]bool) // company names already examined
	fields := map[string]interface{}{"company_name": 1, "use_flag": 1}
	query := sess.DB("mixdata").C("wcc_qyxy_20240311").Find(nil).Select(fields).Iter()

	count := 0
	// The map is replaced in the post statement so that it is also reset on
	// the early-continue paths.
	for tmp := make(map[string]interface{}); query.Next(tmp); count, tmp = count+1, make(map[string]interface{}) {
		if count%10000 == 0 {
			log.Println("current:", count)
		}

		name := util.ObjToString(tmp["company_name"])
		if seen[name] {
			continue
		}
		seen[name] = true

		res, _ := db.Find("wcc_qyxy_20240311", map[string]interface{}{
			"company_name": name,
		}, nil, nil, false, -1, -1)
		if res == nil {
			continue
		}

		hasFlag0, hasFlag10 := false, false
		for _, v := range *res {
			switch util.Int64All(v["use_flag"]) {
			case 0:
				hasFlag0 = true
			case 10:
				hasFlag10 = true
			}
		}
		// Both states exist for this name.
		if hasFlag0 && hasFlag10 {
			db.Save("wcc_qyxy_name_0325", map[string]interface{}{"company_name": name})
		}
	}
	log.Println("结束")
}

// SpecialData processes the special-enterprise collections
// (special_enterprise, special_foundation, special_law_office,
// special_social_organ, special_trade_union) read from the second MongoDB
// instance. For every record carrying a company_id it loads the qyxy_std
// record with that _id, overrides its use_flag with the special record's
// use_flag and saves the result to wcc_std_0411 under the original id.
// qyxy_std itself is not modified here.
func SpecialData() {
	tables := []string{"special_enterprise", "special_foundation", "special_law_office", "special_social_organ", "special_trade_union"}

	// qyxy_std lives here.
	db := newQyxyMongo()

	// Source of the special_* collections (the original comment labels this
	// host "181 凭安库").
	src := &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.181:27001",
		//MongodbAddr: "127.0.0.1:27001", // alternative address left by the original author
		DbName:   "mixdata",
		Size:     10,
		UserName: "",
		Password: "",
		//Direct: true,
	}
	src.InitPool()
	sess := src.GetMgoConn()
	defer src.DestoryMongoConn(sess)

	fields := map[string]interface{}{"company_name": 1, "use_flag": 1, "company_id": 1}
	for _, table := range tables {
		query := sess.DB("mixdata").C(table).Find(nil).Select(fields).Iter()
		count := 0
		// A fresh map per record keeps a company_id of a previous document
		// from being mistaken for one of the current document.
		for tmp := make(map[string]interface{}); query.Next(tmp); count, tmp = count+1, make(map[string]interface{}) {
			if count%10000 == 0 {
				log.Println("current:", count, "table - ", table, tmp["company_name"])
			}

			companyID, ok := tmp["company_id"]
			if !ok {
				continue
			}

			datas, _ := db.FindOne("qyxy_std", map[string]interface{}{
				"_id": companyID,
			})
			if datas == nil || len(*datas) == 0 {
				continue
			}

			data := *datas
			data["use_flag"] = tmp["use_flag"]
			db.SaveByOriID("wcc_std_0411", data)
		}
	}
	log.Println("结束")
}

// StdData iterates over wcc_std_0410 and, for every record with a non-zero
// autoid, copies the qyxy_std record that has the same autoid into
// wcc_std_0411 under its original id.
func StdData() {
	db := newQyxyMongo()
	sess := db.GetMgoConn()
	defer db.DestoryMongoConn(sess)

	query := sess.DB("mixdata").C("wcc_std_0410").Find(nil).Select(nil).Iter()

	count := 0
	// A fresh map per record keeps an autoid of a previous document from
	// being reused for the current one.
	for tmp := make(map[string]interface{}); query.Next(tmp); count, tmp = count+1, make(map[string]interface{}) {
		if count%10000 == 0 {
			log.Println("current:", count)
		}

		autoid := util.Int64All(tmp["autoid"])
		if autoid == 0 {
			continue
		}

		datas, _ := db.FindOne("qyxy_std", map[string]interface{}{
			"autoid": autoid,
		})
		if datas == nil || len(*datas) == 0 {
			continue
		}
		db.SaveByOriID("wcc_std_0411", datas)
	}
	log.Println("over")
}