123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- package main
- import (
- "context"
- "fmt"
- "github.com/olivere/elastic/v7"
- "go.mongodb.org/mongo-driver/bson"
- "io"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "strings"
- "sync"
- "testing"
- )
- func TestFindId(T *testing.T) {
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- Direct: true,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- }
- Mgo.InitPool()
- start := -1
- end := 0
- st := util.GetDayStartSecond(start) //
- et := util.GetDayStartSecond(end) //
- startID := fmt.Sprintf("%x0000000000000000", st) //开始ID
- endID := fmt.Sprintf("%x0000000000000000", et) // 结束ID
- urla := "http://127.0.0.1:19805"
- usernamea := "es_all"
- passworda := "TopJkO2E_d1x"
- //创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(urla),
- elastic.SetBasicAuth(usernamea, passworda),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- id1 := mongodb.StringTOBsonId(startID)
- id2 := mongodb.StringTOBsonId(endID)
- mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- var existsMap sync.Map
- fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
- query := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter()
- count := 0
- ch := make(chan bool, 15)
- wg := &sync.WaitGroup{}
- //var ids = make([]string, 0)
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count)
- }
- if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- id := mongodb.BsonIdToSId(tmp["_id"])
- exist, _ := documentExists(client, "bidding", id)
- if !exist {
- existsMap.Store(id, id)
- }
- }(tmp)
- tmp = map[string]interface{}{}
- }
- }
- wg.Wait()
- existsMap.Range(func(key, _ interface{}) bool {
- fmt.Println(key)
- return true
- })
- log.Println("over")
- }
- // getElasticsearchIDs 获取 Elasticsearch 中的 ID 列表
- func getElasticsearchIDs(client *elastic.Client, indexName string, startID, endID string) ([]string, error) {
- query := elastic.NewRangeQuery("id").Gte(startID).Lte(endID)
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "1m"
- searchSource := elastic.NewSearchSource().
- Query(query).
- Size(10000).
- Sort("_doc", false) //降序排序
- searchService := client.Scroll(indexName).
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- var ids []string
- total := 0
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- ids = append(ids, hit.Id)
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- fmt.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Printf("滚动搜索失败:%s", err)
- break // 处理错误时退出循环
- }
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- return ids, nil
- }
// convertToObjectId strips leading and trailing whitespace from every ID and
// returns the cleaned strings boxed as interface{} values, in input order.
// Despite its name it does not build ObjectId values; the elements stay
// strings. An empty or nil input yields a nil slice.
func convertToObjectId(ids []string) []interface{} {
	var cleaned []interface{}
	for i := 0; i < len(ids); i++ {
		trimmed := strings.TrimSpace(ids[i])
		cleaned = append(cleaned, trimmed)
	}
	return cleaned
}
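// documentExists reports whether a document with the given ID exists in the
// given Elasticsearch index.
//
// NOTE(review): the implementation below is commented out, yet TestFindId
// still calls documentExists — presumably it is defined in another file of
// this package; confirm.
//func documentExists(client *elastic.Client, indexName, documentID string) (bool, error) {
//	exists, err := client.Exists().
//		Index(indexName).
//		Id(documentID).
//		Do(context.Background())
//	if err != nil {
//		return false, err
//	}
//
//	return exists, nil
//}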
|