duxin 1 سال پیش
والد
کامیت
9c1d4afae0
1فایلهای تغییر یافته به همراه144 افزوده شده و 120 حذف شده
  1. 144 120
      common/src/qfw/util/dataexport/dataexport.go

+ 144 - 120
common/src/qfw/util/dataexport/dataexport.go

@@ -628,25 +628,30 @@ func GetDataExportSelectReallyCountFromEs(ids []string) int64 {
 	wait := &sync.WaitGroup{}
 	var total int64
 	var lock sync.Mutex
-	idArr := SplitArray(ids, 200)
-	for i := 0; i < len(idArr); i++ {
-		pool <- true
-		wait.Add(1)
-		go func(arr []string) {
-			defer func() {
-				wait.Done()
-				<-pool
-			}()
-			log.Println("GetDataExportSelectReallyCountFromEs===", arr[0])
-			query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}}}`, strings.Join(arr, "\",\""))
-			tCount := elastic.Count(INDEX, TYPE, query)
-			if tCount > 0 {
-				lock.Lock()
-				total += tCount
-				lock.Unlock()
-			}
-			return
-		}(idArr[i])
+	var idArr []string
+	for i, id := range ids {
+		idArr = append(idArr, id)
+		if len(idArr) == 200 || i+1 == len(ids) {
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println("GetDataExportSelectReallyCountFromEs===", arr[0])
+				query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}}}`, strings.Join(arr, "\",\""))
+				tCount := elastic.Count(INDEX, TYPE, query)
+				if tCount > 0 {
+					lock.Lock()
+					total += tCount
+					lock.Unlock()
+				}
+				return
+			}(idArr)
+			idArr = []string{}
+		}
+
 	}
 	wait.Wait()
 	log.Printf("GetDataExportSelectReallyCount 选择数据共%d条记录,实际查询%d条\n", len(ids), total)
@@ -667,45 +672,50 @@ func GetDataExportSelectReallyCountFromMongo(bid mg.MongodbSim, biddingName stri
 	)
 	pool := make(chan bool, 10)
 	wait := &sync.WaitGroup{}
