123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- package main
- import (
- "bytes"
- "context"
- "encoding/json"
- "github.com/elastic/go-elasticsearch/v7"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "time"
- )
- func fixQyxy() {
- // 连接 Elasticsearch
- cfg := elasticsearch.Config{
- //Addresses: []string{"http://127.0.0.1:19908"}, // 或者 "http://172.17.4.184:19908"
- Addresses: []string{"http://172.17.4.184:19908"}, // 或者 "http://172.17.4.184:19908"
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- es, err := elasticsearch.NewClient(cfg)
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败: %s", err)
- }
- // 连接 MongoDB
- sess := MgoQy.GetMgoConn()
- defer MgoQy.DestoryMongoConn(sess)
- // 查询 MongoDB 需要处理的企业
- where := map[string]interface{}{"use_flag": 10}
- queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Iter()
- count := 0
- // 遍历 MongoDB 结果
- for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
- if count%100 == 0 {
- log.Println("current:", count, tmp["_id"])
- }
- id := util.ObjToString(tmp["_id"])
- // **删除 MongoDB 数据**
- whereDel := map[string]interface{}{"_id": tmp["_id"]}
- MgoQy.Delete("qyxy_std", whereDel)
- MgoQy.SaveByOriID("wcc_qyxy_std_delete", tmp)
- company_name := util.ObjToString(tmp["company_name"])
- if company_name == "无" || company_name == "|" || company_name == "" {
- continue
- }
- // 查询企业是否已经存在
- where2 := map[string]interface{}{
- "company_name": company_name,
- "use_flag": 0,
- }
- std, _ := MgoQy.FindOne("qyxy_std", where2)
- var newID string
- if len(*std) > 0 {
- newID = util.ObjToString((*std)["_id"])
- }
- // **构造查询 JSON**
- query := map[string]interface{}{
- "query": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {"term": map[string]interface{}{"entidlist": id}},
- },
- },
- },
- }
- queryBody, _ := json.Marshal(query)
- ctx := context.Background()
- res, err := es.Search(
- es.Search.WithContext(ctx),
- es.Search.WithIndex("projectset"),
- es.Search.WithTrackTotalHits(true),
- es.Search.WithPretty(),
- es.Search.WithBody(bytes.NewReader(queryBody)),
- )
- if err != nil {
- log.Println("ES 查询失败:", err)
- continue
- }
- defer res.Body.Close()
- // **解析初始查询结果**
- var esRes map[string]interface{}
- if err := json.NewDecoder(res.Body).Decode(&esRes); err != nil {
- log.Println("解析查询结果失败:", err)
- continue
- }
- // 解析查询结果
- hits, ok := esRes["hits"].(map[string]interface{})["hits"].([]interface{})
- if !ok || len(hits) == 0 {
- continue
- }
- //log.Println("bbbbbb", len(hits), id, newID)
- for _, hit := range hits {
- doc, _ := hit.(map[string]interface{})["_source"].(map[string]interface{})
- esID := util.ObjToString(doc["id"])
- newEntidlist := make([]string, 0)
- //存入新表
- if entidlist, ok := doc["entidlist"].([]interface{}); ok && len(entidlist) > 0 {
- for _, v := range entidlist {
- list_id := util.ObjToString(v)
- if list_id != id && list_id != "-" {
- newEntidlist = append(newEntidlist, list_id)
- }
- }
- if newID != "" {
- newEntidlist = append(newEntidlist, newID)
- }
- //更新es
- esUpdate := map[string]interface{}{
- "entidlist": newEntidlist,
- }
- // 更新Es 数据
- updateEsPool <- []map[string]interface{}{
- {"_id": esID},
- esUpdate,
- }
- log.Println("aaaaaaaa", esID, newEntidlist)
- ////更新项目MongoDB
- MgoP.UpdateById("projectset_20230904", esID, map[string]interface{}{"$set": esUpdate})
- }
- }
- }
- log.Println("数据处理完毕")
- }
- func fixQyxy2() {
- // 连接 Elasticsearch
- cfg := elasticsearch.Config{
- //Addresses: []string{"http://172.17.4.184:19908"},
- Addresses: []string{"http://127.0.0.1:19908"},
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- es, err := elasticsearch.NewClient(cfg)
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败: %s", err)
- }
- // 连接 MongoDB
- sess := MgoQy.GetMgoConn()
- defer MgoQy.DestoryMongoConn(sess)
- // 查询 MongoDB 需要处理的企业
- where := map[string]interface{}{"use_flag": 10}
- queryMgo := sess.DB("mixdata").C("qyxy_std").Find(where).Select(nil).Iter()
- count := 0
- // 遍历 MongoDB 结果
- for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
- if count%100 == 0 {
- log.Println("current:", count, tmp["_id"])
- }
- id := util.ObjToString(tmp["_id"])
- company_name := util.ObjToString(tmp["company_name"])
- if company_name == "无" || company_name == "|" || company_name == "" {
- continue
- }
- // 查询企业是否已经存在
- where2 := map[string]interface{}{
- "company_name": company_name,
- "use_flag": 0,
- }
- std, _ := MgoQy.FindOne("qyxy_std", where2)
- var newID string
- if len(*std) > 0 {
- newID = id
- }
- // **构造查询 JSON**
- query := map[string]interface{}{
- "query": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {"term": map[string]interface{}{"entidlist": id}},
- },
- },
- },
- "size": 500,
- "sort": []map[string]interface{}{
- {"_doc": map[string]string{"order": "asc"}},
- },
- }
- queryBody, _ := json.Marshal(query)
- // **初始化滚动查询**
- ctx := context.Background()
- //scrollTime := "2m"
- scrollTime, _ := time.ParseDuration("2m")
- res, err := es.Search(
- es.Search.WithContext(ctx),
- es.Search.WithIndex("projectset"),
- es.Search.WithBody(bytes.NewReader(queryBody)),
- es.Search.WithScroll(scrollTime),
- )
- if err != nil {
- log.Println("ES 查询失败:", err)
- continue
- }
- defer res.Body.Close()
- // **解析初始查询结果**
- var esRes map[string]interface{}
- if err := json.NewDecoder(res.Body).Decode(&esRes); err != nil {
- log.Println("解析查询结果失败:", err)
- continue
- }
- scrollID, _ := esRes["_scroll_id"].(string)
- if scrollID == "" {
- log.Println("获取 Scroll ID 失败")
- continue
- }
- total := 0
- for {
- // 解析查询结果
- hits, ok := esRes["hits"].(map[string]interface{})["hits"].([]interface{})
- if !ok || len(hits) == 0 {
- break
- }
- // **批量更新 ES**
- //bulkUpdate := make([]byte, 0)
- for _, hit := range hits {
- doc, _ := hit.(map[string]interface{})["_source"].(map[string]interface{})
- //esID := util.ObjToString(doc["id"])
- newEntidlist := make([]string, 0)
- if entidlist, ok := doc["entidlist"].([]interface{}); ok {
- for _, v := range entidlist {
- list_id := util.ObjToString(v)
- if list_id != id && list_id != "-" {
- newEntidlist = append(newEntidlist, list_id)
- }
- }
- }
- if newID != "" {
- newEntidlist = append(newEntidlist, newID)
- }
- // 构造 Bulk Update 请求
- //updateData := map[string]interface{}{
- // "update": map[string]string{"_id": esID},
- //}
- //updateBody := map[string]interface{}{
- // "doc": map[string]interface{}{"entidlist": newEntidlist},
- //}
- //updateDataJSON, _ := json.Marshal(updateData)
- //updateBodyJSON, _ := json.Marshal(updateBody)
- //
- //bulkUpdate = append(bulkUpdate, append(updateDataJSON, '\n')...)
- //bulkUpdate = append(bulkUpdate, append(updateBodyJSON, '\n')...)
- }
- // 发送批量更新请求
- //if len(bulkUpdate) > 0 {
- // res, err := es.Bulk(bytes.NewReader(bulkUpdate), es.Bulk.WithContext(ctx), es.Bulk.WithIndex("projectset"))
- // if err != nil {
- // log.Println("批量更新 ES 失败:", err)
- // } else {
- // defer res.Body.Close()
- // log.Println("批量更新 ES 成功")
- // }
- //}
- total += len(hits)
- // **继续滚动查询**
- scrollReq := map[string]interface{}{
- "scroll": scrollTime,
- "scroll_id": scrollID,
- }
- scrollBody, _ := json.Marshal(scrollReq)
- res, err = es.Scroll(es.Scroll.WithContext(ctx), es.Scroll.WithBody(bytes.NewReader(scrollBody)))
- if err != nil {
- log.Println("滚动查询失败:", err)
- break
- }
- defer res.Body.Close()
- // **解析滚动查询响应**
- if err := json.NewDecoder(res.Body).Decode(&esRes); err != nil {
- log.Println("解析滚动查询响应失败:", err)
- break
- }
- scrollID, _ = esRes["_scroll_id"].(string)
- if scrollID == "" {
- log.Println("滚动查询结束")
- break
- }
- log.Println("当前已更新:", total)
- }
- // **删除 MongoDB 数据**
- //whereDel := map[string]interface{}{"_id": id}
- //MgoQy.Delete("qyxy_std", whereDel)
- //MgoQy.SaveByOriID("wcc_qyxy_std_delete", tmp)
- // **清理滚动查询**
- if scrollID != "" {
- clearReq := map[string]interface{}{"scroll_id": []string{scrollID}}
- clearBody, _ := json.Marshal(clearReq)
- res, err := es.ClearScroll(es.ClearScroll.WithBody(bytes.NewReader(clearBody)))
- if err != nil {
- log.Println("清理滚动搜索失败:", err)
- } else {
- defer res.Body.Close()
- //log.Println("滚动查询清理成功")
- }
- }
- }
- log.Println("数据处理完毕")
- }
|