package main import ( "context" "fmt" "github.com/olivere/elastic/v7" "go.mongodb.org/mongo-driver/bson" "io" "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "strings" "sync" "testing" ) func TestFindId(T *testing.T) { Mgo := &mongodb.MongodbSim{ MongodbAddr: "127.0.0.1:27083", DbName: "qfw", Size: 10, Direct: true, UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", } Mgo.InitPool() start := -1 end := 0 st := util.GetDayStartSecond(start) // et := util.GetDayStartSecond(end) // startID := fmt.Sprintf("%x0000000000000000", st) //开始ID endID := fmt.Sprintf("%x0000000000000000", et) // 结束ID urla := "http://127.0.0.1:19805" usernamea := "es_all" passworda := "TopJkO2E_d1x" //创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(urla), elastic.SetBasicAuth(usernamea, passworda), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } id1 := mongodb.StringTOBsonId(startID) id2 := mongodb.StringTOBsonId(endID) mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段 sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) var existsMap sync.Map fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1} query := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter() count := 0 ch := make(chan bool, 15) wg := &sync.WaitGroup{} //var ids = make([]string, 0) for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%10000 == 0 { log.Println("current:", count) } if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() id := mongodb.BsonIdToSId(tmp["_id"]) exist, _ := documentExists(client, "bidding", id) if !exist { existsMap.Store(id, id) } }(tmp) tmp = map[string]interface{}{} } } wg.Wait() existsMap.Range(func(key, _ interface{}) bool { fmt.Println(key) return true }) log.Println("over") } // getElasticsearchIDs 获取 Elasticsearch 中的 ID 列表 func getElasticsearchIDs(client *elastic.Client, indexName string, startID, endID string) ([]string, error) { query := elastic.NewRangeQuery("id").Gte(startID).Lte(endID) ctx := context.Background() //开始滚动搜索 scrollID := "" scroll := "1m" searchSource := elastic.NewSearchSource(). Query(query). Size(10000). Sort("_doc", false) //降序排序 searchService := client.Scroll(indexName). Size(10000). Scroll(scroll). SearchSource(searchSource) res, err := searchService.Do(ctx) if err != nil { if err == io.EOF { fmt.Println("没有数据") } else { panic(err) } } defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源 fmt.Println("总数是:", res.TotalHits()) var ids []string total := 0 for len(res.Hits.Hits) > 0 { for _, hit := range res.Hits.Hits { ids = append(ids, hit.Id) } total = total + len(res.Hits.Hits) scrollID = res.ScrollId res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx) fmt.Println("current count:", total) if err != nil { if err == io.EOF { // 滚动到最后一批数据,退出循环 break } log.Printf("滚动搜索失败:%s", err) break // 处理错误时退出循环 } } fmt.Println("结束~~~~~~~~~~~~~~~") return ids, nil } // convertToObjectId 将 Elasticsearch 的 ID 转换为 MongoDB 的 ObjectId func convertToObjectId(ids []string) []interface{} { var objectIDs []interface{} for _, id := range ids { objectIDs = append(objectIDs, strings.TrimSpace(id)) } return objectIDs } // documentExists 检查指定 ID 是否存在于 Elasticsearch 中 //func documentExists(client *elastic.Client, indexName, documentID string) (bool, error) { // exists, err := client.Exists(). // Index(indexName). // Id(documentID). // Do(context.Background()) // if err != nil { // return false, err // } // // return exists, nil //}