Эх сурвалжийг харах

wip:数据导出统一查询es,弃用查询mongodb

wangkaiyue 1 жил өмнө
parent
commit
1b088a303a

+ 32 - 66
common/src/qfw/util/dataexport/dataexport.go

@@ -510,12 +510,12 @@ func GetSqlObjFromId(mongo mg.MongodbSim, _id string) *SieveCondition {
 }
 
 // 数据导出-查询结果数量
-func GetDataExportSearchCountByScdId(sim, bid mg.MongodbSim, biddingName, elasticAddress, id string) (count int) {
+func GetDataExportSearchCountByScdId(sim mg.MongodbSim, elasticAddress, id string) (count int) {
 	scd := GetSqlObjFromId(sim, id) //用户筛选条件
 	if scd.SelectIds != nil {
 		//部分数据可能已删除、不存在;此处需要统计返回实际数量
 		//return len(scd.SelectIds)
-		return int(GetDataExportSelectReallyCount(bid, biddingName, scd.SelectIds))
+		return int(GetDataExportSelectReallyCount(scd.SelectIds))
 	}
 	return GetDataExportSearchCountBySieveCondition(scd, elasticAddress)
 }
@@ -600,9 +600,9 @@ func isNullSearch(scd *SieveCondition) (isNull bool) {
  * count 返回数量 (-1:预览数据查询)
  */
 
-func GetDataExportSearchResultByScdId(sim, bid mg.MongodbSim, bidMgoDBName, elasticAddress, id, dataType string, checkCount int) (*[]map[string]interface{}, error) {
+func GetDataExportSearchResultByScdId(sim mg.MongodbSim, elasticAddress, id, dataType string, checkCount int) (*[]map[string]interface{}, error) {
 	scd := GetSqlObjFromId(sim, id)
-	list, err := GetDataExportSearchResult(bid, bidMgoDBName, elasticAddress, scd, dataType, checkCount)
+	list, err := GetDataExportSearchResult(elasticAddress, scd, dataType, checkCount)
 	if list == nil || err != nil {
 		return nil, err
 	}
@@ -619,41 +619,35 @@ func GetDataExportIdArrByScdId(sim mg.MongodbSim, elasticAddress, id string, che
 var contentfilterReg = regexp.MustCompile("<[^>]+>")
 
 // GetDataExportSelectReallyCount 查询实际可调导出数量
-func GetDataExportSelectReallyCount(bid mg.MongodbSim, biddingName string, ids []string) int64 {
-	sess := bid.GetMgoConn()
-	defer bid.DestoryMongoConn(sess)
-	if ids == nil || len(ids) == 0 {
-		return 0
-	}
-	var queryIds []interface{}
-	for _, idStr := range ids {
-		queryIds = append(queryIds, mg.StringTOBsonId(idStr))
-	}
-	lenNum := int64(len(ids))
-	num1, err1 := sess.DB(biddingName).C("bidding").Find(map[string]interface{}{"_id": map[string]interface{}{
-		"$in": queryIds,
-	}}).Count()
-	if err1 == nil {
-		if num1 == lenNum {
-			return lenNum
-		}
-		num2, err2 := sess.DB(biddingName).C("bidding_back").Find(map[string]interface{}{"_id": map[string]interface{}{
-			"$in": queryIds,
-		}}).Count()
-		if err2 == nil {
-			if num2+num1 == lenNum {
-				return lenNum
-			} else if num1+num2 > lenNum {
-				return lenNum
-			} else if num1+num2 < lenNum {
-				return num1 + num2
+func GetDataExportSelectReallyCount(ids []string) int64 {
+	pool := make(chan bool, 10)
+	wait := &sync.WaitGroup{}
+	var total int64
+	var lock sync.Mutex
+	for _, v := range SplitArray(ids, 200) {
+		pool <- true
+		wait.Add(1)
+		go func(arr []string) {
+			defer func() {
+				wait.Done()
+				<-pool
+			}()
+			query := fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"id":["%s"]}}]}}}`, strings.Join(arr, "\",\""))
+			tCount := elastic.Count(INDEX, TYPE, query)
+			if tCount > 0 {
+				lock.Lock()
+				total += tCount
+				lock.Unlock()
 			}
-		}
+			return
+		}(v)
 	}
-	return -2
+	wait.Wait()
+	log.Printf("GetDataExportSelectReallyCount 选择数据共%d条记录,实际查询%d条\n", len(ids), total)
+	return total
 }
 
-func GetDataExportSelectResult(bidding mg.MongodbSim, biddingName string, scd *SieveCondition, dataType string, checkCount int) (*[]map[string]interface{}, error) {
+func GetDataExportSelectResult(scd *SieveCondition, dataType string, checkCount int) (*[]map[string]interface{}, error) {
 	//sess := bidding.GetMgoConn()
 	//defer bidding.DestoryMongoConn(sess)
 	bidField := `"_id", "title", "detail", "area", "city", "publishtime", "projectname", "buyer", "s_winner", "bidamount", "subtype", "toptype", "filetext", "purchasing"`
@@ -700,34 +694,6 @@ func GetDataExportSelectResult(bidding mg.MongodbSim, biddingName string, scd *S
 					lock.Unlock()
 				}
 			}
-			//iter := sess.DB(biddingName).C("bidding").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
-			//	"$in": queryIds,
-			//}}).Iter()
-			//for m := make(map[string]interface{}); iter.Next(&m); {
-			//	m["_id"] = mg.BsonIdToSId(m["_id"])
-			//	detail, _ := m["detail"].(string)
-			//	if detail != "" {
-			//		m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
-			//	}
-			//	lock.Lock()
-			//	returnLsit = append(returnLsit, m)
-			//	lock.Unlock()
-			//	m = make(map[string]interface{})
-			//}
-			//iter_back := sess.DB(biddingName).C("bidding_back").Select(selectMap).Find(map[string]interface{}{"_id": map[string]interface{}{
-			//	"$in": queryIds,
-			//}}).Iter()
-			//for m := make(map[string]interface{}); iter_back.Next(&m); {
-			//	m["_id"] = mg.BsonIdToSId(m["_id"])
-			//	detail, _ := m["detail"].(string)
-			//	if detail != "" {
-			//		m["detail"] = contentfilterReg.ReplaceAllString(detail, "")
-			//	}
-			//	lock.Lock()
-			//	returnLsit = append(returnLsit, m)
-			//	lock.Unlock()
-			//	m = make(map[string]interface{})
-			//}
 			return nil
 		}(v)
 	}
@@ -831,13 +797,13 @@ func GetDataExportIds(elasticAddress string, scd *SieveCondition, checkCount int
 // GetDataExportSearchResult 获取数据导出内容
 // entmg 高级字段包查询企业电话邮箱等字段
 // checkCount -1 预览500条
-func GetDataExportSearchResult(bid mg.MongodbSim, bidMgoDBName, elasticAddress string, scd *SieveCondition, dataType string, checkCount int) (*[]map[string]interface{}, error) {
+func GetDataExportSearchResult(elasticAddress string, scd *SieveCondition, dataType string, checkCount int) (*[]map[string]interface{}, error) {
 	defer qutil.Catch()
 	if scd == nil {
 		return nil, errors.New("GetDataExportSearchResult-未获取到查询信息")
 	}
 	if scd.SelectIds != nil {
-		idSelectDates, idSelectErr := GetDataExportSelectResult(bid, bidMgoDBName, scd, dataType, checkCount)
+		idSelectDates, idSelectErr := GetDataExportSelectResult(scd, dataType, checkCount)
 		if idSelectErr != nil {
 			return nil, idSelectErr
 		}
@@ -955,7 +921,7 @@ func doSearchByBatch(query, dataType string, searchCount int, flag string) (res
 	return
 }
 
-func FormatExportData(entmg mg.MongodbSim, data *[]map[string]interface{}, webdomain string, dataType string, encry ...bool) *[]map[string]interface{} {
+func FormatExportData(data *[]map[string]interface{}, webdomain string, dataType string, encry ...bool) *[]map[string]interface{} {
 	//格式化输出
 	isEncry := false
 	if len(encry) > 0 {

+ 3 - 3
common/src/qfw/util/dataexport/entdataexport.go

@@ -29,19 +29,19 @@ type Filters struct {
 	FilterId string
 }
 
-func GetEntDataExportCount(sim, bid mg.MongodbSim, bidMgoDBName, elasticAddress, _id string, entId, entUserId int, isFirst bool, url string, maxCount int) (count, newCount int, data *[]map[string]interface{}) {
+func GetEntDataExportCount(sim mg.MongodbSim, elasticAddress, _id string, entId, entUserId int, isFirst bool, url string, maxCount int) (count, newCount int, data *[]map[string]interface{}) {
 	defer util.Catch()
 	var (
 		searchsWaitGroup = &sync.WaitGroup{}
 	)
-	count = GetDataExportSearchCountByScdId(sim, bid, bidMgoDBName, elasticAddress, _id)
+	count = GetDataExportSearchCountByScdId(sim, elasticAddress, _id)
 	if count > maxCount || count == -1 {
 		count = maxCount
 	}
 	log.Println("count", count)
 	dataType := "2"
 	//数据导出数据查询
-	res, err := GetDataExportSearchResultByScdId(sim, bid, bidMgoDBName, elasticAddress, _id, dataType, count)
+	res, err := GetDataExportSearchResultByScdId(sim, elasticAddress, _id, dataType, count)
 	if err != nil {
 		log.Println("企业数据导出错误 ", err)
 		return 0, 0, nil