Przeglądaj źródła

融合记录相关功能

apple 4 lat temu
rodzic
commit
8dee5cdd4a

+ 1 - 0
udpfilterdup/src/dataMethodMerge.go

@@ -185,6 +185,7 @@ func mergeDataFieldsArr(source *Info, info *Info) (*Info, []int64, bool) {
 		mergeArr = append(mergeArr, 6)
 		is_replace = true
 	}
+
 	//7、中标金额
 	if source.bidamount == 0 && info.bidamount != 0 {
 		merge_recordMap["bidamount"] = source.bidamount

+ 1 - 1
udpfusion/src/config.json

@@ -3,7 +3,7 @@
   "mongodb": {
     "addrName": "192.168.3.207:27092",
     "dbName": "zhengkun",
-    "collName": "all_test",
+    "collName": "test",
     "pool": 10,
     "site": {
       "site_dbname": "qfw",

+ 133 - 64
udpfusion/src/main.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"encoding/json"
+	"go.mongodb.org/mongo-driver/bson/primitive"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -77,9 +78,13 @@ func mainT() {
 
 //快速测试使用
 func main() {
+	// 602685a6f021652bdea41e37  602686f5f021652bdea41ea1
+	//sid := "602685a6f021652bdea41e30"
+	//eid := "602685a6f021652bdea41e37"
+
+	sid := "602685a6f021652bdea41e39"
+	eid := "602686f5f021652bdea41ea1"
 
-	sid := "1f0000000000000000000000"
-	eid := "9f0000000000000000000000"
 	//log.Println(sid, "---", eid)
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
@@ -101,11 +106,8 @@ func main() {
 //融合具体方法
 func startTask(data []byte, mapInfo map[string]interface{}) {
 
-	//遍历数据
 	log.Println("开始融合流程")
 
-
-
 	defer qu.Catch()
 	//区间id
 	q := map[string]interface{}{
@@ -119,8 +121,13 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
 	defer mgo.DestoryMongoConn(sess)
 	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
 	//编译不同的融合组,如何划分组
-	fusionDataGroupArr := make([][]string,0) //待融合组
-	addOrUpdateArr := make([]bool,0) //新增-bool-记录
+	/***********************/
+	/***********************/
+	/***********************/
+	/***********************/
+	fusionDataGroupArr := make([][]string,0) 			//待融合组
+	addOrUpdateArr := make([]bool,0) 					//新增-bool-记录-组新增,组更新
+	infoFusionArr := make([]map[string]interface{},0) 	//记录取融合表的数据
 
 	repeatArr,sourceArr,index := make([]string,0),make([]string,0),0 //重复数据组
 	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
@@ -136,14 +143,14 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
 		}else {
 			fusionDataGroupArr = append(fusionDataGroupArr,[]string{tmpId})
 			addOrUpdateArr = append(addOrUpdateArr,false)
+			infoFusionArr = append(infoFusionArr, map[string]interface{}{})
 		}
 		tmp = make(map[string]interface{})
 	}
 
 	log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
-	log.Println("状态记录:",len(addOrUpdateArr))
+
 	//根据重复组,重新划分新的组别
-	num1,num2:=0,0
 	for i:=0;i<len(repeatArr);i++ {
 		sourceid := sourceArr[i]
 		isAddExist,index := false,0
@@ -163,92 +170,153 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
 			arr = fusionDataGroupArr[index]
 			arr = append(arr,repeatArr[i])//组拼接当前id
 			fusionDataGroupArr[index] = arr
-			num1++
 		}else {//当前段落未找到-需要查询融合表,,遍历融合表
-			arr := make([]string,0)
-			arr = dealWithFindFusionDataArr(sourceid)
+			arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
+			arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
+
 			arr = append(arr,repeatArr[i])//组拼接当前id
-			if len(arr)<1 {
-				log.Println("数据异常,融合表找不到数据",repeatArr[i])
-			}else { //新增
-				log.Println("数据融合新增")
+			if len(arr)<1 { //异常错误,新增
+				log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
+				arr_error := make([]string,0)
+				arr_error = append(arr_error,repeatArr[i])//组拼接当前id
+				fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
+				addOrUpdateArr = append(addOrUpdateArr,false)
+				infoFusionArr = append(infoFusionArr, map[string]interface{}{})
+			}else { //正常更新
 				fusionDataGroupArr = append(fusionDataGroupArr,arr)
 				addOrUpdateArr = append(addOrUpdateArr,true)
+				infoFusionArr = append(infoFusionArr,fusionTmpData)
 			}
-			num2++
 
 		}
 		//不断改变中
-		log.Println("当前分组数量:",len(fusionDataGroupArr))
+		//log.Println("当前分组数量:",len(fusionDataGroupArr))
 	}
+	log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
+
 
-	log.Println("分组完毕:","重复新增数量:",num1,"重复更新数量:",num2,len(repeatArr))
-	log.Println("最终带融合分组:",len(fusionDataNewGroupArr))
-	//分组细节需要修改 - 带测试
-	return
 
 
+	log.Println("********************分割线********************")
+	log.Println("********************分割线********************")
+	log.Println("********************分割线********************")
 	log.Println("开始处理新增分组... ...")
 	start := int(time.Now().Unix())
 	//进行分组融合
-	for i:=0;i<len(fusionDataNewGroupArr);i++ {
-		fusionArr := fusionDataNewGroupArr[i]
+	for i:=0;i<len(fusionDataGroupArr);i++ {
+		fusionArr := fusionDataGroupArr[i]
 		//构建数据
-		log.Println("构建第组数据...",fusionArr)
+		log.Println("构建第",i+1,"组数据...",fusionArr)
 		weight :=NewWeightData(fusionArr)
-		//整理数据-筛选排名,模板
+		////整理数据-筛选排名,模板
 		weight.analyzeBuildStandardData()
 		if len(fusionArr)<=1 {
-			//更新数据(融合表)   日志数据(日志记录表)
-		 	//updateData,_ := weight.dealWithAddFusionStruct()
-			//log.Println("新增:更新数据",len(updateData))
-		 	//mgo.Save(fusion_coll_name,updateData) //新增
+			log.Println("单组生成... ...")
+			saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
+			saveid:=mgo.Save(fusion_coll_name,saveFusionData)
+			saveRecordData["_id"] = saveid
+			log.Println(saveRecordData["_id"])
+		 	mgo.Save(record_coll_name,saveRecordData)
 		}else {
-			//updateData,_ := weight.dealWithMultipleFusionStruct()
-			//log.Println("多组新增:更新数据",len(updateData))
-			//mgo.Save(fusion_coll_name,updateData)
+			if addOrUpdateArr[i] {
+				log.Println("多组更新... ...")
+				tmpdata:=infoFusionArr[i]
+				updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
+
+				UpdateFusion.updatePool <- []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmpdata["_id"],
+					},
+					updateFusionData,
+				}
+				UpdateRecord.updatePool <- []map[string]interface{}{
+					map[string]interface{}{
+						"_id": tmpdata["_id"],
+					},
+					updateRecordData,
+				}
+			}else {
+				log.Println("多组生成... ...")
+				saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
+				saveid:=mgo.Save(fusion_coll_name,saveFusionData)
+				saveRecordData["_id"] = saveid
+				mgo.Save(record_coll_name,saveRecordData)
+			}
 		}
 	}
-	log.Println("新增融合over :",len(fusionDataNewGroupArr),"用时:",int(time.Now().Unix())-start)
-
-
-	//多组-融合表更新
-	//UpdateFusion.updatePool <- []map[string]interface{}{//原始数据打标签
-	//	map[string]interface{}{},
-	//	updateData,
-	//}
-
+	log.Println("新增融合over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
 
 	time.Sleep(30 * time.Second)
-
 	//任务完成,开始发送广播通知下面节点
 	taskSendFusionUdp(mapInfo)
 
 }
 
 // dealWithFindFusionDataArr scans the fusion collection for a document whose "fusion_allids" array contains sourceid; it returns that array converted to strings together with the matching document (empty slice and empty map when nothing matches).
-func dealWithFindFusionDataArr(sourceid string) []string {
-	arr := make([]string,0)
+func dealWithFindFusionDataArr(sourceid string) ([]string,map[string]interface{}) {
+	newArr ,arr := make([]string,0),make(primitive.A,0)
+	tmpData:=make(map[string]interface{},0)
 	q := map[string]interface{}{} // empty filter: every document in the collection is iterated
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
-	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
+	it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
+
 	for tmp := make(map[string]interface{}); it.Next(&tmp); {
-		fusion_allids := tmp["fusion_allids"].([]string)
-		for _,v:=range fusion_allids {
-			if v==sourceid {
-				//找到目标组-
-				arr = fusion_allids
-				tmp = make(map[string]interface{})
-				break
+		//log.Println(reflect.TypeOf(tmp["fusion_allids"]))
+		if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b { // documents whose field is missing or not a primitive.A are skipped
+			for _,v:=range fusion_allids {
+				if v==sourceid {
+					// target group found
+					arr = fusion_allids
+					tmpData = tmp
+					tmp = make(map[string]interface{}) // presumably so the next decode does not write into the map kept in tmpData
+					break // leaves the inner loop only; the outer scan continues, so a later matching document replaces this one
+				}
 			}
 		}
+
 		tmp = make(map[string]interface{})
 	}
-	return arr
+
+	for _,v:=range  arr{
+		newArr = append(newArr,qu.ObjToString(v))
+	}
+
+	return newArr,tmpData
 }
 
+// dealWithFindRecordData is described by its author as looking up the record-table id needed for updates; as written it scans fusion_coll_name for a document whose "fusion_allids" contains sourceid, discards what it finds and always returns "". NOTE(review): looks unfinished - confirm intended collection and return value.
+func dealWithFindRecordData(sourceid string) string {
+	newArr ,arr := make([]string,0),make(primitive.A,0)
+	//tmpData:=make(map[string]interface{},0)
+	q := map[string]interface{}{} // empty filter: every document in the collection is iterated
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
+
+	for tmp := make(map[string]interface{}); it.Next(&tmp); {
+		//log.Println(reflect.TypeOf(tmp["fusion_allids"]))
+		if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b {
+			for _,v:=range fusion_allids {
+				if v==sourceid {
+					// target group found
+					arr = fusion_allids
+					//tmpData = tmp
+					tmp = make(map[string]interface{})
+					break // leaves the inner loop only; the outer scan continues
+				}
+			}
+		}
+
+		tmp = make(map[string]interface{})
+	}
+
+	for _,v:=range  arr{
+		newArr = append(newArr,qu.ObjToString(v)) // newArr is built but never returned
+	}
 
+	return ""
+}
 
 //udp 监听
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
@@ -282,7 +350,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
-
+//结束发送udp
 func taskSendFusionUdp(mapinfo map[string]interface{})  {
 
 	//log.Println("信息融合结束-发送udp")
@@ -306,14 +374,6 @@ func taskSendFusionUdp(mapinfo map[string]interface{})  {
 	}
 }
 
-
-
-
-
-
-
-
-
 //判断是否在当前id段落
 func judgeIsCurIds (gtid string,lteid string,curid string) bool {
 
@@ -324,4 +384,13 @@ func judgeIsCurIds (gtid string,lteid string,curid string) bool {
 		return true
 	}
 	return false
-}
+}
+
+
+
+
+
+
+
+
+

+ 1 - 1
udpfusion/src/updateFusion.go

@@ -26,7 +26,7 @@ func newUpdateFusionPool() *updateFusionInfo {
 
 
 func (update *updateFusionInfo) updateFusionData() {
-	log.Println("开始不断监听--待更新数据")
+	log.Println("监听--融合更新数据")
 	tmpArr := make([][]map[string]interface{}, update.saveSize)
 	tmpIndex := 0
 	for {

+ 1 - 1
udpfusion/src/updateRecord.go

@@ -26,7 +26,7 @@ func newUpdateRecordPool() *updateRecordInfo {
 
 
 func (update *updateRecordInfo) updateRecordData() {
-	log.Println("开始不断监听--待更新数据")
+	log.Println("监听日志--待更新数据")
 	tmpArr := make([][]map[string]interface{}, update.saveSize)
 	tmpIndex := 0
 	for {

+ 90 - 26
udpfusion/src/weightFusion.go

@@ -13,10 +13,6 @@ func (weight *weightDataMap) dealWithAddFusionStruct ()(map[string]interface{},m
 
 	//指定模板数据dict-单条数据
 	dict :=weight.data[weight.templateid].data
-
-	//采用新增id
-	delete(dict,"_id")
-
 	//最早发布时间 (小)
 	dict["early_publishtime"] = qu.IntAll(dict["publishtime"])
 	//最近发布时间  (大)
@@ -34,64 +30,131 @@ func (weight *weightDataMap) dealWithAddFusionStruct ()(map[string]interface{},m
 	//融合保存相关联ids
 	dict["fusion_saveids"] = weight.saveids
 
+	//采用新增id
+	delete(dict,"_id")
+
+
+	recordDict := make(map[string]interface{},0)
+	recordDict["1"] = map[string]interface{}{
+		"data":map[string]interface{}{},
+		"snapshot":map[string]interface{}{},
+	}
+	recordDict["number"] = qu.Int64All(1)
 
 
-	return dict,dict
+	return dict,recordDict
 }
 
-//处理多条融合数据-返回融合新数据,融合细节数据
-func (weight *weightDataMap) dealWithMultipleFusionStruct ()(map[string]interface{},map[string]interface{}){
+// dealWithMultipleAddFusionStruct builds the documents for a multi-member group that is fused for the first time: it returns the fusion document (template data plus merged fields, with "_id" removed) and the record document holding entry "1" and the top-level counter "number".
+func (weight *weightDataMap) dealWithMultipleAddFusionStruct ()(map[string]interface{},map[string]interface{}){
 
 	// template data chosen for this group; it is modified in place and returned
 	dict :=weight.data[weight.templateid].data
-
 	// earliest / latest publish time
 	dict["early_publishtime"],dict["lately_publishtime"] = weight.dealWithTimeData("publishtime")
 	// earliest / latest storage (comeintime) time
 	dict["early_comeintime"],dict["lately_comeintime"] = weight.dealWithTimeData("comeintime")
+	// all related ids
+	dict["fusion_allids"] = weight.allids
+	// related ids kept by the fusion
+	dict["fusion_saveids"] = weight.saveids
+
+	// change log for this fusion; passed by pointer to both helpers below (presumably filled by them - confirm)
+	recordDict := make(map[string]interface{},0)
+	// struct-type fields: copy the helper's result into the fusion document
+	structData := weight.dealWithStructData(&recordDict)
+	for k,v:=range structData {
+		dict[k] = v
+	}
+	// remaining fields: copy the helper's result into the fusion document
+	otherFieldData := weight.dealWithOtherFieldData(&recordDict)
+	for k,v:=range otherFieldData {
+		dict[k] = v
+	}
+
+	//if len(recordDict)>0 {
+	//	log.Println("有更新数据:",len(recordDict))
+	//}
+
 	// fusion creation time
-	dict["current_updatetime"] = qu.IntAll(time.Now().Unix())
+	dict["fusion_time"] = qu.IntAll(time.Now().Unix())
+	// last update time (same moment as creation for a new document)
+	dict["fusion_updatetime"] = qu.IntAll(time.Now().Unix())
+
+
+	// drop the template's _id so the saved fusion document gets a new one
+	delete(dict,"_id")
+
+	newRecordDict := make(map[string]interface{},0)
+	newRecordDict["1"] = map[string]interface{}{
+		"data":recordDict,
+		"snapshot":map[string]interface{}{},
+	}
+	newRecordDict["number"] = qu.Int64All(1) // the counter belongs at the top level of the record document: dealWithMultipleUpdateFusionStruct reads newRecordDict["number"] to choose the next entry key, and would overwrite entry "1" if it were stored inside "data"
+
+	// return the fusion document and the record document
+	return dict,newRecordDict
+}
 
-	//融合生成时间-取融合表融合数据
-	dict["current_updatetime"] = ""
+// dealWithMultipleUpdateFusionStruct builds the documents for a group that already has a fusion document (tmpData): it returns the refreshed fusion document (with "_id" removed) and the existing record document extended by one numbered entry whose snapshot is tmpData.
+func (weight *weightDataMap) dealWithMultipleUpdateFusionStruct (tmpData map[string]interface{})(map[string]interface{},map[string]interface{}){
+
+	// template data chosen for this group; it is modified in place and returned
+	dict :=weight.data[weight.templateid].data
+	// earliest / latest publish time
+	dict["early_publishtime"],dict["lately_publishtime"] = weight.dealWithTimeData("publishtime")
+	// earliest / latest storage (comeintime) time
+	dict["early_comeintime"],dict["lately_comeintime"] = weight.dealWithTimeData("comeintime")
 	// all related ids
 	dict["fusion_allids"] = weight.allids
 	// related ids kept by the fusion
 	dict["fusion_saveids"] = weight.saveids
 
 
+
 	// change log for this fusion (the snapshot is attached further down)
 	recordDict := make(map[string]interface{},0)
-
-
 	// struct-type fields: copy the helper's result into the fusion document
 	structData := weight.dealWithStructData(&recordDict)
 	for k,v:=range structData {
-		log.Println("key:",k,"value",v)
-		dict["k"] = v
+		dict[k] = v
 	}
-
 	// remaining fields: copy the helper's result into the fusion document
 	otherFieldData := weight.dealWithOtherFieldData(&recordDict)
 	for k,v:=range otherFieldData {
-		//log.Println("key:",k,"value",v)
 		dict[k] = v
 	}
 
-	dict["repeat"] = 0
-
-
-	//log.Println("待更新数据:",dict)
-	//log.Println("待更新日志:",recordDict)
+	//if len(recordDict)>0 {
+	//	log.Println("有更新数据:",len(recordDict))
+	//}
 
 
+	// fusion creation time: carried over from the existing fusion document
+	dict["fusion_time"] = tmpData["fusion_time"]
+	// current update time
+	dict["fusion_updatetime"] = qu.IntAll(time.Now().Unix())
+	// remove _id from the update payload
+	delete(dict,"_id")
 
+	newRecordDict := make(map[string]interface{},0)
+	newRecordDict = mgo.FindById(record_coll_name,BsonTOStringId(tmpData["_id"])) // NOTE(review): result is written to below without a nil check - confirm FindById never returns a nil map
+	number:=qu.Int64All(newRecordDict["number"])
+	number++
+	key:=fmt.Sprintf("%d",number) // entries are keyed by their sequence number
+	newRecordDict[key] = map[string]interface{}{
+		"data":recordDict,
+		"snapshot":tmpData,
+	}
+	newRecordDict["number"] = number
 
 	// return the fusion update document and the record document
-	return dict,recordDict
+	return dict,newRecordDict
 }
 
 
+
+
 //处理其他字段数据
 func (weight *weightDataMap)dealWithOtherFieldData(recordDict *map[string]interface{}) map[string]interface{} {
 
@@ -143,7 +206,7 @@ func (weight *weightDataMap)dealWithOtherFieldData(recordDict *map[string]interf
 	}
 
 
-	log.Println("待替换key:",arr,"修改后:",modifyData)
+	//log.Println("待替换key:",arr,"修改后:",modifyData)
 
 	//第二步-集合最大化
 	isRank := 2
@@ -179,7 +242,7 @@ func (weight *weightDataMap)dealWithOtherFieldData(recordDict *map[string]interf
 		}
 	}
 
-	log.Println("isRank:",isRank,len(modifyData))
+	//log.Println("isRank:",isRank,len(modifyData))
 
 	return modifyData
 }
@@ -226,7 +289,7 @@ func (weight *weightDataMap)dealWithStructData(recordDict *map[string]interface{
 			keyIndex = key
 		}
 	}
-	log.Println("当前keyIndex",keyIndex)
+	//log.Println("当前keyIndex",keyIndex)
 	for _,value_id :=range weight.saveids {
 		if templateid == value_id {
 			continue
@@ -338,3 +401,4 @@ func (weight *weightDataMap)dealWithStructData(recordDict *map[string]interface{
 	return modifyData
 }
 
+

+ 2 - 4
udpfusion/src/weightValue.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"log"
 	"sync"
 )
 
@@ -75,7 +74,7 @@ func analyzeTheSoureData(tmp map[string]interface{}) *weightInfo {
 
 
 	//测试 指定模板-数据-最高权重
-	if BsonTOStringId(tmp["_id"])=="5638baccaf53745d9a000998" {
+	if BsonTOStringId(tmp["_id"])=="602686f5f021652bdea41ea1" {
 		maxLevel = true
 	}
 
@@ -131,9 +130,8 @@ func (weight *weightDataMap) analyzeBuildStandardData() {
 		}
 	}
 	rank_s :=1
-
 	if len(isMaxIndexArr)>=1 {
-		log.Println("进行最大权重...")
+		//log.Println("进行最大权重...")
 		rankIndexArr := dealWithGroupScores(isMaxIndexArr,isMaxIndexValueArr,arrSiteLevel)
 		for _,v:=range rankIndexArr {
 			arrRanking[v] = rank_s