123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/gogf/gf/v2/util/gconv"
- "github.com/olivere/elastic/v7"
- "github.com/robfig/cron"
- "io"
- "log"
- "sync"
- )
- func init() {
- ReadConfig(&Config) //初始化
- InitMgo() //mgo
- InitCkh() //clickhouse
- InitEs() //es
- InitOther()
- }
- func main() {
- c := cron.New()
- //增量
- c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
- c.Start()
- //历史
- //HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
- //HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
- //HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
- //临时处理(信息补充)
- //HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
- //IncTransactionDataMgoToCkhAndEs() //数据迁移
- ch := make(chan bool)
- <-ch
- }
- func tmp() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- //"project_bidstatus": 4,
- //"_id": map[string]interface{}{
- // "$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
- //},
- //"update_time": map[string]interface{}{
- // "$lt": 1714959573,
- //},
- //"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"),
- //"repeat": true,
- "update_time": map[string]interface{}{
- "$gte": 1714959573,
- "$lte": 1719795791,
- },
- }
- repeat := map[string]bool{}
- count := MgoPro.Count("projectset_wy_back", query)
- fmt.Println("count:", count)
- it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
- n := 0
- arr := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- update := []map[string]interface{}{}
- project_id := gconv.String(tmp["project_id"])
- //lock.Lock()
- //if !repeat[project_id] {
- //Es.DelById(Config.Es.Index, project_id)
- //CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
- //repeat[project_id] = true
- //}
- //lock.Unlock()
- //err, result := Es.GetById(Config.Es.Index, project_id)
- //Es.DelById()
- //if err != nil || len(result) == 0 {
- // fmt.Println(project_id)
- // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}})
- //} else {
- // if gconv.Int(result["project_bidstatus"]) != 0 {
- // fmt.Println("11", project_id)
- // }
- //}
- tt := map[string]bool{}
- err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
- if err != nil || len(result) == 0 {
- tt["es"] = true
- }
- if FindClickHouseByProjectId(project_id) == 0 {
- tt["click"] = true
- }
- if len(tt) > 0 {
- update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": tt}})
- }
- //if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
- // fmt.Println("project_id")
- // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "project"}})
- //} else if MgoB.Count("projectset_wy", map[string]interface{}{"project_id": tmp["project_id"]}) > 0 {
- // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "bidding"}})
- //}
- if len(update) > 0 {
- lock.Lock()
- arr = append(arr, update)
- if len(arr) > 500 {
- MgoPro.UpdateBulk("projectset_wy_back", arr...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }
- }(tmp)
- if n%1000 == 0 {
- fmt.Println("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- if len(arr) > 0 {
- MgoPro.UpdateBulk("projectset_wy_back", arr...)
- arr = [][]map[string]interface{}{}
- }
- fmt.Println("迁移结束...", len(repeat))
- }
- func getBiddingData() {
- url := "http://172.17.4.184:19908"
- //url := "http://127.0.0.1:19908"
- //url := "http://192.168.3.149:9200"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- index := "transaction_info" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- //rangeQuery := elastic.NewRangeQuery("project_bidstatus").Lt(3)
- query := elastic.NewBoolQuery().
- Must(elastic.NewTermQuery("id", "64d7146cb44bf08751e3c133")) //
- //Must(rangeQuery)
- //Must(elastic.NewTermQuery("subtype", "招标"))
- //query := elastic.NewBoolQuery().
- // //北京,天津,河北,上海,江苏,浙江,安徽
- // //Must(elastic.NewTermQuery("area", "北京市")).
- // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
- // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
- // Must(rangeQuery)
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "10m"
- searchSource := elastic.NewSearchSource().
- Query(query).
- Size(500)
- //Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll(index).
- Size(500).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- total := 0
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- set := map[string]interface{}{}
- id := gconv.String(doc["id"])
- //情报
- information_id := gconv.String(doc["information_id"])
- info_ids := gconv.Strings(doc["info_ids"])
- info := FindInfomationData(info_ids...)
- if information_id != info.Id {
- doc["information_id"] = info.Id
- doc["starttime"] = info.Starttime
- doc["endtime"] = info.Endtime
- set["information_id"] = info.Id
- set["starttime"] = info.Starttime
- set["endtime"] = info.Endtime
- }
- //法人
- buyer, agency := "", ""
- winners := []string{}
- if gconv.String(doc["buyer_id"]) == "" {
- buyer = gconv.String(doc["buyer"])
- }
- if gconv.String(doc["agency_id"]) == "" {
- agency = gconv.String(doc["agency"])
- }
- if len(gconv.Strings(doc["winner_id"])) == 0 {
- if winnersTmp := gconv.Strings(doc["winner"]); len(winnersTmp) > 0 {
- winners = winnersTmp
- }
- }
- buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
- if buyer_id != "" {
- doc["buyer_id"] = buyer_id
- set["buyer_id"] = buyer_id
- }
- if agency_id != "" {
- doc["agency_id"] = agency_id
- set["agency_id"] = agency_id
- }
- if len(winner_ids) > 0 {
- doc["winner_id"] = winner_ids
- set["winner_id"] = winner_ids
- }
- //
- coll := "projectset_wy_back"
- business_type := gconv.String(doc["business_type"])
- if business_type == "新增项目" {
- coll = "projectset_wy"
- doc["business_type"] = "新增物业项目"
- set["business_type"] = "新增物业项目"
- } else if business_type == "采购意向" {
- coll = "projectset_wy"
- }
- fmt.Println(set)
- if len(set) > 0 {
- //更新es
- client.Update().Index(index).Id(id).Doc(doc).Do(context.Background())
- //更新clickhouse
- err := UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id})
- fmt.Println("11", err)
- //更新mgo
- MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false)
- }
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- log.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Println("滚动搜索失败:", err, res)
- break // 处理错误时退出循环
- }
- }
- // 在循环外调用 ClearScroll
- _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
- if err != nil {
- log.Printf("清理滚动搜索失败:%s", err)
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- }
|