ソースを参照

融合 正常增量融合 - 业务逻辑备份

apple 4 年 前
コミット
f0cb18096e

+ 166 - 184
udpfusion/src/fusionAddData.go

@@ -1,187 +1,169 @@
 package main
 
+import (
+	"log"
+	qu "qfw/util"
+	"strings"
+	"sync"
+	"time"
+)
+
 //增量-融合一小段
-//func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
-//	log.Println("开始增量融合流程")
-//	defer qu.Catch()
-//	//区间id
-//	q := map[string]interface{}{
-//		"_id": map[string]interface{}{
-//			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
-//			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
-//		},
-//	}
-//	log.Println("查询条件:",q)
-//	sess := mgo.GetMgoConn()
-//	defer mgo.DestoryMongoConn(sess)
-//	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
-//	//编译不同的融合组,划分组,内存方式即可,小段落数据
-//
-//	//待融合组
-//	fusionDataGroupArr := make([][]string,0)
-//	//需要更新组
-//	updateFusionMap,curFusionKeyMap:=make(map[string]interface{},0),make(map[string]interface{},0)
-//	//重复数据组
-//	norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0
-//	start := int(time.Now().Unix())
-//	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
-//		if index%10000 == 0 {
-//			log.Println("current index",index,tmp["_id"])
-//		}
-//		tmpId:=BsonTOStringId(tmp["_id"])
-//		repeat:=qu.IntAll(tmp["repeat"])
-//		sourceid:=qu.ObjToString(tmp["repeat_id"])
-//		if repeat==1 {
-//			repeatArr = append(repeatArr,tmpId)
-//			sourceArr = append(sourceArr,sourceid)
-//		}else {
-//			norepeatArr = append(repeatArr,tmpId)
-//		}
-//		tmp = make(map[string]interface{})
-//	}
-//
-//	log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
-//	log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
-//
-//	//根据重复组,重新划分新的组别
-//	start = int(time.Now().Unix())
-//	isErrNum:=0
-//	for i:=0;i<len(repeatArr);i++ {
-//		sourceid := sourceArr[i]
-//		isAddExist,index := false,0
-//	R:	for k,v:=range fusionDataGroupArr { //根据原sourceid 直接遍历当前组
-//			for _,v1:=range v{
-//				if v1==sourceid {
-//					index = k
-//					isAddExist = true
-//					break R
-//				}
-//			}
-//		}
-//		if i%1000 == 0 {
-//			log.Println("分组中...","current index",i,repeatArr[i])
-//		}
-//
-//		if isAddExist { //数组截取替换-找到指定
-//			arr := make([]string,0)
-//			arr = fusionDataGroupArr[index]
-//			arr = append(arr,repeatArr[i])//组拼接当前id
-//			fusionDataGroupArr[index] = arr
-//
-//		}else {//当前段落未找到-需要查询全量分组表
-//
-//			arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
-//			arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
-//			arr = append(arr,repeatArr[i])//组拼接当前id
-//
-//			if len(arr)==1 { //异常错误,新增
-//				isErrNum++
-//				log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
-//				arr_error := make([]string,0)
-//				arr_error = append(arr_error,repeatArr[i])//组拼接当前id
-//				fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
-//				addOrUpdateArr = append(addOrUpdateArr,false)
-//				infoFusionArr = append(infoFusionArr, map[string]interface{}{})
-//			}else { //正常更新
-//				log.Println("... ... 正常多组新增",i)
-//				fusionDataGroupArr = append(fusionDataGroupArr,arr)
-//				addOrUpdateArr = append(addOrUpdateArr,true)
-//				infoFusionArr = append(infoFusionArr,fusionTmpData)
-//			}
-//
-//		}
-//		//不断改变中
-//		if i%1000 == 0 {
-//			log.Println("当前分组数量:",len(fusionDataGroupArr))
-//		}
-//	}
-//
-//
-//
-//
-//	log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
-//	log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
-//	log.Println("********************分割线********************")
-//	log.Println("********************分割线********************")
-//	log.Println("********************分割线********************")
-//
-//
-//	log.Println("开始处理分组融合... ... ... ...")
-//	log.Println("开始处理分组融合... ... ... ...")
-//	log.Println("开始处理分组融合... ... ... ...")
-//
-//	start = int(time.Now().Unix())
-//	//多线程 - 处理数据
-//	pool := make(chan bool, 3)
-//	wg := &sync.WaitGroup{}
-//
-//	for i:=0;i<len(fusionDataGroupArr);i++ {
-//		fusionArr := fusionDataGroupArr[i]
-//		pool <- true
-//		wg.Add(1)
-//		go func(fusionArr []string,i int) {
-//			defer func() {
-//				<-pool
-//				wg.Done()
-//			}()
-//			//构建数据
-//			if (i+1)%500 == 0 {
-//				log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
-//			}
-//			weight :=NewWeightData(fusionArr,"")
-//			////整理数据-筛选排名,模板
-//			weight.analyzeBuildStandardData()
-//
-//			if len(fusionArr)<=1 {
-//				saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
-//				saveid:=mgo.Save(fusion_coll_name,saveFusionData)
-//				saveRecordData["_id"] = saveid
-//				mgo.Save(record_coll_name,saveRecordData)
-//			}else {
-//				//if addOrUpdateArr[i] {
-//				//	//log.Println("多组更新... ...")
-//				//	tmpdata:=infoFusionArr[i]
-//				//	updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
-//				//
-//				//	UpdateFusion.updatePool <- []map[string]interface{}{
-//				//		map[string]interface{}{
-//				//			"_id": tmpdata["_id"],
-//				//		},
-//				//		updateFusionData,
-//				//	}
-//				//	UpdateRecord.updatePool <- []map[string]interface{}{
-//				//		map[string]interface{}{
-//				//			"_id": tmpdata["_id"],
-//				//		},
-//				//		updateRecordData,
-//				//	}
-//				//}else {
-//				//	//log.Println("多组生成... ...")
-//				//	saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
-//				//	saveid:=mgo.Save(fusion_coll_name,saveFusionData)
-//				//	saveRecordData["_id"] = saveid
-//				//	mgo.Save(record_coll_name,saveRecordData)
-//				//}
-//			}
-//
-//
-//		}(fusionArr,i)
-//
-//
-//
-//	}
-//
-//	wg.Wait()
-//
-//	log.Println("fusion is over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
-//	log.Println("睡眠30秒,然后在发广播")
-//	time.Sleep(30 * time.Second)
-//	//任务完成,开始发送广播通知下面节点
-//	taskSendFusionUdp(mapInfo)
-//
-//}
-//
-//func dealWithFindFusionGroupDataArr (sourceid string) string {
-//
-//	return ""
-//}
+func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
+	log.Println("开始增量融合流程")
+	
+	defer qu.Catch()
+	//区间id
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
+		},
+	}
+	log.Println("查询条件:",q)
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
+
+	start := int(time.Now().Unix())
+	tmpFusionMap := make(map[string]string,0)
+	index,isOK:=0,0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if index%10000 == 0 {
+			log.Println("current index",index,tmp["_id"])
+		}
+		
+		repeat := qu.IntAll(tmp["repeat"])
+		repeatid,sourceid := BsonTOStringId(tmp["_id"]),BsonTOStringId(tmp["_id"])
+		if repeat==1 {
+			sourceid = qu.ObjToString(tmp["repeat_id"])
+		}
+		if sourceid!="" {
+			if tmpFusionMap[sourceid]!=""  {
+				ids := tmpFusionMap[sourceid]
+				ids = ids+","+repeatid
+				tmpFusionMap[sourceid] = ids
+			}else {
+				tmpFusionMap[sourceid] = repeatid
+			}
+		}else {
+			//log.Println("异常: sourceid 为空 ")
+		}
+
+
+		tmp = make(map[string]interface{})
+	}
+	log.Println("task mongo first:",index,len(tmpFusionMap),"开始确认最后增量组数据...")
+
+	isUpdateMap:=make(map[string]string)
+
+	for sourceid,value:=range tmpFusionMap {
+		isOK++
+		if isOK % 1000 ==0 {
+			log.Println("当前数量:",isOK,sourceid)
+		}
+		data:=mgo.FindById(group_coll_name,sourceid)
+		if data!=nil && len(data)>2 { //更新组
+			allids:= qu.ObjToString(data["allids"])
+			tmpFusionMap[sourceid] = allids+","+value
+			isUpdateMap[sourceid] = qu.ObjToString(data["fusion_id"])
+		}
+	}
+
+	log.Println("分组数据总用时:",int(time.Now().Unix())-start,"秒")
+
+
+
+	stareFusionData(tmpFusionMap,isUpdateMap)
+
+	log.Println("此段落结束,发送udp","...睡眠30s")
+	time.Sleep(30 * time.Second)
+
+	taskSendFusionUdp(mapInfo)
+}
+
+func stareFusionData(tmpFusionMap map[string]string,isUpdateMap map[string]string) {
+	//根据重复组,重新划分新的组别
+	log.Println("开始融合操作......")
+
+	index,start :=0, int(time.Now().Unix())
+	//多线程保存数据
+	pool_mgo := make(chan bool, mgo_pool)
+	wg_mgo := &sync.WaitGroup{}
+	for sourceid,v:=range tmpFusionMap {
+		fusionArr := strings.Split(v, ",")
+		fusionid := ""
+		if isUpdateMap[sourceid]!="" {
+			fusionid = qu.ObjToString(isUpdateMap[sourceid])
+		}
+		pool_mgo <- true
+		wg_mgo.Add(1)
+		go func(sourceid string, fusionArr []string,fusionid string) {
+			defer func() {
+				<-pool_mgo
+				wg_mgo.Done()
+			}()
+			weight := NewWeightData(fusionArr,sourceid)
+			weight.analyzeBuildStandardData()
+
+			saveFusionData, saveRecordData:= map[string]interface{}{},map[string]interface{}{}
+			if fusionid!="" {//更新
+				saveFusionData, saveRecordData:=weight.dealWithMultipleUpdateFusionStruct(fusionid)
+				//更新融合表
+				UpdateFusionPool.pool <- []map[string]interface{}{
+					map[string]interface{}{
+						"_id": StringTOBsonId(fusionid),
+					},
+					saveFusionData,
+				}
+				//更新日志表
+				UpdateRecordPool.pool <- []map[string]interface{}{
+					map[string]interface{}{
+						"_id": StringTOBsonId(fusionid),
+					},
+					map[string]interface{}{
+						"$set": saveRecordData,
+					},
+				}
+				//更新分组表
+				UpdateGroupPool.pool <- []map[string]interface{}{
+					map[string]interface{}{
+						"_id": StringTOBsonId(sourceid),
+					},
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"allids":strings.Join(fusionArr, ","),
+						},
+					},
+				}
+
+
+
+			}else {
+				if len(fusionArr) <= 1 {
+					saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
+				}else {
+					saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
+				}
+				//新增融合表
+				saveid := mgo.Save(fusion_coll_name, saveFusionData)
+				saveRecordData["_id"] = saveid
+				//批量新增日志表
+				AddRecordPool.pool <- saveRecordData
+				//批量新增分组表
+				AddGroupPool.pool <- map[string]interface{}{
+					"_id":StringTOBsonId(sourceid),
+					"allids":strings.Join(fusionArr, ","),
+					"fusion_id": BsonTOStringId(saveid),
+					"template_id":qu.ObjToString(saveFusionData["fusion_templateid"]),
+				}
+			}
+
+		}(sourceid, fusionArr,fusionid)
+	}
+	wg_mgo.Wait()
+
+	log.Println("add fusion is over:",index,"总用时:",int(time.Now().Unix())-start,"秒")
+
+
+}

