apple преди 4 години
родител
ревизия
35e73fefbd
променени са 3 файла, в които са добавени 35 реда и са изтрити 27 реда
  1. 32 23
      udpfusion/src/fusionFullData.go
  2. 1 1
      udpfusion/src/updateElastic.go
  3. 2 3
      udpfusion/src/updateRecord.go

+ 32 - 23
udpfusion/src/fusionFullData.go

@@ -83,34 +83,37 @@ func startFusionData()  {
 				if len(fusionArr) <= 1 { //单组数据
 					saveFusionData, saveRecordData := weight.dealWithAddFusionStruct()
 					saveid := mgo.Save(fusion_coll_name, saveFusionData)
-					//新增-Record
+					//新增-Record  批量新增-经测试-批量新增与多线程新增 速度306s-236s 相差20%的耗时
 					saveRecordData["_id"] = saveid
-					mgo.Save(record_coll_name,saveRecordData)
-					//UpdateRecord.add_pool <- saveRecordData  //批量新增
-
-					//批量更新Es
-					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
-					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
-					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
-					//UpdateElastic.update_pool <- map[string]string{
-					//	"id":sourceid,
-					//	"updateStr":updateStr1+updateStr2,
-					//}
-				}else { //多组数据
+					UpdateRecord.add_pool <- saveRecordData
+
+
+					//批量更新Es -问题耗时
+					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
+					updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
+					updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
+					elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
+					UpdateElastic.update_pool <- map[string]string{
+						"id":sourceid,
+						"updateStr":updateStr1+updateStr2,
+					}
+
+				}else {
 					saveFusionData, saveRecordData := weight.dealWithMultipleAddFusionStruct()
 					saveid := mgo.Save(fusion_coll_name, saveFusionData)
 					//新增-Record
 					saveRecordData["_id"] = saveid
-					mgo.Save(record_coll_name,saveRecordData)
-					//UpdateRecord.add_pool <- saveRecordData //批量新增
-					//批量更新Es
-					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
-					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
-					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
-					//UpdateElastic.update_pool <- map[string]string{
-					//	"id":sourceid,
-					//	"updateStr":updateStr1+updateStr2,
-					//}
+					UpdateRecord.add_pool <- saveRecordData //批量新增
+
+					//批量更新Es -
+					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
+					updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
+					updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
+					elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
+					UpdateElastic.update_pool <- map[string]string{
+						"id":sourceid,
+						"updateStr":updateStr1+updateStr2,
+					}
 
 				}
 			}(sourceid, fusionArr)
@@ -124,6 +127,12 @@ func startFusionData()  {
 
 }
 
+func goUpdateEs(sourceid string,updateStr string)  {
+	UpdateElastic.update_pool <- map[string]string{
+		"id":sourceid,
+		"updateStr":updateStr,
+	}
+}
 
 func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
 

+ 1 - 1
udpfusion/src/updateElastic.go

@@ -8,7 +8,7 @@ import (
 
 type updateEsInfo struct {
 
-	//更新或新增通道
+	//更新通道
 	update_pool chan map[string]string
 	//数量
 	saveSize   	int

+ 2 - 3
udpfusion/src/updateRecord.go

@@ -7,8 +7,9 @@ import (
 
 type updateRecordInfo struct {
 
-	//更新或新增通道
+	//新增通道
 	add_pool chan map[string]interface{}
+	//更新通道
 	update_pool chan []map[string]interface{}
 	//数量
 	saveSize   	int
@@ -51,7 +52,6 @@ func (update *updateRecordInfo) addRecordData() {
 					}()
 					//批量新增
 					mgo.SaveBulk(record_coll_name, dataArr...)
-
 				}(tmpArr)
 				tmpArr = make([]map[string]interface{}, update.saveSize)
 				tmpIndex = 0
@@ -65,7 +65,6 @@ func (update *updateRecordInfo) addRecordData() {
 						<-sp_r
 					}()
 					//批量新增
-
 					mgo.SaveBulk(record_coll_name, dataArr...)
 				}(tmpArr[:tmpIndex])
 				tmpArr = make([]map[string]interface{}, update.saveSize)