123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- package main
- import (
- "fmt"
- "github.com/wcc4869/common_utils/log"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- )
- // dealBiddingNiJian 更新bidding ,owner 不为空的赋值给buyer
- //if toptype == "拟建"
- // if tmp["owner"] != nil
- // tmp["buyer"] = tmp["owner"]
- func dealBiddingNiJian() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$lt": 1735228800,
- },
- "toptype": "拟建",
- }
- it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter()
- fmt.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- if util.ObjToString(tmp["toptype"]) == "拟建" {
- update := map[string]interface{}{}
- esUpdate := map[string]interface{}{}
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- if tmp["owner"] != nil {
- update["buyer"] = tmp["owner"]
- esUpdate["buyer"] = tmp["owner"]
- }
- if len(update) > 0 {
- //更新mongo
- //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update})
- //更新MongoDB
- updatePool <- []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": update},
- }
- //2.es 项目 更新字段
- //err := Es.UpdateDocument("bidding", biddingID, update)
- //if err != nil && err.Error() != "Document not updated: noop" {
- // log.Info("bidding es update err", err, biddingID)
- //}
- //// 更新es
- //updateEsPool <- []map[string]interface{}{
- // {"_id": biddingID},
- // update,
- //}
- }
- // 更新Es 数据
- if len(esUpdate) > 0 {
- // 更新es
- updateEsPool <- []map[string]interface{}{
- {"_id": biddingID},
- esUpdate,
- }
- }
- }
- }
- log.Info("Run Over...Count:", log.Int("count", count))
- }
- // updateBiddingBidamount 处理bidding 中标金额,预算数据
- func updateBiddingBidamount() {
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- it := sess.DB("qfw").C("zktest_repair_0107").Find(nil).Select(nil).Iter()
- fmt.Println("taskRun 开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
- }
- update := make(map[string]interface{})
- if bidamount, ok := tmp["bidamount"]; ok {
- update["bidamount"] = bidamount
- } else {
- update["bidamount"] = 0.0
- }
- if _, ok := tmp["budget"]; ok {
- update["budget"] = tmp["budget"]
- } else {
- update["budget"] = 0.0
- }
- biddingID := mongodb.BsonIdToSId(tmp["_id"])
- if len(update) > 0 {
- //project := getProject(biddingID)
- //if len(project) > 0 {
- // //projectID := mongodb.BsonIdToSId(project["_id"])
- // //err := Es.UpdateDocument("projectset", projectID, update)
- // //err = EsNew.UpdateDocument("projectset", projectID, update)
- // //if err != nil && err.Error() != "Document not updated: noop" {
- // // log.Info("bidding es update err", err, biddingID)
- // //}
- //}
- // 更新bidding es
- err := Es.UpdateDocument("bidding", biddingID, update)
- err = EsNew.UpdateDocument("bidding", biddingID, update)
- if err != nil && err.Error() != "Document not updated: noop" {
- log.Info("bidding es update err", err, biddingID)
- }
- }
- }
- log.Info("数据处理完毕", log.Int("total", count))
- }
|