apple 4 سال پیش
والد
کامیت
8c67f4c318

+ 1 - 1
udpfusion/src/config.json

@@ -3,7 +3,7 @@
   "mongodb": {
     "addrName": "192.168.3.207:27092",
     "dbName": "zhengkun",
-    "collName": "test",
+    "collName": "zk_data",
     "pool": 10,
     "site": {
       "site_dbname": "qfw",

+ 61 - 40
udpfusion/src/main.go

@@ -10,6 +10,7 @@ import (
 	"qfw/common/src/qfw/util"
 	qu "qfw/util"
 	"strconv"
+	"sync"
 	"time"
 )
 
@@ -78,13 +79,9 @@ func mainT() {
 
 //快速测试使用
 func main() {
-	// 602685a6f021652bdea41e37  602686f5f021652bdea41ea1
-	//sid := "602685a6f021652bdea41e30"
-	//eid := "602685a6f021652bdea41e37"
-
-	sid := "602685a6f021652bdea41e39"
-	eid := "602686f5f021652bdea41ea1"
 
+	sid := "100000000000000000000000"
+	eid := "900000000000000000000000"
 	//log.Println(sid, "---", eid)
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
@@ -123,7 +120,8 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
 	//编译不同的融合组,如何划分组
 	/***********************/
 	/***********************/
-	/***********************/
+	/***y
+	********************/
 	/***********************/
 	fusionDataGroupArr := make([][]string,0) 			//待融合组
 	addOrUpdateArr := make([]bool,0) 					//新增-bool-记录-组新增,组更新
@@ -203,47 +201,70 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
 	log.Println("开始处理新增分组... ...")
 	start := int(time.Now().Unix())
 	//进行分组融合
+	pool := make(chan bool, 1)
+	wg := &sync.WaitGroup{}
+
 	for i:=0;i<len(fusionDataGroupArr);i++ {
 		fusionArr := fusionDataGroupArr[i]
 		//构建数据
-		log.Println("构建第",i+1,"组数据...",fusionArr)
-		weight :=NewWeightData(fusionArr)
-		////整理数据-筛选排名,模板
-		weight.analyzeBuildStandardData()
-		if len(fusionArr)<=1 {
-			log.Println("单组生成... ...")
-			saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
-			saveid:=mgo.Save(fusion_coll_name,saveFusionData)
-			saveRecordData["_id"] = saveid
-			log.Println(saveRecordData["_id"])
-		 	mgo.Save(record_coll_name,saveRecordData)
-		}else {
-			if addOrUpdateArr[i] {
-				log.Println("多组更新... ...")
-				tmpdata:=infoFusionArr[i]
-				updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
-
-				UpdateFusion.updatePool <- []map[string]interface{}{
-					map[string]interface{}{
-						"_id": tmpdata["_id"],
-					},
-					updateFusionData,
-				}
-				UpdateRecord.updatePool <- []map[string]interface{}{
-					map[string]interface{}{
-						"_id": tmpdata["_id"],
-					},
-					updateRecordData,
-				}
-			}else {
-				log.Println("多组生成... ...")
-				saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
+		log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
+		//多线程 - 处理数据
+
+		pool <- true
+		wg.Add(1)
+		go func(fusionArr []string,i int) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+
+
+			weight :=NewWeightData(fusionArr)
+			////整理数据-筛选排名,模板
+			weight.analyzeBuildStandardData()
+
+			if len(fusionArr)<=1 {
+				//log.Println("单组生成... ...")
+				saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
 				saveid:=mgo.Save(fusion_coll_name,saveFusionData)
 				saveRecordData["_id"] = saveid
 				mgo.Save(record_coll_name,saveRecordData)
+			}else {
+				if addOrUpdateArr[i] {
+					//log.Println("多组更新... ...")
+					tmpdata:=infoFusionArr[i]
+					updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
+
+					UpdateFusion.updatePool <- []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmpdata["_id"],
+						},
+						updateFusionData,
+					}
+					UpdateRecord.updatePool <- []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmpdata["_id"],
+						},
+						updateRecordData,
+					}
+				}else {
+					//log.Println("多组生成... ...")
+					saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
+					saveid:=mgo.Save(fusion_coll_name,saveFusionData)
+					saveRecordData["_id"] = saveid
+					mgo.Save(record_coll_name,saveRecordData)
+				}
 			}
-		}
+
+
+		}(fusionArr,i)
+
+
+
 	}
