package main import ( "go.mongodb.org/mongo-driver/bson" "log" util "utils" "utils/elastic" "utils/mongodb" ) //根据抽取表repeat=1,删除es中重复数据 func biddingDelByExtract(data []byte, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), // "$gte": mongodb.StringTOBsonId("5eb18d86511b1203376bd742"), // "$lte": mongodb.StringTOBsonId("5ebb43c5f2c1a7850bb1337c"), }, "repeat": 1, } } extractc, _ := extract["collect"].(string) index, _ := bidding["index"].(string) itype, _ := bidding["type"].(string) //extract extractConn := extractMgo.GetMgoConn() defer extractMgo.DestoryMongoConn(extractConn) //查询数据 count, _ := extractConn.DB(extractMgo.DbName).C(extractc).Find(&q).Count() log.Println("查询语句:", q, "删除同步总数:", count, "elastic库:", index) extractquery := extractConn.DB(extractMgo.DbName).C(extractc).Find(q).Select( bson.M{"_id": 1}, ).Sort("_id").Iter() log.Println("开始迭代...") i := 0 var n int var dnum int for tmp := make(map[string]interface{}); extractquery.Next(tmp); i = i + 1 { n++ _id := mongodb.BsonIdToSId(tmp["_id"]) if elastic.DelById(index, itype, _id) { //删除 dnum++ } //if other_index != "" && other_itype != "" { // bidding_other_es.DelById(other_index, other_itype, _id) //} if n%200 == 0 { log.Println("当前:", n) } tmp = make(map[string]interface{}) } log.Println("共删除:", dnum) }