Explorar el Código

Merge branch 'master' into feature/v1.0.12

lianbingjie hace 1 año
padre
commit
b8808c0314

+ 152 - 113
common/src/qfw/util/dataexport/dataexport.go

@@ -628,23 +628,30 @@ func GetDataExportSelectReallyCountFromEs(ids []string) int64 {
 	wait := &sync.WaitGroup{}
 	var total int64
 	var lock sync.Mutex
-	for _, v := range SplitArray(ids, 200) {
-		pool <- true
-		wait.Add(1)
-		go func(arr []string) {
-			defer func() {
-				wait.Done()
-				<-pool
-			}()
-			query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}}}`, strings.Join(arr, "\",\""))
-			tCount := elastic.Count(INDEX, TYPE, query)
-			if tCount > 0 {
-				lock.Lock()
-				total += tCount
-				lock.Unlock()
-			}
-			return
-		}(v)
+	var idArr []string
+	for i, id := range ids {
+		idArr = append(idArr, id)
+		if len(idArr) == 200 || i+1 == len(ids) {
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println("GetDataExportSelectReallyCountFromEs===", arr[0])
+				query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}}}`, strings.Join(arr, "\",\""))
+				tCount := elastic.Count(INDEX, TYPE, query)
+				if tCount > 0 {
+					lock.Lock()
+					total += tCount
+					lock.Unlock()
+				}
+				return
+			}(idArr)
+			idArr = []string{}
+		}
+
 	}
 	wait.Wait()
 	log.Printf("GetDataExportSelectReallyCount 选择数据共%d条记录,实际查询%d条\n", len(ids), total)
@@ -665,44 +672,50 @@ func GetDataExportSelectReallyCountFromMongo(bid mg.MongodbSim, biddingName stri
 	)
 	pool := make(chan bool, 10)
 	wait := &sync.WaitGroup{}
