Эх сурвалжийг харах

fix:数据导出取消并发获取数据

duxin 1 жил өмнө
parent
commit
729b85e4a1

+ 57 - 30
common/src/qfw/util/dataexport/dataexport.go

@@ -624,23 +624,36 @@ var contentfilterReg = regexp.MustCompile("<[^>]+>")
 
 // GetDataExportSelectReallyCountFromEs 从elasticsearch查询实际可调导出数量
 func GetDataExportSelectReallyCountFromEs(ids []string) int64 {
-
+	pool := make(chan bool, 10)
+	wait := &sync.WaitGroup{}
 	var total int64
+	var lock sync.Mutex
 	var idArr []string
 	for i, id := range ids {
 		idArr = append(idArr, id)
 		if len(idArr) == 200 || i+1 == len(ids) {
-			log.Println("GetDataExportSelectReallyCountFromEs===", idArr[0])
-			query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}}}`, strings.Join(idArr, "\",\""))
-			tCount := elastic.Count(INDEX, TYPE, query)
-			if tCount > 0 {
-				total += tCount
-			}
-
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println("GetDataExportSelectReallyCountFromEs===", arr[0])
+				query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}}}`, strings.Join(arr, "\",\""))
+				tCount := elastic.Count(INDEX, TYPE, query)
+				if tCount > 0 {
+					lock.Lock()
+					total += tCount
+					lock.Unlock()
+				}
+				return
+			}(idArr)
 			idArr = []string{}
 		}
 
 	}
+	wait.Wait()
 	log.Printf("GetDataExportSelectReallyCount 选择数据共%d条记录,实际查询%d条\n", len(ids), total)
 	return total
 }
@@ -655,42 +668,56 @@ func GetDataExportSelectReallyCountFromMongo(bid mg.MongodbSim, biddingName stri
 
 	var (
 		count int64
+		lock  sync.Mutex
 	)
+	pool := make(chan bool, 10)
+	wait := &sync.WaitGroup{}
 	var idArr []string
 	for i, id := range ids {
 		idArr = append(idArr, id)
 		if len(idArr) == 200 || i+1 == len(ids) {
-
-			log.Println("GetDataExportSelectReallyCountFromMongo===", idArr[0])
-			lenNum := int64(len(idArr))
-			var (
-				queryIds   []interface{}
-				num1, num2 int64
-				err        error
-			)
-			for _, idStr := range idArr {
-				queryIds = append(queryIds, mg.StringTOBsonId(idStr))
-			}
-			num1, err = sess.DB(biddingName).C("bidding").Find(map[string]interface{}{"_id": map[string]interface{}{
-				"$in": queryIds,
-			}}).Count()
-			if err == nil {
-				if num1 == lenNum {
-					count += num1
-					idArr = []string{}
-					continue
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println("GetDataExportSelectReallyCountFromMongo===", arr[0])
+				lenNum := int64(len(arr))
+				var (
+					queryIds   []interface{}
+					num1, num2 int64
+					err        error
+				)
+				for _, idStr := range arr {
+					queryIds = append(queryIds, mg.StringTOBsonId(idStr))
 				}
-				num2, err = sess.DB(biddingName).C("bidding_back").Find(map[string]interface{}{"_id": map[string]interface{}{
+				num1, err = sess.DB(biddingName).C("bidding").Find(map[string]interface{}{"_id": map[string]interface{}{
 					"$in": queryIds,
 				}}).Count()
 				if err == nil {
-					count += qutil.If(num2+num1 >= lenNum, lenNum, num2+num1).(int64)
+					if num1 == lenNum {
+						lock.Lock()
+						count += num1
+						lock.Unlock()
+						return
+					}
+					num2, err = sess.DB(biddingName).C("bidding_back").Find(map[string]interface{}{"_id": map[string]interface{}{
+						"$in": queryIds,
+					}}).Count()
+					if err == nil {
+						lock.Lock()
+						count += qutil.If(num2+num1 >= lenNum, lenNum, num2+num1).(int64)
+						lock.Unlock()
+					}
 				}
-			}
+			}(idArr)
 			idArr = []string{}
 		}
 
 	}
+	wait.Wait()
 	return qutil.If(count > 0, count, -2).(int64)
 }