bidding.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "log"
  10. )
  11. // updateHrefByEs 更新mgo by es
  12. func updateHrefByEs() {
  13. //url := "http://172.17.4.184:19908"
  14. //url := "http://127.0.0.1:19908"
  15. url := "http://127.0.0.1:19905"
  16. username := "jybid"
  17. password := "Top2023_JEB01i@31"
  18. //index := "bidding" //索引名称
  19. index := "biddingall" //索引名称
  20. // 创建 Elasticsearch 客户端
  21. client, err := elastic.NewClient(
  22. elastic.SetURL(url),
  23. elastic.SetBasicAuth(username, password),
  24. elastic.SetSniff(false),
  25. )
  26. if err != nil {
  27. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  28. }
  29. MgoB := &mongodb.MongodbSim{
  30. //MongodbAddr: "172.17.189.140:27080",
  31. MongodbAddr: "127.0.0.1:27083",
  32. Size: 10,
  33. DbName: "qfw",
  34. UserName: "SJZY_RWbid_ES",
  35. Password: "SJZY@B4i4D5e6S",
  36. Direct: true,
  37. }
  38. MgoB.InitPool()
  39. defer util.Catch()
  40. sess := MgoB.GetMgoConn()
  41. defer MgoB.DestoryMongoConn(sess)
  42. where := map[string]interface{}{
  43. "itype": 0,
  44. }
  45. it := sess.DB("qfw").C("bidding_es_update_id").Find(where).Select(nil).Iter()
  46. log.Println("taskRun 开始")
  47. count := 0
  48. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  49. if count%100 == 0 {
  50. log.Println("current:", count)
  51. }
  52. id := mongodb.BsonIdToSId(tmp["_id"])
  53. res, _ := GetByID(client, index, id)
  54. if res == nil {
  55. //没有找到
  56. update := map[string]interface{}{
  57. "itype": 0,
  58. }
  59. MgoB.UpdateById("bidding_es_update_id", id, map[string]interface{}{"$set": update})
  60. } else {
  61. // 找到对应数据了
  62. update := map[string]interface{}{
  63. "href": res["href"],
  64. "itype": 1,
  65. }
  66. update2 := map[string]interface{}{
  67. "href": res["href"],
  68. }
  69. MgoB.UpdateById("bidding_es_update_id", id, map[string]interface{}{"$set": update})
  70. MgoB.UpdateById("bidding", id, map[string]interface{}{"$set": update2})
  71. }
  72. }
  73. log.Println("over", count)
  74. }
  75. // GetByID 根据索引和 ID 获取数据
  76. func GetByID(ec *elastic.Client, index string, id string) (map[string]interface{}, error) {
  77. ctx := context.Background()
  78. // 执行 Get 请求
  79. res, err := ec.Get().
  80. Index(index).
  81. Id(id).
  82. Do(ctx)
  83. if err != nil {
  84. if elastic.IsNotFound(err) {
  85. return nil, fmt.Errorf("document not found for index '%s' and id '%s'", index, id)
  86. }
  87. return nil, fmt.Errorf("failed to get document: %w", err)
  88. }
  89. // 解析文档源
  90. if !res.Found {
  91. return nil, fmt.Errorf("document not found for index '%s' and id '%s'", index, id)
  92. }
  93. // 解析 JSON 数据到 map[string]interface{}
  94. var source map[string]interface{}
  95. if err := json.Unmarshal(res.Source, &source); err != nil {
  96. return nil, fmt.Errorf("failed to unmarshal document source: %w", err)
  97. }
  98. return source, nil
  99. }