bidding_del.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package main
  2. import (
  3. "esindex/config"
  4. "go.mongodb.org/mongo-driver/bson"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. )
  10. //根据bidding表extracttype=-1,删除es中重复数据
  11. func biddingDelByExtracttype(data []byte, mapInfo map[string]interface{}) {
  12. defer util.Catch()
  13. q, _ := mapInfo["query"].(map[string]interface{})
  14. if q == nil {
  15. q = map[string]interface{}{
  16. "_id": bson.M{
  17. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  18. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  19. },
  20. "extracttype": -1,
  21. }
  22. }
  23. //bidding
  24. conn := MgoB.GetMgoConn()
  25. defer MgoB.DestoryMongoConn(conn)
  26. //查询数据
  27. biddingquery := conn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(
  28. bson.M{"_id": 1},
  29. ).Sort("_id").Iter()
  30. log.Info("开始迭代...")
  31. i := 0
  32. var n int
  33. var dnum int
  34. for tmp := make(map[string]interface{}); biddingquery.Next(tmp); i = i + 1 {
  35. n++
  36. _id := mongodb.BsonIdToSId(tmp["_id"])
  37. if Es.DelById(config.Conf.DB.Es.IndexB, _id) { //删除
  38. dnum++
  39. }
  40. if n%200 == 0 {
  41. log.Info("current", zap.Int("当前:", n))
  42. }
  43. tmp = make(map[string]interface{})
  44. }
  45. log.Info("over", zap.Int("共删除:", dnum))
  46. }