biddingdeletebyextracttype.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package main
  2. import (
  3. "log"
  4. qutil "qfw/util"
  5. elastic "qfw/util/elastic"
  6. "gopkg.in/mgo.v2/bson"
  7. )
  8. //根据bidding表extracttype=-1,删除es中重复数据
  9. func biddingDelByExtracttype(data []byte, mapInfo map[string]interface{}) {
  10. defer qutil.Catch()
  11. q, _ := mapInfo["query"].(map[string]interface{})
  12. if q == nil {
  13. q = map[string]interface{}{
  14. "_id": bson.M{
  15. "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
  16. "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
  17. // "$gte": qutil.StringTOBsonId("5eb18d86511b1203376bd742"),
  18. // "$lte": qutil.StringTOBsonId("5ebb43c5f2c1a7850bb1337c"),
  19. },
  20. "extracttype": -1,
  21. }
  22. }
  23. db, _ := bidding["db"].(string)
  24. c, _ := bidding["collect"].(string)
  25. index, _ := bidding["index"].(string)
  26. itype, _ := bidding["type"].(string)
  27. //bidding
  28. session := mgo.GetMgoConn()
  29. defer mgo.DestoryMongoConn(session)
  30. //查询数据
  31. count, _ := session.DB(db).C(c).Find(&q).Count()
  32. log.Println("查询语句:", q, "删除同步总数:", count, "elastic库:", index)
  33. biddingquery := session.DB(db).C(c).Find(q).Select(
  34. bson.M{"_id": 1},
  35. ).Sort("_id").Iter()
  36. log.Println("开始迭代...")
  37. i := 0
  38. var n int
  39. var dnum int
  40. for tmp := make(map[string]interface{}); biddingquery.Next(tmp); i = i + 1 {
  41. n++
  42. _id := qutil.BsonIdToSId(tmp["_id"])
  43. if elastic.DelById(index, itype, _id) { //删除
  44. dnum++
  45. }
  46. if other_index != "" && other_itype != "" {
  47. bidding_other_es.DelById(other_index, other_itype, _id)
  48. }
  49. if n%savesizei == 0 {
  50. log.Println("当前:", n)
  51. }
  52. tmp = make(map[string]interface{})
  53. }
  54. log.Println("共删除:", dnum)
  55. }