Explorar o código

fix:数据导出优化

duxin hai 6 meses
pai
achega
af88ecabd3
Modificáronse 1 ficheiros con 79 adicións e 40 borrados
  1. 79 40
      common/src/qfw/util/dataexport/dataexport.go

+ 79 - 40
common/src/qfw/util/dataexport/dataexport.go

@@ -105,12 +105,12 @@ const (
 )
 
 var topType = map[string]string{
-	"招标预告":     "预告",
-	"招标公告":     "招标",
-	"招标结果":     "结果",
+	"招标预告":   "预告",
+	"招标公告":   "招标",
+	"招标结果":   "结果",
 	"招标信用信息": "其它",
-	"拟建项目":     "拟建",
-	"采购意向":     "采购意向",
+	"拟建项目":   "拟建",
+	"采购意向":   "采购意向",
 }
 
 // 包含正文或 附件 不包含标题
@@ -1103,6 +1103,7 @@ func FormatExportData(entmg mg.MongodbSim, data *[]map[string]interface{}, webdo
 	projectsetMap := make(map[string]bool)
 	projectsetEsMap := make(map[string]*map[string]interface{})
 	if dataType == "2" {
+		wait := &sync.WaitGroup{}
 		for _, m := range *data {
 			entidlist, ok := m["entidlist"].([]interface{})
 			if ok && len(entidlist) > 0 {
@@ -1120,52 +1121,90 @@ func FormatExportData(entmg mg.MongodbSim, data *[]map[string]interface{}, webdo
 				projectsetMap[qutil.InterfaceToStr(m["_id"])] = true
 			}
 		}
+		log.Println(fmt.Sprintf("查询企业:%d,查询项目:%d", len(qyxyMap), len(projectsetMap)))
 		if len(qyxyMap) > 0 { //查询企业
-			var (
-				qyxyArr []string
-				qCount  int
-			)
-			for s := range qyxyMap {
-				qCount++
-				qyxyArr = append(qyxyArr, fmt.Sprintf(`%s`, s))
-				if len(qyxyArr) == 100 || (qCount == len(qyxyMap) && len(qyxyArr) > 0) {
-					qyxyEsData := elastic.Get("qyxy", "qyxy", fmt.Sprintf(`{"query":{"bool":{"should":[{"terms":{"id":["%s"]}}]}},"size":%d,"_source":["company_name","company_email","company_phone","legal_person","id"]}`, strings.Join(qyxyArr, `","`), len(qyxyArr)))
-					if qyxyEsData != nil && len(*qyxyEsData) > 0 {
-						for _, m := range *qyxyEsData {
-							qyxyEsMap[qutil.InterfaceToStr(m["id"])] = &m
-						}
+			wait.Add(1)
+			go func() {
+				var (
+					qyxyArr []string
+					qCount  int
+				)
+				defer wait.Done()
+				pool := make(chan bool, 5)
+				qyxyWait := &sync.WaitGroup{}
+				lock := &sync.Mutex{}
+				for s := range qyxyMap {
+					qCount++
+					qyxyArr = append(qyxyArr, fmt.Sprintf(`%s`, s))
+					if len(qyxyArr) == 200 || (qCount == len(qyxyMap) && len(qyxyArr) > 0) {
+						pool <- true
+						qyxyWait.Add(1)
+						go func(qArr []string) {
+							defer func() {
+								<-pool
+								qyxyWait.Done()
+							}()
+							qyxyEsData := elastic.Get("qyxy", "qyxy", fmt.Sprintf(`{"query":{"bool":{"should":[{"terms":{"id":["%s"]}}]}},"size":%d,"_source":["company_name","company_email","company_phone","legal_person","id"]}`, strings.Join(qArr, `","`), len(qArr)))
+							if qyxyEsData != nil && len(*qyxyEsData) > 0 {
+								for _, m := range *qyxyEsData {
+									lock.Lock()
+									qyxyEsMap[qutil.InterfaceToStr(m["id"])] = &m
+									lock.Unlock()
+								}
+							}
+						}(qyxyArr)
+						qyxyArr = []string{}
 					}
-					qyxyArr = []string{}
 				}
-			}
+				qyxyWait.Wait()
+			}()
 		}
 		if len(projectsetMap) > 0 { //查询项目
-			var (
-				projectsetArr []string
-				pCount        int
-			)
-			for s := range projectsetMap {
-				pCount++
-				projectsetArr = append(projectsetArr, fmt.Sprintf(`%s`, s))
-				if len(projectsetArr) == 100 || (pCount == len(projectsetMap) && len(projectsetArr) > 0) {
-					projectsetEsData := elastic.Get("projectset", "projectset", fmt.Sprintf(`{"query": {"bool": {"should": [{"terms": {"list.infoid": ["%s"]}}]}},"_source": ["list"],"size": %d}`, strings.Join(projectsetArr, `","`), len(projectsetArr)*3)) //查询双倍数量 避免缺失数据
-					if projectsetEsData != nil && len(*projectsetEsData) > 0 {
-						for _, m := range *projectsetEsData {
-							MsgList := m["list"]
-							if MsgList != nil {
-								list := qutil.ObjArrToMapArr(MsgList.([]interface{}))
-								for _, m2 := range list {
-									if projectsetMap[qutil.InterfaceToStr(m2["infoid"])] {
-										projectsetEsMap[qutil.InterfaceToStr(m2["infoid"])] = &m
+			wait.Add(1)
+			go func() {
+				defer wait.Done()
+				var (
+					projectsetArr []string
+					pCount        int
+				)
+				pool := make(chan bool, 5)
+				projectsetWait := &sync.WaitGroup{}
+				lock := &sync.Mutex{}
+				for s := range projectsetMap {
+					pCount++
+					projectsetArr = append(projectsetArr, fmt.Sprintf(`%s`, s))
+					if len(projectsetArr) == 200 || (pCount == len(projectsetMap) && len(projectsetArr) > 0) {
+						pool <- true
+						projectsetWait.Add(1)
+						go func(pArr []string) {
+							defer func() {
+								<-pool
+								projectsetWait.Done()
+							}()
+							projectsetEsData := elastic.Get("projectset", "projectset", fmt.Sprintf(`{"query": {"bool": {"should": [{"terms": {"list.infoid": ["%s"]}}]}},"_source": ["list"],"size": %d}`, strings.Join(pArr, `","`), len(pArr)*2)) //查询双倍数量 避免缺失数据
+							if projectsetEsData != nil && len(*projectsetEsData) > 0 {
+								for _, m := range *projectsetEsData {
+									MsgList := m["list"]
+									if MsgList != nil {
+										list := qutil.ObjArrToMapArr(MsgList.([]interface{}))
+										for _, m2 := range list {
+											if projectsetMap[qutil.InterfaceToStr(m2["infoid"])] {
+												lock.Lock()
+												projectsetEsMap[qutil.InterfaceToStr(m2["infoid"])] = &m
+												lock.Unlock()
+											}
+										}
 									}
 								}
 							}
-						}
+						}(projectsetArr)
+						projectsetArr = []string{}
 					}
-					projectsetArr = []string{}
 				}
-			}
+				projectsetWait.Wait()
+			}()
 		}
+		wait.Wait()
 	}
 
 	var entCacheMap = map[string]map[string]interface{}{}