+ 1 - 1
udpfusion/src/fusionFullData.go

@@ -134,7 +134,7 @@ func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
 	time.Sleep(60 * time.Second)
 
 
-	//先到处具体需要融合组数据-存mongo
+	//先导出具体需要融合组数据-存mongo
 	exportFusionMongoData()
 	time.Sleep(60 * time.Second)
 

+ 1 - 1
udpfusion/src/main.go

@@ -82,7 +82,7 @@ func initEs()  {
 }
 
 func initAddUpdatePool()  {
-	IsFull = true //全量数据
+	IsFull = false //全量数据
 	if IsFull {
 		AddGroupPool = newAddGroupPool()
 		go AddGroupPool.addGroupData()

+ 1 - 1
udpfusion/src/updateMethod.go

@@ -31,7 +31,7 @@ type addRecordInfo struct {
 	pool chan map[string]interface{}
 	saveSize   	int
 }
-//更新融合
+//更新日志
 type updateRecordInfo struct {
 	pool chan []map[string]interface{}
 	saveSize   	int

+ 1 - 1
udpfusion/src/weightFusion.go

@@ -111,7 +111,7 @@ func (weight *weightDataMap) dealWithMultipleAddFusionStruct ()(map[string]inter
 }
 
 //处理多条融合数据-返回融合新数据,融合更新细节数据
-func (weight *weightDataMap) dealWithMultipleUpdateFusionStruct (tmpData map[string]interface{})(map[string]interface{},map[string]interface{}){
+func (weight *weightDataMap) dealWithMultipleUpdateFusionStruct (fusion string)(map[string]interface{},map[string]interface{}){
 
 	//指定模板数据dict
 	dict :=weight.data[weight.templateid].data