123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/olivere/elastic/v7"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- )
- // updateHrefByEs 更新mgo by es
- func updateHrefByEs() {
- //url := "http://172.17.4.184:19908"
- //url := "http://127.0.0.1:19908"
- url := "http://127.0.0.1:19905"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- //index := "bidding" //索引名称
- index := "biddingall" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- MgoB := &mongodb.MongodbSim{
- //MongodbAddr: "172.17.189.140:27080",
- MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "qfw",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- Direct: true,
- }
- MgoB.InitPool()
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "itype": 0,
- }
- it := sess.DB("qfw").C("bidding_es_update_id").Find(where).Select(nil).Iter()
- log.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%100 == 0 {
- log.Println("current:", count)
- }
- id := mongodb.BsonIdToSId(tmp["_id"])
- res, _ := GetByID(client, index, id)
- if res == nil {
- //没有找到
- update := map[string]interface{}{
- "itype": 0,
- }
- MgoB.UpdateById("bidding_es_update_id", id, map[string]interface{}{"$set": update})
- } else {
- // 找到对应数据了
- update := map[string]interface{}{
- "href": res["href"],
- "itype": 1,
- }
- update2 := map[string]interface{}{
- "href": res["href"],
- }
- MgoB.UpdateById("bidding_es_update_id", id, map[string]interface{}{"$set": update})
- MgoB.UpdateById("bidding", id, map[string]interface{}{"$set": update2})
- }
- }
- log.Println("over", count)
- }
- // GetByID 根据索引和 ID 获取数据
- func GetByID(ec *elastic.Client, index string, id string) (map[string]interface{}, error) {
- ctx := context.Background()
- // 执行 Get 请求
- res, err := ec.Get().
- Index(index).
- Id(id).
- Do(ctx)
- if err != nil {
- if elastic.IsNotFound(err) {
- return nil, fmt.Errorf("document not found for index '%s' and id '%s'", index, id)
- }
- return nil, fmt.Errorf("failed to get document: %w", err)
- }
- // 解析文档源
- if !res.Found {
- return nil, fmt.Errorf("document not found for index '%s' and id '%s'", index, id)
- }
- // 解析 JSON 数据到 map[string]interface{}
- var source map[string]interface{}
- if err := json.Unmarshal(res.Source, &source); err != nil {
- return nil, fmt.Errorf("failed to unmarshal document source: %w", err)
- }
- return source, nil
- }
|