package main import ( "context" "encoding/json" "fmt" "github.com/gogf/gf/v2/util/gconv" "github.com/olivere/elastic/v7" "github.com/robfig/cron" "io" "log" "sync" ) func init() { ReadConfig(&Config) //初始化 InitMgo() //mgo InitCkh() //clickhouse InitEs() //es InitOther() } func main() { c := cron.New() //增量 c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据 c.Start() //历史 //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息 //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息 //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息 //临时处理(信息补充) //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form //IncTransactionDataMgoToCkhAndEs() //数据迁移 ch := make(chan bool) <-ch } func tmp() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, 1) wg := &sync.WaitGroup{} //lock := &sync.Mutex{} query := map[string]interface{}{ //"project_bidstatus": 4, //"_id": map[string]interface{}{ // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"), //}, //"update_time": map[string]interface{}{ // "$lt": 1714959573, //}, //"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"), "repeat": true, } repeat := map[string]bool{} count := MgoPro.Count("projectset_wy_back", query) fmt.Println("count:", count) it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter() n := 0 //arr := []map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() //update := []map[string]interface{}{} //project_id := gconv.String(tmp["project_id"]) //lock.Lock() //if !repeat[project_id] { // Es.DelById(Config.Es.Index, project_id) // CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id) // repeat[project_id] = true //} //lock.Unlock() //err, result := Es.GetById(Config.Es.Index, project_id) //Es.DelById() //if err != nil || len(result) == 0 { // fmt.Println(project_id) // update = append(update, map[string]interface{}{"_id": tmp["_id"]}) // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}}) //} else { // if gconv.Int(result["project_bidstatus"]) != 0 { // fmt.Println("11", project_id) // } //} if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 { fmt.Println("project_id") //update = append(update, map[string]interface{}{"_id": tmp["_id"]}) //update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": true}}) } //if len(update) > 0 { // lock.Lock() // arr = append(arr, update) // if len(arr) > 500 { // MgoPro.UpdateBulk("projectset_wy_back", arr...) // arr = [][]map[string]interface{}{} // } // lock.Unlock() //} }(tmp) if n%1000 == 0 { fmt.Println("current:", n) } tmp = map[string]interface{}{} } wg.Wait() //if len(arr) > 0 { // MgoPro.SaveBulk("projectset_wy_tmp2", arr...) // arr = []map[string]interface{}{} //} fmt.Println("迁移结束...", len(repeat)) } func getBiddingData() { url := "http://172.17.4.184:19908" //url := "http://192.168.3.149:9200" username := "jybid" password := "Top2023_JEB01i@31" index := "transaction_info" //索引名称 // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } //rangeQuery := elastic.NewRangeQuery("project_bidstatus").Lt(3) //query := elastic.NewBoolQuery(). // Must(elastic.NewTermQuery("id", "652d2f432ccb08936fe2ed60")) // //Must(rangeQuery) //Must(elastic.NewTermQuery("subtype", "招标")) //query := elastic.NewBoolQuery(). // //北京,天津,河北,上海,江苏,浙江,安徽 // //Must(elastic.NewTermQuery("area", "北京市")). // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")). // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")). // Must(rangeQuery) ctx := context.Background() //开始滚动搜索 scrollID := "" scroll := "10m" searchSource := elastic.NewSearchSource(). //Query(query). Size(500) //Sort("_doc", true) //升序排序 //Sort("_doc", false) //降序排序 searchService := client.Scroll(index). Size(500). Scroll(scroll). SearchSource(searchSource) res, err := searchService.Do(ctx) if err != nil { if err == io.EOF { fmt.Println("没有数据") } else { panic(err) } } //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源 fmt.Println("总数是:", res.TotalHits()) total := 0 for len(res.Hits.Hits) > 0 { for _, hit := range res.Hits.Hits { var doc map[string]interface{} err := json.Unmarshal(hit.Source, &doc) if err != nil { log.Printf("解析文档失败:%s", err) continue } id := gconv.String(doc["id"]) //情报 info_ids := gconv.Strings(doc["info_ids"]) info := FindInfomationData(info_ids...) doc["information_id"] = info.Id doc["starttime"] = info.Starttime doc["endtime"] = info.Endtime set := map[string]interface{}{ "information_id": info.Id, "starttime": info.Starttime, "endtime": info.Endtime, } //法人 buyer, agency := "", "" winners := []string{} if gconv.String(doc["buyer_id"]) == "" { buyer = gconv.String(doc["buyer"]) } if gconv.String(doc["agency_id"]) == "" { agency = gconv.String(doc["agency"]) } if len(gconv.Strings(doc["winner_id"])) == 0 { if winnersTmp := gconv.Strings(doc["winner"]); len(winnersTmp) > 0 { winners = winnersTmp } } buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners) if buyer_id != "" { doc["buyer_id"] = buyer_id set["buyer_id"] = buyer_id } if agency_id != "" { doc["agency_id"] = agency_id set["agency_id"] = agency_id } if len(winner_ids) > 0 { doc["winner_id"] = winner_ids set["winner_id"] = winner_ids } // coll := "projectset_wy_back" business_type := gconv.String(doc["business_type"]) if business_type == "新增项目" { coll = "projectset_wy" doc["business_type"] = "新增物业项目" set["business_type"] = "新增物业项目" } else if business_type == "采购意向" { coll = "projectset_wy" } //更新es client.Update().Index(index).Id(id).Doc(doc).Do(context.Background()) //更新clickhouse UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id}) //更新mgo MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false) } total = total + len(res.Hits.Hits) scrollID = res.ScrollId res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx) log.Println("current count:", total) if err != nil { if err == io.EOF { // 滚动到最后一批数据,退出循环 break } log.Println("滚动搜索失败:", err, res) break // 处理错误时退出循环 } } // 在循环外调用 ClearScroll _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx) if err != nil { log.Printf("清理滚动搜索失败:%s", err) } fmt.Println("结束~~~~~~~~~~~~~~~") }