maxiaoshan 5 年之前
父节点
当前提交
a6abfe1ece

+ 2 - 2
udpcreateindex/src/biddingdelete.go → udpcreateindex/src/biddingdeletebyextract.go

@@ -9,8 +9,8 @@ import (
 	"gopkg.in/mgo.v2/bson"
 )
 
-//删除es中重复数据repeat=1
-func biddingDel(data []byte, mapInfo map[string]interface{}) {
+//根据抽取表repeat=1,删除es中重复数据
+func biddingDelByExtract(data []byte, mapInfo map[string]interface{}) {
 	defer qutil.Catch()
 	q, _ := mapInfo["query"].(map[string]interface{})
 	if q == nil {

+ 61 - 0
udpcreateindex/src/biddingdeletebyextracttype.go

@@ -0,0 +1,61 @@
+package main
+
+import (
+	"log"
+	qutil "qfw/util"
+
+	elastic "qfw/util/elastic"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+//根据bidding表extracttype=-1,删除es中重复数据
+func biddingDelByExtracttype(data []byte, mapInfo map[string]interface{}) {
+	defer qutil.Catch()
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": bson.M{
+				"$gt":  qutil.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
+				// "$gte": qutil.StringTOBsonId("5eb18d86511b1203376bd742"),
+				// "$lte": qutil.StringTOBsonId("5ebb43c5f2c1a7850bb1337c"),
+			},
+			"extracttype": -1,
+		}
+	}
+	db, _ := bidding["db"].(string)
+	c, _ := bidding["collect"].(string)
+	index, _ := bidding["index"].(string)
+	itype, _ := bidding["type"].(string)
+
+	//bidding
+	session := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(session)
+	//查询数据
+	count, _ := session.DB(db).C(c).Find(&q).Count()
+	log.Println("查询语句:", q, "删除同步总数:", count, "elastic库:", index)
+	biddingquery := session.DB(db).C(c).Find(q).Select(
+		bson.M{"_id": 1},
+	).Sort("_id").Iter()
+	log.Println("开始迭代...")
+	i := 0
+	var n int
+	var dnum int
+	for tmp := make(map[string]interface{}); biddingquery.Next(tmp); i = i + 1 {
+		n++
+		_id := qutil.BsonIdToSId(tmp["_id"])
+		if elastic.DelById(index, itype, _id) { //删除
+			dnum++
+		}
+		if other_index != "" && other_itype != "" {
+			bidding_other_es.DelById(other_index, other_itype, _id)
+		}
+
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	log.Println("共删除:", dnum)
+}

+ 10 - 2
udpcreateindex/src/main.go

@@ -254,13 +254,21 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					standardTask("agencyent", mapInfo)
 				}()
-			case "biddingdel": //标准库
+			case "biddingdelbyextract": //根据repeat删除es
 				pool <- true
 				go func() {
 					defer func() {
 						<-pool
 					}()
-					biddingDel(data, mapInfo)
+					biddingDelByExtract(data, mapInfo)
+				}()
+			case "biddingdelbyextracttype": //根据extracttype删除es
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					biddingDelByExtracttype(data, mapInfo)
 				}()
 			default:
 				pool <- true