123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- package main
- import (
- "fmt"
- "github.com/gogf/gf/v2/util/gconv"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "strings"
- "sync"
- )
- // HisTransactionDataFromBid 历史bidding(指定截止comeintime,采购意向)
- func HisTransactionDataFromBid() {
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- ch := make(chan bool, 10)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- "toptype": "采购意向",
- }
- fields := map[string]interface{}{
- "projectname": 1,
- "budget": 1,
- "bidamount": 1,
- "buyer": 1,
- "s_winner": 1,
- "agency": 1,
- "property_form": 1,
- "multipackage": 1,
- "area": 1,
- "city": 1,
- "district": 1,
- //
- "publishtime": 1,
- "comeintime": 1,
- "extracttype": 1,
- "tag_subinformation": 1,
- "tag_subinformation_ai": 1,
- "tag_topinformation": 1,
- "tag_topinformation_ai": 1,
- }
- arr := []map[string]interface{}{}
- it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if gconv.Int64(tmp["comeintime"]) >= 1713196800 { //截止时间1713196800
- return
- }
- if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
- return
- }
- if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
- return
- }
- result := DealTransactionForBid(tmp, "采购意向", 3)
- lock.Lock()
- if len(result) > 0 {
- arr = append(arr, result)
- }
- if len(arr) > 50 {
- MgoPro.SaveBulk("projectset_wy", arr...)
- arr = []map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- if n%10000 == 0 {
- fmt.Println("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- if len(arr) > 0 {
- MgoPro.SaveBulk("projectset_wy", arr...)
- arr = []map[string]interface{}{}
- }
- fmt.Println("结束")
- }
- // HisTransactionDataFromBid2 历史bidding(指定截止comeintime,新增物业项目)
- func HisTransactionDataFromBid2() {
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- ch := make(chan bool, 20)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": 1713715200,
- "$lt": 1713801600,
- },
- "toptype": "拟建",
- }
- fields := map[string]interface{}{
- "projectname": 1,
- "budget": 1,
- "bidamount": 1,
- "buyer": 1,
- "s_winner": 1,
- "agency": 1,
- "property_form": 1,
- "multipackage": 1,
- "area": 1,
- "city": 1,
- "district": 1,
- //
- "owner": 1,
- "s_topscopeclass": 1,
- "publishtime": 1,
- "toptype": 1,
- "comeintime": 1,
- "extracttype": 1,
- "tag_subinformation": 1,
- "tag_subinformation_ai": 1,
- "tag_topinformation": 1,
- "tag_topinformation_ai": 1,
- }
- arr := []map[string]interface{}{}
- it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- //comeintime := gconv.Int64(tmp["comeintime"])
- //if comeintime < 1609430400 || comeintime >= 1713715200 {
- // return
- //}
- if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
- return
- }
- if s_topscopeclass := gconv.String(tmp["s_topscopeclass"]); !strings.Contains(s_topscopeclass, "建筑工程") { //排除非建筑工程
- return
- }
- if tag_topinformation := gconv.String(tmp["tag_topinformation"]); strings.Contains(tag_topinformation, "物业") { //排除物业
- return
- } else if tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"]); strings.Contains(tag_topinformation_ai, "物业") {
- return
- }
- //if tmp["tag_topinformation"] != nil || tmp["tag_topinformation_ai"] != nil { //不包含物业
- // return
- //}
- project_bidstatus := 4 //拟建
- business_type := "新增物业项目"
- result := DealTransactionForBid(tmp, business_type, project_bidstatus)
- lock.Lock()
- if len(result) > 0 {
- arr = append(arr, result)
- }
- if len(arr) > 50 {
- MgoPro.SaveBulk("projectset_wy_nj", arr...)
- arr = []map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- if n%10000 == 0 {
- fmt.Println("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- if len(arr) > 0 {
- MgoPro.SaveBulk("projectset_wy_nj", arr...)
- arr = []map[string]interface{}{}
- }
- fmt.Println("结束")
- }
- // HisTransactionDataFromProject 历史project(指定截止pici:1713196800)
- func HisTransactionDataFromProject() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, 20)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- "pici": map[string]interface{}{
- "$lt": 1713196800,
- //"$gt": 1711900800,
- },
- }
- fields := map[string]interface{}{
- "projectname": 1,
- "budget": 1,
- "bidamount": 1,
- "buyer": 1,
- "s_winner": 1,
- "agency": 1,
- "property_form": 1,
- "multipackage": 1,
- "area": 1,
- "city": 1,
- "district": 1,
- "zbtime": 1,
- "jgtime": 1,
- "bidstatus": 1,
- //
- "firsttime": 1,
- "ids": 1,
- "pici": 1,
- "sourceinfoid": 1,
- "tag_subinformation": 1,
- "tag_subinformation_ai": 1,
- "tag_topinformation": 1,
- "tag_topinformation_ai": 1,
- }
- arr := []map[string]interface{}{}
- it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
- return
- }
- result := DealTransactionForPro(tmp)
- lock.Lock()
- if len(result) > 0 {
- arr = append(arr, result)
- }
- if len(arr) > 50 {
- MgoPro.SaveBulk("projectset_wy_newback", arr...)
- arr = []map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- if n%10000 == 0 {
- fmt.Println("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- if len(arr) > 0 {
- MgoPro.SaveBulk("projectset_wy_newback", arr...)
- arr = []map[string]interface{}{}
- }
- fmt.Println("结束")
- }
- // HisTransactionDataAddInformation 补充字段信息
- func HisTransactionDataAddInformation() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, 1)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- //"_id": mongodb.StringTOBsonId("662f01d8397fa006e2e75e6c"),
- //项目
- //"_id": map[string]interface{}{
- // "$gte": mongodb.StringTOBsonId("66308fa06f6c86a3960ae83f"),
- // "$lte": mongodb.StringTOBsonId("66308feb6f6c86a3960b0f4e"),
- //},
- //拟建
- //"project_bidstatus": 4,
- //"_id": map[string]interface{}{
- // "$lte": mongodb.StringTOBsonId("6627227819c5408c474c3802"),
- //},
- //采购意向
- //"project_bidstatus": 3,
- //"_id": map[string]interface{}{
- // "$lte": mongodb.StringTOBsonId("661f798b5a4e6cc01349dad0"),
- //},
- //历史projectset_wy
- //"project_id": map[string]interface{}{
- // //"$gt": "662143800000000000000000",
- // "$gt": "667c3b5166cf0db42ae965e6",
- //},
- "project_id": "6637ae0866cf0db42aeeb5d4",
- //历史projectset_wy_back
- //"update_time": map[string]interface{}{
- // "$gte": 1714959573,
- // "$lte": 1719795791,
- //},
- }
- count := MgoPro.Count("projectset_wy_back", query)
- fmt.Println("count:", count)
- it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
- n := 0
- arr := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- id := mongodb.BsonIdToSId(tmp["_id"])
- update := []map[string]interface{}{
- {"_id": mongodb.StringTOBsonId(id)},
- }
- set := map[string]interface{}{}
- //法人信息
- buyer_id, agency_id, winner_ids := FindEntInfoData(id, gconv.String(tmp["buyer"]), gconv.String(tmp["agency"]), gconv.Strings(tmp["winner"]))
- //更新
- set["buyer_id"] = buyer_id
- set["agency_id"] = agency_id
- set["winner_id"] = winner_ids
- //保存
- tmp["buyer_id"] = buyer_id
- tmp["agency_id"] = agency_id
- tmp["winner_id"] = winner_ids
- if from := gconv.String(tmp["from"]); from == "project" {
- //项目信息补充业态
- //project_id := gconv.String(tmp["project_id"])
- //pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1})
- //if len(*pro) > 0 && (*pro)["property_form"] != nil {
- // //更新
- // set["property_form"] = (*pro)["property_form"]
- // //保存
- // tmp["property_form"] = (*pro)["property_form"]
- //}
- //查询情报信息
- ids := gconv.Strings(tmp["info_ids"])
- info := FindInfomationData(ids...) //情报信息查询
- //更新
- set["information_id"] = info.Id
- set["starttime"] = info.Starttime
- set["endtime"] = info.Endtime
- //保存
- tmp["information_id"] = info.Id
- tmp["starttime"] = info.Starttime
- tmp["endtime"] = info.Endtime
- } else {
- if project_bidstatus := gconv.Int(tmp["project_bidstatus"]); project_bidstatus == 4 { //拟建新增物业项目,补充情报信息
- //查询情报信息
- id := gconv.String(tmp["info_id"])
- info := FindInfomationData(id) //情报信息查询
- //更新
- set["information_id"] = info.Id
- set["starttime"] = info.Starttime
- set["endtime"] = info.Endtime
- //保存
- tmp["information_id"] = info.Id
- tmp["starttime"] = info.Starttime
- tmp["endtime"] = info.Endtime
- }
- }
- delete(tmp, "from") //无用字段删除
- delete(tmp, "_id") //无用字段删除
- if !SaveDataToEs(tmp) { //保存、更新es
- fmt.Println("数据保存es失败,数据类型 项目project_id", tmp["project_id"])
- }
- var err error
- err = UpdateOrSaveDataToClickHouse(tmp) //保存、更新clickhouse
- if err != nil {
- fmt.Println("数据迁移失败,数据类型 项目project_id", tmp["project_id"], err)
- }
- //更新
- update = append(update, map[string]interface{}{"$set": set})
- lock.Lock()
- arr = append(arr, update)
- if len(arr) > 100 {
- MgoPro.UpdateBulk("projectset_wy_back", arr...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- if n%100 == 0 {
- fmt.Println("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- if len(arr) > 0 {
- MgoPro.UpdateBulk("projectset_wy_back", arr...)
- arr = [][]map[string]interface{}{}
- }
- fmt.Println("迁移结束...")
- }
|