package main import ( "log" qutil "qfw/util" elastic "qfw/util/elastic" "gopkg.in/mgo.v2/bson" ) //根据bidding表extracttype=-1,删除es中重复数据 func biddingDelByExtracttype(data []byte, mapInfo map[string]interface{}) { defer qutil.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)), // "$gte": qutil.StringTOBsonId("5eb18d86511b1203376bd742"), // "$lte": qutil.StringTOBsonId("5ebb43c5f2c1a7850bb1337c"), }, "extracttype": -1, } } db, _ := bidding["db"].(string) c, _ := bidding["collect"].(string) index, _ := bidding["index"].(string) itype, _ := bidding["type"].(string) //bidding session := mgo.GetMgoConn() defer mgo.DestoryMongoConn(session) //查询数据 count, _ := session.DB(db).C(c).Find(&q).Count() log.Println("查询语句:", q, "删除同步总数:", count, "elastic库:", index) biddingquery := session.DB(db).C(c).Find(q).Select( bson.M{"_id": 1}, ).Sort("_id").Iter() log.Println("开始迭代...") i := 0 var n int var dnum int for tmp := make(map[string]interface{}); biddingquery.Next(tmp); i = i + 1 { n++ _id := qutil.BsonIdToSId(tmp["_id"]) if elastic.DelById(index, itype, _id) { //删除 dnum++ } if other_index != "" && other_itype != "" { bidding_other_es.DelById(other_index, other_itype, _id) } if n%savesizei == 0 { log.Println("当前:", n) } tmp = make(map[string]interface{}) } log.Println("共删除:", dnum) }