123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/olivere/elastic/v7"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- )
- // fixQyxy 修复企业数据
- func fixQyxy() {
- //c查询use_flag=10的 qyxy 数据,然后去
- //url := "http://172.17.4.184:19908"
- url := "http://127.0.0.1:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- //index := "bidding" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- //
- sess := MgoQy.GetMgoConn()
- defer MgoQy.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "use_flag": 10,
- }
- queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Limit(2).Iter()
- count := 0
- for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
- if count%1000 == 0 {
- log.Println("current:", count, tmp["_id"])
- }
- id := util.ObjToString(tmp["_id"])
- company_name := util.ObjToString(tmp["company_name"])
- if company_name == "无" || company_name == "|" {
- continue
- }
- where2 := map[string]interface{}{
- "company_name": company_name,
- "use_flag": 0,
- }
- std, _ := MgoQy.FindOne("qyxy_std", where2)
- var newID string
- if len(*std) > 0 {
- newID = util.ObjToString(tmp["_id"])
- }
- query := elastic.NewBoolQuery().
- Must(
- elastic.NewTermQuery("entidlist", id), // 模糊匹配 projectname
- )
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "10m"
- searchSource := elastic.NewSearchSource().
- Query(query).
- Size(10000).
- Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll("projectset").
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- total := 0
- //1.处理更新es 数据
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- esID := util.ObjToString(doc["id"])
- newEntidlist := make([]string, 0)
- //存入新表
- if entidlist, ok := doc["entidlist"].([]interface{}); ok && len(entidlist) > 0 {
- for _, v := range entidlist {
- list_id := util.ObjToString(v)
- if list_id != id && list_id != "-" {
- newEntidlist = append(newEntidlist, list_id)
- }
- }
- if newID != "" {
- newEntidlist = append(newEntidlist, newID)
- }
- //更新es
- esUpdate := map[string]interface{}{
- "entidlist": newEntidlist,
- }
- // 更新Es 数据
- updateEsPool <- []map[string]interface{}{
- {"_id": esID},
- esUpdate,
- }
- }
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- log.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Println("滚动搜索失败:", err, res)
- break // 处理错误时退出循环
- }
- }
- //2.删除MongoDB
- whereDel := map[string]interface{}{
- "_id": id,
- }
- MgoQy.Delete("qyxy_std", whereDel)
- MgoQy.Save("wcc_qyxy_std_delete", tmp)
- // 在循环外调用 ClearScroll
- _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
- if err != nil {
- log.Printf("清理滚动搜索失败:%s", err)
- }
- }
- log.Println("数据处理完毕")
- }
- // findData 找出企业信息中,company_type 不等于合体工商户的企业数据,然后写入一个临时表
- func findData() {
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- //where := map[string]interface{}{
- // "company_type": map[string]interface{}{
- // "$ne": "个体工商户",
- // },
- //}
- query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(map[string]interface{}{"company_name": 1, "company_type": 1, "company_status": 1, "use_flag": 1}).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count)
- }
- companyType := util.ObjToString(tmp["company_type"])
- if companyType == "个体工商户" {
- continue
- }
- company_name := util.ObjToString(tmp["company_name"])
- whereN := map[string]interface{}{
- "company_name": company_name,
- }
- num := Mgo.Count("qyxy_std", whereN)
- if num > 1 {
- Mgo.Save("wcc_qyxy_20240311", tmp)
- }
- tmp = make(map[string]interface{})
- }
- log.Println("结束")
- }
- // getCompanyName
- func getCompanyName() {
- // 找出wcc_qyxy_20240311表中,公司名称有多个,并且一个use_flag=0,一个use_flag=10的数据
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- companyMap := make(map[string]bool)
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- query := sess.DB("mixdata").C("wcc_qyxy_20240311").Find(nil).Select(map[string]interface{}{"company_name": 1, "use_flag": 1}).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count)
- }
- name := util.ObjToString(tmp["company_name"])
- if companyMap[name] {
- continue
- } else {
- companyMap[name] = true
- }
- where := map[string]interface{}{
- "company_name": name,
- }
- res, _ := Mgo.Find("wcc_qyxy_20240311", where, nil, nil, false, -1, -1)
- flaga := false
- flagb := false
- for _, v := range *res {
- if util.Int64All(v["use_flag"]) == 0 {
- flaga = true
- } else if util.Int64All(v["use_flag"]) == 10 {
- flagb = true
- }
- }
- // 存在0和10 二个状态
- if flaga && flagb {
- Mgo.Save("wcc_qyxy_name_0325", map[string]interface{}{"company_name": name})
- }
- tmp = make(map[string]interface{})
- }
- log.Println("结束")
- }
- // SpecialData 处理特殊企业数据,更新 qyxy_std 表use_flag
- func SpecialData() {
- // 处理 特殊企业 数据,
- tables := []string{"special_enterprise", "special_foundation", "special_law_office", "special_social_organ", "special_trade_union"}
- //1.先处理 special_enterprise 表数据,循环数据,根据company_name 去查询 qyxy_std 表,如果
- // 数据只有一条,并且 special_enterprise.company_id 等于 qyxy_std._id;就更新 qyxy_std表的 use_flag 字段
- // qyxy_std 表
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- // 181 凭安库
- Mgo2 := &mongodb.MongodbSim{
- MongodbAddr: "172.17.4.181:27001",
- //MongodbAddr: "127.0.0.1:27001",
- DbName: "mixdata",
- Size: 10,
- UserName: "",
- Password: "",
- //Direct: true,
- }
- Mgo2.InitPool()
- sess := Mgo2.GetMgoConn()
- defer Mgo2.DestoryMongoConn(sess)
- for _, v := range tables {
- query := sess.DB("mixdata").C(v).Find(nil).Select(map[string]interface{}{"company_name": 1, "use_flag": 1, "company_id": 1}).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count, "table - ", v, tmp["company_name"])
- }
- if _, ok := tmp["company_id"]; !ok {
- continue
- }
- where := map[string]interface{}{
- "_id": tmp["company_id"],
- }
- //update := map[string]interface{}{
- // "use_flag": tmp["use_flag"],
- //}
- //Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false)
- //name := util.ObjToString(tmp["company_name"])
- //where := map[string]interface{}{
- // "company_name": name,
- //}
- // 如果 ID 相同,std 表use_flag =10,更新
- datas, _ := Mgo.FindOne("qyxy_std", where)
- if len(*datas) == 0 {
- continue
- }
- //if util.Int64All((*datas)["use_flag"]) == util.Int64All(tmp["use_flag"]) {
- // continue
- //}
- //Mgo.Update("qyxy_std", where, map[string]interface{}{"$set": update}, true, false)
- data := *datas
- data["use_flag"] = tmp["use_flag"]
- Mgo.SaveByOriID("wcc_std_0411", data)
- }
- }
- log.Println("结束")
- }
- func StdData() {
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- query := sess.DB("mixdata").C("wcc_std_0410").Find(nil).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count)
- }
- autoid := util.Int64All(tmp["autoid"])
- if autoid == 0 {
- continue
- }
- where := map[string]interface{}{
- "autoid": autoid,
- }
- datas, _ := Mgo.FindOne("qyxy_std", where)
- if len(*datas) == 0 {
- continue
- }
- Mgo.SaveByOriID("wcc_std_0411", datas)
- }
- log.Println("over")
- }
|