12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package main
- import (
- "log"
- qutil "qfw/util"
- elastic "qfw/util/elastic"
- "gopkg.in/mgo.v2/bson"
- )
- //根据bidding表extracttype=-1,删除es中重复数据
- func biddingDelByExtracttype(data []byte, mapInfo map[string]interface{}) {
- defer qutil.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
- // "$gte": qutil.StringTOBsonId("5eb18d86511b1203376bd742"),
- // "$lte": qutil.StringTOBsonId("5ebb43c5f2c1a7850bb1337c"),
- },
- "extracttype": -1,
- }
- }
- db, _ := bidding["db"].(string)
- c, _ := bidding["collect"].(string)
- index, _ := bidding["index"].(string)
- itype, _ := bidding["type"].(string)
- //bidding
- session := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(session)
- //查询数据
- count, _ := session.DB(db).C(c).Find(&q).Count()
- log.Println("查询语句:", q, "删除同步总数:", count, "elastic库:", index)
- biddingquery := session.DB(db).C(c).Find(q).Select(
- bson.M{"_id": 1},
- ).Sort("_id").Iter()
- log.Println("开始迭代...")
- i := 0
- var n int
- var dnum int
- for tmp := make(map[string]interface{}); biddingquery.Next(tmp); i = i + 1 {
- n++
- _id := qutil.BsonIdToSId(tmp["_id"])
- if elastic.DelById(index, itype, _id) { //删除
- dnum++
- }
- if other_index != "" && other_itype != "" {
- bidding_other_es.DelById(other_index, other_itype, _id)
- }
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- log.Println("共删除:", dnum)
- }
|