id_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/olivere/elastic/v7"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "io"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "strings"
  12. "sync"
  13. "testing"
  14. )
  15. func TestFindId(T *testing.T) {
  16. Mgo := &mongodb.MongodbSim{
  17. MongodbAddr: "127.0.0.1:27083",
  18. DbName: "qfw",
  19. Size: 10,
  20. Direct: true,
  21. UserName: "SJZY_RWbid_ES",
  22. Password: "SJZY@B4i4D5e6S",
  23. }
  24. Mgo.InitPool()
  25. start := -1
  26. end := 0
  27. st := util.GetDayStartSecond(start) //
  28. et := util.GetDayStartSecond(end) //
  29. startID := fmt.Sprintf("%x0000000000000000", st) //开始ID
  30. endID := fmt.Sprintf("%x0000000000000000", et) // 结束ID
  31. urla := "http://127.0.0.1:19805"
  32. usernamea := "es_all"
  33. passworda := "TopJkO2E_d1x"
  34. //创建 Elasticsearch 客户端
  35. client, err := elastic.NewClient(
  36. elastic.SetURL(urla),
  37. elastic.SetBasicAuth(usernamea, passworda),
  38. elastic.SetSniff(false),
  39. )
  40. if err != nil {
  41. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  42. }
  43. id1 := mongodb.StringTOBsonId(startID)
  44. id2 := mongodb.StringTOBsonId(endID)
  45. mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
  46. sess := Mgo.GetMgoConn()
  47. defer Mgo.DestoryMongoConn(sess)
  48. var existsMap sync.Map
  49. fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
  50. query := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter()
  51. count := 0
  52. ch := make(chan bool, 15)
  53. wg := &sync.WaitGroup{}
  54. //var ids = make([]string, 0)
  55. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  56. if count%10000 == 0 {
  57. log.Println("current:", count)
  58. }
  59. if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
  60. ch <- true
  61. wg.Add(1)
  62. go func(tmp map[string]interface{}) {
  63. defer func() {
  64. <-ch
  65. wg.Done()
  66. }()
  67. id := mongodb.BsonIdToSId(tmp["_id"])
  68. exist, _ := documentExists(client, "bidding", id)
  69. if !exist {
  70. existsMap.Store(id, id)
  71. }
  72. }(tmp)
  73. tmp = map[string]interface{}{}
  74. }
  75. }
  76. wg.Wait()
  77. existsMap.Range(func(key, _ interface{}) bool {
  78. fmt.Println(key)
  79. return true
  80. })
  81. log.Println("over")
  82. }
  83. // getElasticsearchIDs 获取 Elasticsearch 中的 ID 列表
  84. func getElasticsearchIDs(client *elastic.Client, indexName string, startID, endID string) ([]string, error) {
  85. query := elastic.NewRangeQuery("id").Gte(startID).Lte(endID)
  86. ctx := context.Background()
  87. //开始滚动搜索
  88. scrollID := ""
  89. scroll := "1m"
  90. searchSource := elastic.NewSearchSource().
  91. Query(query).
  92. Size(10000).
  93. Sort("_doc", false) //降序排序
  94. searchService := client.Scroll(indexName).
  95. Size(10000).
  96. Scroll(scroll).
  97. SearchSource(searchSource)
  98. res, err := searchService.Do(ctx)
  99. if err != nil {
  100. if err == io.EOF {
  101. fmt.Println("没有数据")
  102. } else {
  103. panic(err)
  104. }
  105. }
  106. defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  107. fmt.Println("总数是:", res.TotalHits())
  108. var ids []string
  109. total := 0
  110. for len(res.Hits.Hits) > 0 {
  111. for _, hit := range res.Hits.Hits {
  112. ids = append(ids, hit.Id)
  113. }
  114. total = total + len(res.Hits.Hits)
  115. scrollID = res.ScrollId
  116. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  117. fmt.Println("current count:", total)
  118. if err != nil {
  119. if err == io.EOF {
  120. // 滚动到最后一批数据,退出循环
  121. break
  122. }
  123. log.Printf("滚动搜索失败:%s", err)
  124. break // 处理错误时退出循环
  125. }
  126. }
  127. fmt.Println("结束~~~~~~~~~~~~~~~")
  128. return ids, nil
  129. }
  // convertToObjectId returns ids with surrounding whitespace trimmed, boxed as
// []interface{} (for example for use in a bson "$in" list).
//
// Despite its name, it does NOT convert to MongoDB ObjectId: every element of
// the result is a plain string. Nil or empty input yields a nil slice.
func convertToObjectId(ids []string) []interface{} {
	if len(ids) == 0 {
		return nil
	}
	objectIDs := make([]interface{}, len(ids))
	for i, id := range ids {
		objectIDs[i] = strings.TrimSpace(id)
	}
	return objectIDs
}
  138. // documentExists 检查指定 ID 是否存在于 Elasticsearch 中
  139. //func documentExists(client *elastic.Client, indexName, documentID string) (bool, error) {
  140. // exists, err := client.Exists().
  141. // Index(indexName).
  142. // Id(documentID).
  143. // Do(context.Background())
  144. // if err != nil {
  145. // return false, err
  146. // }
  147. //
  148. // return exists, nil
  149. //}