package main import ( "bytes" "context" "encoding/json" "github.com/elastic/go-elasticsearch/v7" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "log" "time" ) func fixQyxy() { // 连接 Elasticsearch cfg := elasticsearch.Config{ //Addresses: []string{"http://127.0.0.1:19908"}, // 或者 "http://172.17.4.184:19908" Addresses: []string{"http://172.17.4.184:19908"}, // 或者 "http://172.17.4.184:19908" Username: "jybid", Password: "Top2023_JEB01i@31", } es, err := elasticsearch.NewClient(cfg) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败: %s", err) } // 连接 MongoDB sess := MgoQy.GetMgoConn() defer MgoQy.DestoryMongoConn(sess) // 查询 MongoDB 需要处理的企业 where := map[string]interface{}{"use_flag": 10} queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Iter() count := 0 // 遍历 MongoDB 结果 for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ { if count%100 == 0 { log.Println("current:", count, tmp["_id"]) } id := util.ObjToString(tmp["_id"]) // **删除 MongoDB 数据** whereDel := map[string]interface{}{"_id": tmp["_id"]} MgoQy.Delete("qyxy_std", whereDel) MgoQy.SaveByOriID("wcc_qyxy_std_delete", tmp) company_name := util.ObjToString(tmp["company_name"]) if company_name == "无" || company_name == "|" || company_name == "" { continue } // 查询企业是否已经存在 where2 := map[string]interface{}{ "company_name": company_name, "use_flag": 0, } std, _ := MgoQy.FindOne("qyxy_std", where2) var newID string if len(*std) > 0 { newID = util.ObjToString((*std)["_id"]) } // **构造查询 JSON** query := map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ {"term": map[string]interface{}{"entidlist": id}}, }, }, }, } queryBody, _ := json.Marshal(query) ctx := context.Background() res, err := es.Search( es.Search.WithContext(ctx), es.Search.WithIndex("projectset"), es.Search.WithTrackTotalHits(true), es.Search.WithPretty(), es.Search.WithBody(bytes.NewReader(queryBody)), ) if err != nil { log.Println("ES 查询失败:", err) continue } defer res.Body.Close() // **解析初始查询结果** var esRes map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&esRes); err != nil { log.Println("解析查询结果失败:", err) continue } // 解析查询结果 hits, ok := esRes["hits"].(map[string]interface{})["hits"].([]interface{}) if !ok || len(hits) == 0 { continue } //log.Println("bbbbbb", len(hits), id, newID) for _, hit := range hits { doc, _ := hit.(map[string]interface{})["_source"].(map[string]interface{}) esID := util.ObjToString(doc["id"]) newEntidlist := make([]string, 0) //存入新表 if entidlist, ok := doc["entidlist"].([]interface{}); ok && len(entidlist) > 0 { for _, v := range entidlist { list_id := util.ObjToString(v) if list_id != id && list_id != "-" { newEntidlist = append(newEntidlist, list_id) } } if newID != "" { newEntidlist = append(newEntidlist, newID) } //更新es esUpdate := map[string]interface{}{ "entidlist": newEntidlist, } // 更新Es 数据 updateEsPool <- []map[string]interface{}{ {"_id": esID}, esUpdate, } log.Println("aaaaaaaa", esID, newEntidlist) ////更新项目MongoDB MgoP.UpdateById("projectset_20230904", esID, map[string]interface{}{"$set": esUpdate}) } } } log.Println("数据处理完毕") } func fixQyxy2() { // 连接 Elasticsearch cfg := elasticsearch.Config{ //Addresses: []string{"http://172.17.4.184:19908"}, Addresses: []string{"http://127.0.0.1:19908"}, Username: "jybid", Password: "Top2023_JEB01i@31", } es, err := elasticsearch.NewClient(cfg) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败: %s", err) } // 连接 MongoDB sess := MgoQy.GetMgoConn() defer MgoQy.DestoryMongoConn(sess) // 查询 MongoDB 需要处理的企业 where := map[string]interface{}{"use_flag": 10} queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Iter() count := 0 // 遍历 MongoDB 结果 for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ { if count%100 == 0 { log.Println("current:", count, tmp["_id"]) } id := util.ObjToString(tmp["_id"]) company_name := util.ObjToString(tmp["company_name"]) if company_name == "无" || company_name == "|" || company_name == "" { continue } // 查询企业是否已经存在 where2 := map[string]interface{}{ "company_name": company_name, "use_flag": 0, } std, _ := MgoQy.FindOne("qyxy_std", where2) var newID string if len(*std) > 0 { newID = id } // **构造查询 JSON** query := map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ {"term": map[string]interface{}{"entidlist": id}}, }, }, }, "size": 500, "sort": []map[string]interface{}{ {"_doc": map[string]string{"order": "asc"}}, }, } queryBody, _ := json.Marshal(query) // **初始化滚动查询** ctx := context.Background() //scrollTime := "2m" scrollTime, _ := time.ParseDuration("2m") res, err := es.Search( es.Search.WithContext(ctx), es.Search.WithIndex("projectset"), es.Search.WithBody(bytes.NewReader(queryBody)), es.Search.WithScroll(scrollTime), ) if err != nil { log.Println("ES 查询失败:", err) continue } defer res.Body.Close() // **解析初始查询结果** var esRes map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&esRes); err != nil { log.Println("解析查询结果失败:", err) continue } scrollID, _ := esRes["_scroll_id"].(string) if scrollID == "" { log.Println("获取 Scroll ID 失败") continue } total := 0 for { // 解析查询结果 hits, ok := esRes["hits"].(map[string]interface{})["hits"].([]interface{}) if !ok || len(hits) == 0 { break } // **批量更新 ES** //bulkUpdate := make([]byte, 0) for _, hit := range hits { doc, _ := hit.(map[string]interface{})["_source"].(map[string]interface{}) //esID := util.ObjToString(doc["id"]) newEntidlist := make([]string, 0) if entidlist, ok := doc["entidlist"].([]interface{}); ok { for _, v := range entidlist { list_id := util.ObjToString(v) if list_id != id && list_id != "-" { newEntidlist = append(newEntidlist, list_id) } } } if newID != "" { newEntidlist = append(newEntidlist, newID) } // 构造 Bulk Update 请求 //updateData := map[string]interface{}{ // "update": map[string]string{"_id": esID}, //} //updateBody := map[string]interface{}{ // "doc": map[string]interface{}{"entidlist": newEntidlist}, //} //updateDataJSON, _ := json.Marshal(updateData) //updateBodyJSON, _ := json.Marshal(updateBody) // //bulkUpdate = append(bulkUpdate, append(updateDataJSON, '\n')...) //bulkUpdate = append(bulkUpdate, append(updateBodyJSON, '\n')...) } // 发送批量更新请求 //if len(bulkUpdate) > 0 { // res, err := es.Bulk(bytes.NewReader(bulkUpdate), es.Bulk.WithContext(ctx), es.Bulk.WithIndex("projectset")) // if err != nil { // log.Println("批量更新 ES 失败:", err) // } else { // defer res.Body.Close() // log.Println("批量更新 ES 成功") // } //} total += len(hits) // **继续滚动查询** scrollReq := map[string]interface{}{ "scroll": scrollTime, "scroll_id": scrollID, } scrollBody, _ := json.Marshal(scrollReq) res, err = es.Scroll(es.Scroll.WithContext(ctx), es.Scroll.WithBody(bytes.NewReader(scrollBody))) if err != nil { log.Println("滚动查询失败:", err) break } defer res.Body.Close() // **解析滚动查询响应** if err := json.NewDecoder(res.Body).Decode(&esRes); err != nil { log.Println("解析滚动查询响应失败:", err) break } scrollID, _ = esRes["_scroll_id"].(string) if scrollID == "" { log.Println("滚动查询结束") break } log.Println("当前已更新:", total) } // **删除 MongoDB 数据** //whereDel := map[string]interface{}{"_id": id} //MgoQy.Delete("qyxy_std", whereDel) //MgoQy.SaveByOriID("wcc_qyxy_std_delete", tmp) // **清理滚动查询** if scrollID != "" { clearReq := map[string]interface{}{"scroll_id": []string{scrollID}} clearBody, _ := json.Marshal(clearReq) res, err := es.ClearScroll(es.ClearScroll.WithBody(bytes.NewReader(clearBody))) if err != nil { log.Println("清理滚动搜索失败:", err) } else { defer res.Body.Close() //log.Println("滚动查询清理成功") } } } log.Println("数据处理完毕") }