package main

import (
	"fmt"

	"github.com/wcc4869/common_utils/log"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// dealBiddingEsHref walks the temporary collection qfw.bidding_es_20241129
// and, for every document whose extracttype is not -1, queues an ES update
// that sets the document's href field.
//
// Each queued item is a two-element slice: the first map identifies the ES
// document by "_id", the second map carries the fields to write.
func dealBiddingEsHref() {
	defer util.Catch()

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	it := sess.DB("qfw").C("bidding_es_20241129").Find(nil).Select(nil).Iter()
	fmt.Println("taskRun 开始")

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		// Progress log every 100 documents.
		if count%100 == 0 {
			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
		}

		// Documents flagged with extracttype == -1 are not pushed to ES.
		if util.IntAll(tmp["extracttype"]) == -1 {
			continue
		}

		biddingID := mongodb.BsonIdToSId(tmp["_id"])

		// href is forwarded exactly as stored; a document without the key
		// yields a nil value here, same as before.
		updateEsPool <- []map[string]interface{}{
			{"_id": biddingID},
			{"href": tmp["href"]},
		}
	}

	// Next returns false both at the end of the result set and on failure, so
	// the iterator must be closed and its error checked; otherwise a broken
	// cursor would be reported as a successful, complete run.
	if err := it.Close(); err != nil {
		log.Info("iterate bidding_es_20241129 failed", log.Int("count", count), log.Any("error", err))
		return
	}

	log.Info("Run Over...Count:", log.Int("count", count))
}