package main import ( "fmt" "github.com/wcc4869/common_utils/log" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" ) // dealBiddingNiJian 更新bidding ,owner 不为空的赋值给buyer //if toptype == "拟建" // if tmp["owner"] != nil // tmp["buyer"] = tmp["owner"] func dealBiddingNiJian() { defer util.Catch() sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) where := map[string]interface{}{ "comeintime": map[string]interface{}{ "$lt": 1735228800, }, "toptype": "拟建", } it := sess.DB("qfw").C("bidding").Find(where).Select(nil).Iter() fmt.Println("taskRun 开始") count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"])) } if util.ObjToString(tmp["toptype"]) == "拟建" { update := map[string]interface{}{} esUpdate := map[string]interface{}{} biddingID := mongodb.BsonIdToSId(tmp["_id"]) if tmp["owner"] != nil { update["buyer"] = tmp["owner"] esUpdate["buyer"] = tmp["owner"] } if len(update) > 0 { //更新mongo //MgoT.UpdateById("bidding", biddingID, map[string]interface{}{"$set": update}) //更新MongoDB updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": update}, } //2.es 项目 更新字段 //err := Es.UpdateDocument("bidding", biddingID, update) //if err != nil && err.Error() != "Document not updated: noop" { // log.Info("bidding es update err", err, biddingID) //} //// 更新es //updateEsPool <- []map[string]interface{}{ // {"_id": biddingID}, // update, //} } // 更新Es 数据 if len(esUpdate) > 0 { // 更新es updateEsPool <- []map[string]interface{}{ {"_id": biddingID}, esUpdate, } } } } log.Info("Run Over...Count:", log.Int("count", count)) }