浏览代码

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

apple 5 年之前
父节点
当前提交
5580d13151
共有 3 个文件被更改,包括 75 次插入1 次删除
  1. 6 0
      udpcreateindex/src/biddingall.go
  2. 61 0
      udpcreateindex/src/biddingdelete.go
  3. 8 1
      udpcreateindex/src/main.go

+ 6 - 0
udpcreateindex/src/biddingall.go

@@ -290,6 +290,9 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 				if len(multiIndex) == 2 {
 					elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
 				}
+				if other_index != "" && other_itype != "" { //备份库同时生索引
+					bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+				}
 				arrEs = []map[string]interface{}{}
 			}
 			UpdatesLock.Unlock()
@@ -313,6 +316,9 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 		if len(multiIndex) == 2 {
 			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
 		}
+		if other_index != "" && other_itype != "" { //备份库同时生索引
+			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+		}
 	}
 	UpdatesLock.Unlock()
 	log.Println(mapInfo, "create bidding index...over", n)

+ 61 - 0
udpcreateindex/src/biddingdelete.go

@@ -0,0 +1,61 @@
+package main
+
+import (
+	"log"
+	qutil "qfw/util"
+
+	elastic "qfw/util/elastic"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+//删除es中重复数据repeat=1
+func biddingDel(data []byte, mapInfo map[string]interface{}) {
+	defer qutil.Catch()
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": bson.M{
+				"$gt":  qutil.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
+				// "$gte": qutil.StringTOBsonId("5eb18d86511b1203376bd742"),
+				// "$lte": qutil.StringTOBsonId("5ebb43c5f2c1a7850bb1337c"),
+			},
+			"repeat": 1,
+		}
+	}
+	extractc, _ := bidding["extractcollect"].(string)
+	extractdb, _ := bidding["extractdb"].(string)
+	index, _ := bidding["index"].(string)
+	itype, _ := bidding["type"].(string)
+
+	//extract
+	extractsession := extractmgo.GetMgoConn(86400)
+	defer extractmgo.DestoryMongoConn(extractsession)
+	//查询数据
+	count, _ := extractsession.DB(extractdb).C(extractc).Find(&q).Count()
+	log.Println("查询语句:", q, "删除同步总数:", count, "elastic库:", index)
+	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Select(
+		bson.M{"_id": 1},
+	).Sort("_id").Iter()
+	log.Println("开始迭代...")
+	i := 0
+	var n int
+	var dnum int
+	for tmp := make(map[string]interface{}); extractquery.Next(tmp); i = i + 1 {
+		n++
+		_id := qutil.BsonIdToSId(tmp["_id"])
+		if elastic.DelById(index, itype, _id) { //删除
+			dnum++
+		}
+		if other_index != "" && other_itype != "" {
+			bidding_other_es.DelById(other_index, other_itype, _id)
+		}
+
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	log.Println("共删除:", dnum)
+}

+ 8 - 1
udpcreateindex/src/main.go

@@ -93,7 +93,6 @@ func init() {
 		DbName:      standard["db"].(string),
 	}
 	mgostandard.InitPool()
-	log.Println(standard["addr"].(string))
 	//初始化es
 	//bidding
 	econf := Sysconfig["elastic"].(map[string]interface{})
@@ -255,6 +254,14 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					standardTask("agencyent", mapInfo)
 				}()
+			case "biddingdel": //标准库
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					biddingDel(data, mapInfo)
+				}()
 			default:
 				pool <- true
 				go func() {