-
-	for _, i2 := range SplitArray(ids, 200) {
-		pool <- true
-		wait.Add(1)
-		go func(arr []string) {
-			defer func() {
-				wait.Done()
-				<-pool
-			}()
-			lenNum := int64(len(arr))
-			var (
-				queryIds   []interface{}
-				num1, num2 int64
-				err        error
-			)
-			for _, idStr := range arr {
-				queryIds = append(queryIds, mg.StringTOBsonId(idStr))
-			}
-			num1, err = sess.DB(biddingName).C("bidding").Find(map[string]interface{}{"_id": map[string]interface{}{
-				"$in": queryIds,
-			}}).Count()
-			if err == nil {
-				if num1 == lenNum {
-					lock.Lock()
-					count += num1
-					lock.Unlock()
-					return
+	var idArr []string
+	for i, id := range ids {
+		idArr = append(idArr, id)
+		if len(idArr) == 200 || i+1 == len(ids) {
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println("GetDataExportSelectReallyCountFromMongo===", arr[0])
+				lenNum := int64(len(arr))
+				var (
+					queryIds   []interface{}
+					num1, num2 int64
+					err        error
+				)
+				for _, idStr := range arr {
+					queryIds = append(queryIds, mg.StringTOBsonId(idStr))
 				}
-				num2, err = sess.DB(biddingName).C("bidding_back").Find(map[string]interface{}{"_id": map[string]interface{}{
+				num1, err = sess.DB(biddingName).C("bidding").Find(map[string]interface{}{"_id": map[string]interface{}{
 					"$in": queryIds,
 				}}).Count()
 				if err == nil {
-					lock.Lock()
-					count += qutil.If(num2+num1 >= lenNum, lenNum, num2+num1).(int64)
-					lock.Unlock()
+					if num1 == lenNum {
+						lock.Lock()
+						count += num1
+						lock.Unlock()
+						return
+					}
+					num2, err = sess.DB(biddingName).C("bidding_back").Find(map[string]interface{}{"_id": map[string]interface{}{
+						"$in": queryIds,
+					}}).Count()
+					if err == nil {
+						lock.Lock()
+						count += qutil.If(num2+num1 >= lenNum, lenNum, num2+num1).(int64)
+						lock.Unlock()
+					}
 				}
-			}
-		}(i2)
+			}(idArr)
+			idArr = []string{}
+		}
+
 	}
 	wait.Wait()
 	return qutil.If(count > 0, count, -2).(int64)
@@ -720,31 +733,37 @@ func GetDataExportSelectResultFromEs(bidding mg.MongodbSim, biddingName string,
 	wait := &sync.WaitGroup{}
 	var lock sync.Mutex
 	returnLsit := make([]map[string]interface{}, 0, len(scd.SelectIds))
-	for _, v := range SplitArray(scd.SelectIds, 200) {
-		pool <- true
-		wait.Add(1)
-		go func(arr []string) error {
-			defer func() {
-				wait.Done()
-				<-pool
-			}()
-			query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}},"_source": [%s],"size":%d}`, strings.Join(arr, "\",\""), bidField, len(arr))
-			log.Println("GetDataExportSelectResultFromEs 数据流量包 es count 信息查询:", query)
-			data := *elastic.Get(INDEX, TYPE, query)
-			if data != nil && len(data) > 0 {
-				for _, bv := range data {
-					bv["_id"] = mg.BsonIdToSId(bv["_id"])
-					detail, _ := bv["detail"].(string)
-					if detail != "" {
-						bv["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+	var idArr []string
+	for i, id := range scd.SelectIds {
+		idArr = append(idArr, id)
+		if len(idArr) == 200 || i+1 == len(scd.SelectIds) {
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) error {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println(scd.Id, "GetDataExportSelectResultFromEs===", arr[0])
+				query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}},"_source": [%s],"size":%d}`, strings.Join(arr, "\",\""), bidField, len(arr))
+				log.Println("GetDataExportSelectResultFromEs 数据流量包 es count 信息查询:", query)
+				data := *elastic.Get(INDEX, TYPE, query)
+				if data != nil && len(data) > 0 {
+					for _, bv := range data {
+						bv["_id"] = mg.BsonIdToSId(bv["_id"])
+						detail, _ := bv["detail"].(string)
+						if detail != "" {
+							bv["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+						}
+						lock.Lock()
+						returnLsit = append(returnLsit, bv)
+						lock.Unlock()
 					}
-					lock.Lock()
-					returnLsit = append(returnLsit, bv)
-					lock.Unlock()
 				}
-			}
-			return nil
-		}(v)
+				return nil
+			}(idArr)
+			idArr = []string{}
+		}
 	}
 	wait.Wait()
 	if len(returnLsit) == checkCount || checkCount == -1 {
@@ -772,48 +791,60 @@ func GetDataExportSelectResultFromMongoDb(bidding mg.MongodbSim, biddingName str
 	wait := &sync.WaitGroup{}
 	var lock sync.Mutex
 	returnLsit := make([]map[string]interface{}, 0, len(scd.SelectIds))
-	for _, v := range SplitArray(scd.SelectIds, 200) {
-		pool <- true
-		wait.Add(1)
-		go func(arr []string) error {
-			defer func() {
-				wait.Done()
-				<-pool
-			}()
-			var queryIds []interface{}
-			for _, idStr := range arr {
-				queryIds = append(queryIds, mg.StringTOBsonId(idStr))
-			}
-			iter := sess.DB(biddingName).C("bidding").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
-				"$in": queryIds,
-			}}).Iter()
-			for m := make(map[string]interface{}); iter.Next(&m); {
-				m["_id"] = mg.BsonIdToSId(m["_id"])
-				detail, _ := m["detail"].(string)
-				if detail != "" {
-					m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+	var idArr []string
+	for i, id := range scd.SelectIds {
+		idArr = append(idArr, id)
+		if len(idArr) == 200 || i+1 == len(scd.SelectIds) {
+			pool <- true
+			wait.Add(1)
+			go func(arr []string) error {
+				defer func() {
+					wait.Done()
+					<-pool
+				}()
+				log.Println(scd.Id, "GetDataExportSelectResultFromMongoDb===", arr[0])
+				var (
+					queryIds []interface{}
+					count    int
+				)
+				for _, idStr := range arr {
+					queryIds = append(queryIds, mg.StringTOBsonId(idStr))
 				}
-				lock.Lock()
-				returnLsit = append(returnLsit, m)
-				lock.Unlock()
-				m = make(map[string]interface{})
-			}
-			iter_back := sess.DB(biddingName).C("bidding_back").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
-				"$in": queryIds,
-			}}).Iter()
-			for m := make(map[string]interface{}); iter_back.Next(&m); {
-				m["_id"] = mg.BsonIdToSId(m["_id"])
-				detail, _ := m["detail"].(string)
-				if detail != "" {
-					m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+				iter := sess.DB(biddingName).C("bidding").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
+					"$in": queryIds,
+				}}).Iter()
+				for m := make(map[string]interface{}); iter.Next(&m); {
+					m["_id"] = mg.BsonIdToSId(m["_id"])
+					detail, _ := m["detail"].(string)
+					if detail != "" {
+						m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+					}
+					count++
+					lock.Lock()
+					returnLsit = append(returnLsit, m)
+					lock.Unlock()
+					m = make(map[string]interface{})
 				}
-				lock.Lock()
-				returnLsit = append(returnLsit, m)
-				lock.Unlock()
-				m = make(map[string]interface{})
-			}
-			return nil
-		}(v)
+				if count != len(arr) {
+					iter_back := sess.DB(biddingName).C("bidding_back").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
+						"$in": queryIds,
+					}}).Iter()
+					for m := make(map[string]interface{}); iter_back.Next(&m); {
+						m["_id"] = mg.BsonIdToSId(m["_id"])
+						detail, _ := m["detail"].(string)
+						if detail != "" {
+							m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
+						}
+						lock.Lock()
+						returnLsit = append(returnLsit, m)
+						lock.Unlock()
+						m = make(map[string]interface{})
+					}
+				}
+				return nil
+			}(idArr)
+			idArr = []string{}
+		}
 	}
 	wait.Wait()
 	if len(returnLsit) == checkCount || checkCount == -1 {
@@ -825,6 +856,9 @@ func GetDataExportSelectResultFromMongoDb(bidding mg.MongodbSim, biddingName str
 
 // SplitArray 分割数组
 func SplitArray(arr []string, num int64) [][]string {
+	if len(arr) == 0 {
+		return nil
+	}
 	max := int64(len(arr))
 	//判断数组大小是否小于等于指定分割大小的值,是则把原数组放入二维数组返回
 	if max <= num {
@@ -1045,6 +1079,11 @@ func FormatExportData(entmg mg.MongodbSim, data *[]map[string]interface{}, webdo
 	if len(encry) > 0 {
 		isEncry = true
 	}
+	sort.Slice(*data, func(i, j int) bool {
+		time1 := qutil.Int64All((*data)[i]["publishtime"])
+		time2 := qutil.Int64All((*data)[j]["publishtime"])
+		return time1 > time2
+	})
 	var entCacheMap = map[string]map[string]interface{}{}
 	for index := 0; index < len(*data); index++ {
 		v := (*data)[index]

+ 6 - 0
common/src/qfw/util/dataexport/entdataexport.go

@@ -7,6 +7,7 @@ import (
 	"log"
 	"net/http"
 	"os"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -148,6 +149,11 @@ func FormatExportDatas(Mgo_Ent mongodb.MongodbSim, data *[]map[string]interface{
 		entexportPool      = make(chan bool, 20)
 		entexportWaitGroup = &sync.WaitGroup{}
 	)
+	sort.Slice(*data, func(i, j int) bool {
+		time1 := util.Int64All((*data)[i]["publishtime"])
+		time2 := util.Int64All((*data)[j]["publishtime"])
+		return time1 > time2
+	})
 	log.Println("补充信息开始")
 	for _, v := range *data {
 		entexportWaitGroup.Add(1)