123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452 |
- package main
- import (
- "fmt"
- "github.com/wcc4869/common_utils/log"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "strings"
- )
- // updateBiddingBasicClassTest 测试环境更新 basicClass
- func updateBiddingBasicClassTest() {
- defer util.Catch()
- sess := MgoT.GetMgoConn()
- defer MgoT.DestoryMongoConn(sess)
- it := sess.DB("qfw_data").C("bidding").Find(nil).Select(nil).Sort("-_id").Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%5000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- if _, ok := tmp["basicClass"]; ok && util.ObjToString(tmp["basicClass"]) != "" {
- update["basicClass"] = tmp["basicClass"]
- }
- // 更新Es 数据
- if len(update) > 0 {
- ////更新MongoDB
- //updatePool <- []map[string]interface{}{
- // {"_id": tmp["_id"]},
- // {"$set": update},
- //}
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- update,
- }
- }
- }
- }
- // updateBiddingBasicClass 更新bidding basicClass 存量数据
- func updateBiddingBasicClass() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$lte": 1751273717,
- },
- }
- it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Sort("-_id").Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%5000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- if _, ok := tmp["basicClass"]; ok && util.ObjToString(tmp["basicClass"]) != "" {
- update["basicClass"] = tmp["basicClass"]
- }
- // 更新Es 数据
- if len(update) > 0 {
- ////更新MongoDB
- //updatePool <- []map[string]interface{}{
- // {"_id": tmp["_id"]},
- // {"$set": update},
- //}
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- update,
- }
- }
- }
- }
- // updateBiddingType22 根据爬虫代码,更新标讯分类
- func updateBiddingTypeBySpidecode() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": 1735660800,
- //"$lte": 1750908768,
- "$lte": 1751438445,
- },
- }
- it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- spidecode := util.ObjToString(tmp["spidercode"])
- site := util.ObjToString(tmp["site"])
- if spidecode == "tj_tjstzxmzxspbsdt_sybj" && site == "天津市投资项目在线审批办事大厅" {
- update := make(map[string]interface{})
- update["toptype"] = "拟建"
- update["subtype"] = "拟建"
- update["infoformat"] = 1
- // 更新Es 数据
- if len(update) > 0 {
- ////更新MongoDB
- updatePool <- []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": update},
- }
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- update,
- }
- }
- }
- }
- }
- func updateBiddingisValidFile() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": 1750755600,
- "$lte": 1750822200,
- },
- }
- it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Sort("-_id").Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- if _, ok := tmp["isValidFile"]; ok {
- update["isValidFile"] = tmp["isValidFile"]
- }
- // 更新Es 数据
- if len(update) > 0 {
- ////更新MongoDB
- //updatePool <- []map[string]interface{}{
- // {"_id": tmp["_id"]},
- // {"$set": update},
- //}
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- update,
- }
- }
- }
- }
- // updateBiddingType 更新bidding 分类类型
- func updateBiddingType() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": mongodb.StringTOBsonId("5a862f0640d2d9bbe88e3cec"),
- //"$lte": mongodb.StringTOBsonId("68552e15c936757aa1774910"),
- "$lte": mongodb.StringTOBsonId("68551ebfc936757aa176c9ae"),
- },
- }
- it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Sort("-_id").Iter()
- count := 0
- count2 := 0
- ruleStr := `(开标记录表|开标一览表),(开标参与人|开标地点|开标时间|开标记录|开标一览表)3^(公开招标公告|中标人信息|供应商资格要求|投标人资格要求|招标条件|潜在投标人)`
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- title := util.ObjToString(tmp["title"])
- detail := util.ObjToString(tmp["detail"])
- update := make(map[string]interface{})
- //匹配标题
- if strings.Contains(title, "开标记录") {
- update["toptype"] = "结果"
- update["subtype"] = "开标记录"
- } else {
- //匹配内容
- result := MatchAllRules(detail, ruleStr)
- if result.Matched {
- update["toptype"] = "结果"
- update["subtype"] = "开标记录"
- }
- }
- // 更新Es 数据
- if len(update) > 0 {
- count2++
- if count2%1000 == 0 {
- log.Info("updateBiddingType", zap.Int("count2", count2), zap.Any("id", biddingID))
- }
- //更新MongoDB
- updatePool <- []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": update},
- }
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- update,
- }
- }
- }
- }
- // dealBiddingNiJian 更新bidding ,owner 不为空的赋值给buyer
- //if toptype == "拟建"
- // if tmp["owner"] != nil
- // tmp["buyer"] = tmp["owner"]
- func dealBiddingNiJian() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$lt": 1735228800,
- },
- "toptype": "拟建",
- }
- it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
- fmt.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- if util.ObjToString(tmp["toptype"]) == "拟建" {
- update := map[string]interface{}{}
- esUpdate := map[string]interface{}{}
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- if tmp["owner"] != nil {
- update["buyer"] = tmp["owner"]
- esUpdate["buyer"] = tmp["owner"]
- }
- if len(update) > 0 {
- //更新mongo
- //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
- //更新MongoDB
- updatePool <- []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": update},
- }
- //2.es 项目 更新字段
- //err := Es.UpdateDocument("bidding", biddingID, update)
- //if err != nil && err.Error() != "Document not updated: noop" {
- // log.Info("bidding es update err", err, biddingID)
- //}
- //// 更新es
- //updateEsPool <- []map[string]interface{}{
- // {"_id": biddingID},
- // update,
- //}
- }
- // 更新Es 数据
- if len(esUpdate) > 0 {
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- esUpdate,
- }
- }
- }
- }
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // updateBiddingBidamount 处理bidding 中标金额,预算数据
- func updateBiddingBidamount() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- it := sess.DB("qfw").C("zktest_repair_0107").Find(nil).Select(nil).Iter()
- fmt.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- update := make(map[string]interface{})
- if bidamount, ok := tmp["bidamount"]; ok {
- update["bidamount"] = bidamount
- } else {
- update["bidamount"] = 0.0
- }
- if _, ok := tmp["budget"]; ok {
- update["budget"] = tmp["budget"]
- } else {
- update["budget"] = 0.0
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- if len(update) > 0 {
- //project := getProject(biddingID)
- //if len(project) > 0 {
- // //projectID := mongodb.BsonIdToSId(project["_id"])
- // //err := Es.UpdateDocument("projectset", projectID, update)
- // //err = EsNew.UpdateDocument("projectset", projectID, update)
- // //if err != nil && err.Error() != "Document not updated: noop" {
- // // log.Info("bidding es update err", err, biddingID)
- // //}
- //}
- // 更新bidding es
- err := Es.UpdateDocument("bidding", biddingID, update)
- err = EsNew.UpdateDocument("bidding", biddingID, update)
- if err != nil && err.Error() != "Document not updated: noop" {
- log.Info("bidding es update err", err, biddingID)
- }
- }
- }
- log.Info("数据处理完毕", log.Int("total", count))
- }
- // updateBiddingToptype 更新招标分类结果
- func updateBiddingToptype() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- it := sess.DB("qfw").C("wcc_bidding_kaibiao").Find(nil).Select(nil).Sort("-_id").Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- subtype := util.ObjToString(tmp["subtype"])
- if subtype == "开标记录" {
- continue
- }
- update["toptype"] = tmp["toptype"]
- update["subtype"] = tmp["subtype"]
- // 更新Es 数据
- if len(update) > 0 {
- //更新MongoDB
- updatePool <- []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": update},
- }
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- update,
- }
- }
- }
- }
- // updateBiddingBuyer g 更新buyer
- func updateBiddingBuyer() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- it := sess.DB("qfw").C("wcc_buyer_test").Find(nil).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%100 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- biddingID := util.ObjToString(tmp["id"])
- update := make(map[string]interface{})
- update["buyer"] = "海南警察学院(筹)"
- // 更新Es 数据
- if len(update) > 0 {
- //更新MongoDB
- MgoB.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
- //2.es 项目 更新字段
- err := Es.UpdateDocument("bidding", biddingID, update)
- err = EsNew.UpdateDocument("bidding", biddingID, update)
- err = EsNew.UpdateDocument("bidding_temp", biddingID, update)
- if err != nil && err.Error() != "Document not updated: noop" {
- log.Info("bidding es update err", err, biddingID)
- }
- }
- }
- }
|