-	idArr := SplitArray(ids, 200)
-	for i := 0; i < len(idArr); i++ {
-		pool <- true
-		wait.Add(1)
-		go func(arr []string) {
-			defer func() {
-				wait.Done()
-				<-pool
-			}()
-			log.Println("GetDataExportSelectReallyCountFromMongo===", arr[0])
-			lenNum := int64(len(arr))
-			var (
-				queryIds   []interface{}
-				num1, num2 int64
-				err        error
-			)
-			for _, idStr := range arr {
-				queryIds = append(queryIds, mg.StringTOBsonId(idStr))
-			}
-			num1, err = sess.DB(biddingName).C("bidding").Find(map[string]interface{}{"_id": map[string]interface{}{
-				"$in": queryIds,
-			}}).Count()
-			if err == nil {
-				if num1 == lenNum {
-					lock.Lock()
-					count += num1
-					lock.Unlock()
-					return
+	var idArr []string
+	for i, id := range ids {
+		idArr = append(idArr, id)
+		if len(idArr) == 200 || i+1 == len(ids) {
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println("GetDataExportSelectReallyCountFromMongo===", arr[0])
+				lenNum := int64(len(arr))
+				var (
+					queryIds   []interface{}
+					num1, num2 int64
+					err        error
+				)
+				for _, idStr := range arr {
+					queryIds = append(queryIds, mg.StringTOBsonId(idStr))
 				}
-				num2, err = sess.DB(biddingName).C("bidding_back").Find(map[string]interface{}{"_id": map[string]interface{}{
+				num1, err = sess.DB(biddingName).C("bidding").Find(map[string]interface{}{"_id": map[string]interface{}{
 					"$in": queryIds,
 				}}).Count()
 				if err == nil {
-					lock.Lock()
-					count += qutil.If(num2+num1 >= lenNum, lenNum, num2+num1).(int64)
-					lock.Unlock()
+					if num1 == lenNum {
+						lock.Lock()
+						count += num1
+						lock.Unlock()
+						return
+					}
+					num2, err = sess.DB(biddingName).C("bidding_back").Find(map[string]interface{}{"_id": map[string]interface{}{
+						"$in": queryIds,
+					}}).Count()
+					if err == nil {
+						lock.Lock()
+						count += qutil.If(num2+num1 >= lenNum, lenNum, num2+num1).(int64)
+						lock.Unlock()
+					}
 				}
-			}
-		}(idArr[i])
+			}(idArr)
+			idArr = []string{}
+		}
+
 	}
 	wait.Wait()
 	return qutil.If(count > 0, count, -2).(int64)
@@ -723,33 +733,37 @@ func GetDataExportSelectResultFromEs(bidding mg.MongodbSim, biddingName string,
 	wait := &sync.WaitGroup{}
 	var lock sync.Mutex
 	returnLsit := make([]map[string]interface{}, 0, len(scd.SelectIds))
-	idArr := SplitArray(scd.SelectIds, 200)
-	for i := 0; i < len(idArr); i++ {
-		pool <- true
-		wait.Add(1)
-		go func(arr []string) error {
-			defer func() {
-				wait.Done()
-				<-pool
-			}()
-			log.Println(scd.Id, "GetDataExportSelectResultFromEs===", arr[0])
-			query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}},"_source": [%s],"size":%d}`, strings.Join(arr, "\",\""), bidField, len(arr))
-			log.Println("GetDataExportSelectResultFromEs 数据流量包 es count 信息查询:", query)
-			data := *elastic.Get(INDEX, TYPE, query)
-			if data != nil && len(data) > 0 {
-				for _, bv := range data {
-					bv["_id"] = mg.BsonIdToSId(bv["_id"])
-					detail, _ := bv["detail"].(string)
-					if detail != "" {
-						bv["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+	var idArr []string
+	for i, id := range scd.SelectIds {
+		idArr = append(idArr, id)
+		if len(idArr) == 200 || i+1 == len(scd.SelectIds) {
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) error {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println(scd.Id, "GetDataExportSelectResultFromEs===", arr[0])
+				query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}},"_source": [%s],"size":%d}`, strings.Join(arr, "\",\""), bidField, len(arr))
+				log.Println("GetDataExportSelectResultFromEs 数据流量包 es count 信息查询:", query)
+				data := *elastic.Get(INDEX, TYPE, query)
+				if data != nil && len(data) > 0 {
+					for _, bv := range data {
+						bv["_id"] = mg.BsonIdToSId(bv["_id"])
+						detail, _ := bv["detail"].(string)
+						if detail != "" {
+							bv["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+						}
+						lock.Lock()
+						returnLsit = append(returnLsit, bv)
+						lock.Unlock()
 					}
-					lock.Lock()
-					returnLsit = append(returnLsit, bv)
-					lock.Unlock()
 				}
-			}
-			return nil
-		}(idArr[i])
+				return nil
+			}(idArr)
+			idArr = []string{}
+		}
 	}
 	wait.Wait()
 	if len(returnLsit) == checkCount || checkCount == -1 {
@@ -777,50 +791,60 @@ func GetDataExportSelectResultFromMongoDb(bidding mg.MongodbSim, biddingName str
 	wait := &sync.WaitGroup{}
 	var lock sync.Mutex
 	returnLsit := make([]map[string]interface{}, 0, len(scd.SelectIds))
-	idArr := SplitArray(scd.SelectIds, 200)
-	for i := 0; i < len(idArr); i++ {
-		pool <- true
-		wait.Add(1)
-		go func(arr []string) error {
-			defer func() {
-				wait.Done()
-				<-pool
-			}()
-			log.Println(scd.Id, "GetDataExportSelectResultFromMongoDb===", arr[0])
-			var queryIds []interface{}
-			for _, idStr := range arr {
-				queryIds = append(queryIds, mg.StringTOBsonId(idStr))
-			}
-			iter := sess.DB(biddingName).C("bidding").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
-				"$in": queryIds,
-			}}).Iter()
-			for m := make(map[string]interface{}); iter.Next(&m); {
-				m["_id"] = mg.BsonIdToSId(m["_id"])
-				detail, _ := m["detail"].(string)
-				if detail != "" {
-					m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+	var idArr []string
+	for i, id := range scd.SelectIds {
+		idArr = append(idArr, id)
+		if len(idArr) == 200 || i+1 == len(scd.SelectIds) {
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) error {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println(scd.Id, "GetDataExportSelectResultFromMongoDb===", arr[0])
+				var (
+					queryIds []interface{}
+					count    int
+				)
+				for _, idStr := range arr {
+					queryIds = append(queryIds, mg.StringTOBsonId(idStr))
 				}
-				lock.Lock()
-				returnLsit = append(returnLsit, m)
-				lock.Unlock()
-				m = make(map[string]interface{})
-			}
-			iter_back := sess.DB(biddingName).C("bidding_back").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
-				"$in": queryIds,
-			}}).Iter()
-			for m := make(map[string]interface{}); iter_back.Next(&m); {
-				m["_id"] = mg.BsonIdToSId(m["_id"])
-				detail, _ := m["detail"].(string)
-				if detail != "" {
-					m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+				iter := sess.DB(biddingName).C("bidding").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
+					"$in": queryIds,
+				}}).Iter()
+				for m := make(map[string]interface{}); iter.Next(&m); {
+					m["_id"] = mg.BsonIdToSId(m["_id"])
+					detail, _ := m["detail"].(string)
+					if detail != "" {
+						m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+					}
+					count++
+					lock.Lock()
+					returnLsit = append(returnLsit, m)
+					lock.Unlock()
+					m = make(map[string]interface{})
 				}
-				lock.Lock()
-				returnLsit = append(returnLsit, m)
-				lock.Unlock()
-				m = make(map[string]interface{})
-			}
-			return nil
-		}(idArr[i])
+				if count != len(arr) {
+					iter_back := sess.DB(biddingName).C("bidding_back").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
+						"$in": queryIds,
+					}}).Iter()
+					for m := make(map[string]interface{}); iter_back.Next(&m); {
+						m["_id"] = mg.BsonIdToSId(m["_id"])
+						detail, _ := m["detail"].(string)
+						if detail != "" {
+							m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+						}
+						lock.Lock()
+						returnLsit = append(returnLsit, m)
+						lock.Unlock()
+						m = make(map[string]interface{})
+					}
+				}
+				return nil
+			}(idArr)
+			idArr = []string{}
+		}
 	}
 	wait.Wait()
 	if len(returnLsit) == checkCount || checkCount == -1 {