package main import ( "context" "encoding/json" "fmt" "github.com/olivere/elastic/v7" "io" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "sync" ) // getForecast 获取测试环境数据 func getForecast() { url := "http://192.168.3.149:9201" //url := "http://127.0.0.1:19805" username := "" password := "" index := "forecast" //索引名称 // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } //测试环境 Mgo := &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27080", MongodbAddr: "192.168.3.206:27002", DbName: "qfw_data", Size: 10, UserName: "root", Password: "root", //Direct: true, } Mgo.InitPool() MgoB := &mongodb.MongodbSim{ //MongodbAddr: "172.17.189.140:27080", MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "qfw", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", Direct: true, } MgoB.InitPool() //2023年01-01 2023-10-01,,1-3季度 //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省") //rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400) //query := elastic.NewBoolQuery(). // Must(areaTermsQuery). // Must(rangeQuery) ctx := context.Background() //开始滚动搜索 scrollID := "" scroll := "10m" searchSource := elastic.NewSearchSource(). Query(nil). Size(10000). Sort("_doc", true) //升序排序 //Sort("_doc", false) //降序排序 searchService := client.Scroll(index). Size(10000). Scroll(scroll). SearchSource(searchSource) res, err := searchService.Do(ctx) if err != nil { if err == io.EOF { fmt.Println("没有数据") } else { panic(err) } } //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源 fmt.Println("总数是:", res.TotalHits()) total := 0 for len(res.Hits.Hits) > 0 { for _, hit := range res.Hits.Hits { var doc map[string]interface{} err := json.Unmarshal(hit.Source, &doc) if err != nil { log.Printf("解析文档失败:%s", err) continue } //infoid := util.ObjToString(doc["infoid"]) // //bidding, _ := MgoB.FindById("bidding", infoid, nil) ////存入新表 //rs := Mgo.SaveByOriID("bidding", bidding) //if !rs { // log.Println("保存出错", infoid) //} Mgo.Save("wcc_forecast", doc) } total = total + len(res.Hits.Hits) scrollID = res.ScrollId res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx) log.Println("current count:", total) if err != nil { if err == io.EOF { // 滚动到最后一批数据,退出循环 break } log.Println("滚动搜索失败:", err, res) break // 处理错误时退出循环 } } // 在循环外调用 ClearScroll _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx) if err != nil { log.Printf("清理滚动搜索失败:%s", err) } fmt.Println("结束~~~~~~~~~~~~~~~") } func dealData() { Mgo := &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27080", MongodbAddr: "192.168.3.206:27002", DbName: "qfw_data", Size: 10, UserName: "root", Password: "root", //Direct: true, } Mgo.InitPool() // 163 正式环境 MgoB := &mongodb.MongodbSim{ //MongodbAddr: "172.17.189.140:27080", MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "qfw", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", Direct: true, } MgoB.InitPool() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) query := sess.DB("qfw_data").C("wcc_forecast").Find(nil).Select(map[string]interface{}{"infoid": 1}).Iter() count := 0 ch := make(chan bool, 10) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("current:", count) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() infoid := util.ObjToString(tmp["infoid"]) bidding, _ := MgoB.FindById("bidding", infoid, nil) //存入新表 rs := Mgo.SaveByOriID("bidding", bidding) if !rs { log.Println("保存出错", infoid) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("over") }