tmp.go 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/wcc4869/common_utils/log"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. )
  8. // dealBiddingEs 根据临时表,更新es href 字段
  9. func dealBiddingEsHref() {
  10. defer util.Catch()
  11. sess := MgoB.GetMgoConn()
  12. defer MgoB.DestoryMongoConn(sess)
  13. it := sess.DB("qfw").C("bidding_es_20241129").Find(nil).Select(nil).Iter()
  14. fmt.Println("taskRun 开始")
  15. count := 0
  16. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  17. if count%100 == 0 {
  18. log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
  19. }
  20. if util.IntAll(tmp["extracttype"]) == -1 {
  21. continue
  22. }
  23. esUpdate := map[string]interface{}{}
  24. biddingID := mongodb.BsonIdToSId(tmp["_id"])
  25. esUpdate["href"] = tmp["href"]
  26. // 更新Es 数据
  27. if len(esUpdate) > 0 {
  28. // 更新es
  29. updateEsPool <- []map[string]interface{}{
  30. {"_id": biddingID},
  31. esUpdate,
  32. }
  33. }
  34. }
  35. log.Info("Run Over...Count:", log.Int("count", count))
  36. }