+
+	wg.Wait()
+
 	log.Println("新增融合over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
 
 	time.Sleep(30 * time.Second)

+ 1 - 1
udpfusion/src/updateFusion.go

@@ -20,7 +20,7 @@ type updateFusionInfo struct {
 var sp_f = make(chan bool, 5)
 
 func newUpdateFusionPool() *updateFusionInfo {
-	update:=&updateFusionInfo{make(chan []map[string]interface{}, 5000),100}
+	update:=&updateFusionInfo{make(chan []map[string]interface{}, 50000),100}
 	return update
 }
 

+ 6 - 7
udpfusion/src/weightFusion.go

@@ -2,10 +2,9 @@ package main
 
 import (
 	"fmt"
-	"log"
+	"go.mongodb.org/mongo-driver/bson/primitive"
 	qu "qfw/util"
 	"time"
-	"go.mongodb.org/mongo-driver/bson/primitive"
 )
 
 //处理融合数据-返回,融合新数据数据-新增
@@ -90,7 +89,7 @@ func (weight *weightDataMap) dealWithMultipleAddFusionStruct ()(map[string]inter
 		"data":recordDict,
 		"snapshot":map[string]interface{}{},
 	}
-	recordDict["number"] = qu.Int64All(1)
+	newRecordDict["number"] = qu.Int64All(1)
 
 	//返回,更新数据,日志记录数据
 	return dict,newRecordDict
@@ -224,7 +223,7 @@ func (weight *weightDataMap)dealWithOtherFieldData(recordDict *map[string]interf
 						continue
 					}
 					if judgeIsEffectiveData(newValue,key) {
-						log.Println("最大化有效-",key)
+						//log.Println("最大化有效-",key)
 						templateTmp[key] = newValue
 						modifyData[key] = newValue
 						(*recordDict)[key] = map[string]interface{}{
@@ -278,7 +277,7 @@ func (weight *weightDataMap)dealWithStructData(recordDict *map[string]interface{
 	if tmp_arr,b := templateTmp["attach_text"].(map[string]interface{});b {
 		//有值符合-
 		attach_text = tmp_arr
-		log.Println("默认初始:",attach_text)
+		//log.Println("默认初始:",attach_text)
 	}
 	//附件判重-并合并新增
 	keyIndex := -1
@@ -299,13 +298,13 @@ func (weight *weightDataMap)dealWithStructData(recordDict *map[string]interface{
 			if len(attachData)>0  { //有值
 				for _,v:=range attachData { //子元素
 					if attach,isOK := v.(map[string]interface{});isOK {
-						log.Println(attach)
+						//log.Println(attach)
 						if !dealWithRepeatAttachData(attach_text,attach) {
 							//符合条件-不重复直接添加
 							keyIndex++
 							saveKey := fmt.Sprintf("%v",keyIndex)
 							attach_text[saveKey] = attach //key累加
-							log.Println(attach_text)
+							//log.Println(attach_text)
 							isAttach = true
 
 							//多条情况-融合

+ 5 - 0
udpfusion/src/weightFusionMethod.go

@@ -131,6 +131,11 @@ func judgeIsEffectiveData(value interface{},key string) bool  {
 		return true
 	}
 
+	if valueType==reflect.Bool {
+		return true
+	}
+
+
 	//其他类型采用
 	valueLen := reflect.ValueOf(value).Len()
 	if valueLen>0 {

+ 6 - 30
udpfusion/src/weightValue.go

@@ -1,9 +1,5 @@
 package main
 
-import (
-	"sync"
-)
-
 type weightInfo struct {
 	maxLevel 		bool
 	minLevel		bool
@@ -15,7 +11,6 @@ type weightInfo struct {
 
 
 type weightDataMap struct {
-	lock   sync.Mutex //锁
 	data   map[string]*weightInfo
 	allids    []string
 	saveids    []string
@@ -25,7 +20,7 @@ type weightDataMap struct {
 func NewWeightData(arr []string) *weightDataMap {
 	//测试-默认第一个
 
-	weight := &weightDataMap{sync.Mutex{},map[string]*weightInfo{},[]string{},[]string{},""}
+	weight := &weightDataMap{map[string]*weightInfo{},[]string{},[]string{},""}
 
 	data := make(map[string]*weightInfo,0)
 	for _,v:=range arr {
@@ -35,25 +30,6 @@ func NewWeightData(arr []string) *weightDataMap {
 		}
 	}
 
-	//测试模拟分数
-	//weight := &weightDataMap{sync.Mutex{},map[string]*weightInfo{},[]string{},[]string{},templateid}
-	//data := make(map[string]*weightInfo,0)
-	//max :=[]bool{false,false,false,false,false,false,false,false,false,false}
-	//min :=[]bool{false,false,false,false,false,false,false,false,false,false}
-	//site :=[]int{2,1,5,3,4,2,3,5,1,0}
-	//qua :=[]int{15,11,11,11,22,19,22,44,22,66}
-	//rank :=[]int{-1,-1,-1,-1,-1,-1,-1,-1,-1,-1}
-	//for k,v:=range arr {
-	//	data[v] = &weightInfo{
-	//		max[k],
-	//		min[k],
-	//		site[k],
-	//		qua[k],
-	//		rank[k],
-	//	}
-	//}
-
-
 	weight.data = data
 
 	return weight
@@ -74,9 +50,9 @@ func analyzeTheSoureData(tmp map[string]interface{}) *weightInfo {
 
 
 	//测试 指定模板-数据-最高权重
-	if BsonTOStringId(tmp["_id"])=="602686f5f021652bdea41ea1" {
-		maxLevel = true
-	}
+	//if BsonTOStringId(tmp["_id"])=="602686f5f021652bdea41ea1" {
+	//	maxLevel = true
+	//}
 
 	delete(tmp,"_id")
 
@@ -93,7 +69,7 @@ func analyzeTheSoureData(tmp map[string]interface{}) *weightInfo {
 //分析模板数据-打标记构建数据结构
 func (weight *weightDataMap) analyzeBuildStandardData() {
 
-	weight.lock.Lock()
+
 	//分析里面的打分,以及是否参与融合来决定
 	data:=weight.data
 	//先构建
@@ -191,6 +167,6 @@ func (weight *weightDataMap) analyzeBuildStandardData() {
 	weight.allids = arrAllIds
 	weight.saveids = arrSaveIds
 
-	weight.lock.Unlock()
+
 }