|
@@ -8,16 +8,22 @@ import (
|
|
|
"os"
|
|
|
"qfw/common/src/qfw/util"
|
|
|
qu "qfw/util"
|
|
|
+ "strconv"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
|
|
|
var (
|
|
|
- sysconfig map[string]interface{} //配置文件
|
|
|
- mgo *MongodbSim //mongodb操作对象
|
|
|
- udpclient mu.UdpClient //udp对象
|
|
|
- nextNode []map[string]interface{} //下节点数组
|
|
|
- coll_name,fusion_coll_name,record_coll_name string
|
|
|
+ sysconfig map[string]interface{} //配置文件
|
|
|
+ mgo *MongodbSim //mongodb操作对象
|
|
|
+ udpclient mu.UdpClient //udp对象
|
|
|
+ nextNode []map[string]interface{} //下节点数组
|
|
|
+ coll_name string
|
|
|
+ fusion_coll_name string
|
|
|
+ record_coll_name string //表名
|
|
|
+ NoNeedFusionKey map[string]interface{} //不需要融合的key
|
|
|
+ UpdateFusion *updateFusionInfo
|
|
|
+ UpdateRecord *updateRecordInfo //更新池
|
|
|
)
|
|
|
|
|
|
|
|
@@ -36,6 +42,9 @@ func initMgo() {
|
|
|
coll_name = mconf["collName"].(string)
|
|
|
fusion_coll_name = sysconfig["fusion_coll_name"].(string)
|
|
|
record_coll_name = sysconfig["record_coll_name"].(string)
|
|
|
+ NoNeedFusionKey = sysconfig["notFusionKey"].(map[string]interface{})
|
|
|
+
|
|
|
+
|
|
|
}
|
|
|
|
|
|
|
|
@@ -43,6 +52,16 @@ func init() {
|
|
|
//加载配置文件
|
|
|
qu.ReadConfig(&sysconfig)
|
|
|
initMgo()
|
|
|
+
|
|
|
+ //更新池
|
|
|
+ UpdateFusion = newUpdateFusionPool()
|
|
|
+ go UpdateFusion.updateFusionData()
|
|
|
+
|
|
|
+ UpdateRecord = newUpdateRecordPool()
|
|
|
+ go UpdateRecord.updateRecordData()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
log.Println("采用udp模式")
|
|
|
}
|
|
|
|
|
@@ -75,36 +94,7 @@ func main() {
|
|
|
}
|
|
|
|
|
|
|
|
|
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
- switch act {
|
|
|
- case mu.OP_TYPE_DATA: //上个节点的数据
|
|
|
- //从表中开始处理
|
|
|
- var mapInfo map[string]interface{}
|
|
|
- err := json.Unmarshal(data, &mapInfo)
|
|
|
- log.Println("err:", err, "mapInfo:", mapInfo)
|
|
|
- if err != nil {
|
|
|
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
|
|
|
- } else if mapInfo != nil {
|
|
|
- taskType := qu.ObjToString(mapInfo["stype"])
|
|
|
- if taskType == "fusion" {
|
|
|
- go startTask(data, mapInfo)
|
|
|
- } else {
|
|
|
- log.Println("未知类型:融合异常... ...")
|
|
|
- }
|
|
|
- key, _ := mapInfo["key"].(string)
|
|
|
- if key == "" {
|
|
|
- key = "udpok"
|
|
|
- }
|
|
|
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
|
|
|
- }
|
|
|
- case mu.OP_NOOP: //下个节点回应
|
|
|
- ok := string(data)
|
|
|
- if ok != "" {
|
|
|
- log.Println("ok:", ok)
|
|
|
- udptaskmap.Delete(ok)
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
+
|
|
|
|
|
|
|
|
|
|
|
@@ -115,21 +105,6 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
log.Println("开始融合流程")
|
|
|
|
|
|
|
|
|
- //分组数据-分组融合
|
|
|
-
|
|
|
- //构建数据
|
|
|
- weight :=NewWeightData([]string{},"")
|
|
|
- //整理数据-筛选排名,模板
|
|
|
- weight.analyzeBuildStandardData()
|
|
|
- log.Println("筛选出模拟数据:",weight.templateid)
|
|
|
- weight.dealWithMultipleFusionStruct()
|
|
|
- //进行融合
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- return
|
|
|
-
|
|
|
|
|
|
defer qu.Catch()
|
|
|
//区间id
|
|
@@ -143,40 +118,172 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
sess := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
|
it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
- updateExtract := [][]map[string]interface{}{}
|
|
|
- index:=0
|
|
|
+ //编译不同的融合组,如何划分组
|
|
|
+ fusionDataGroupArr := make([][]string,0) //待融合组
|
|
|
+ addOrUpdateArr := make([]bool,0) //新增-bool-记录
|
|
|
+
|
|
|
+ repeatArr,sourceArr,index := make([]string,0),make([]string,0),0 //重复数据组
|
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
|
- if index%10000 == 0 {
|
|
|
- log.Println("当前数量:", index, tmp["_id"])
|
|
|
+ if index%1000 == 0 {
|
|
|
+ log.Println("current index",index,tmp["_id"])
|
|
|
}
|
|
|
+ tmpId:=BsonTOStringId(tmp["_id"])
|
|
|
+ repeat:=qu.IntAll(tmp["repeat"])
|
|
|
+ sourceid:=qu.ObjToString(tmp["repeat_id"])
|
|
|
+ if repeat==1 {
|
|
|
+ repeatArr = append(repeatArr,tmpId)
|
|
|
+ sourceArr = append(sourceArr,sourceid)
|
|
|
+ }else {
|
|
|
+ fusionDataGroupArr = append(fusionDataGroupArr,[]string{tmpId})
|
|
|
+ addOrUpdateArr = append(addOrUpdateArr,false)
|
|
|
+ }
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
|
|
|
- //log.Println(we)
|
|
|
-
|
|
|
+ log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
|
|
|
+ log.Println("状态记录:",len(addOrUpdateArr))
|
|
|
+ //根据重复组,重新划分新的组别
|
|
|
+ num1,num2:=0,0
|
|
|
+ for i:=0;i<len(repeatArr);i++ {
|
|
|
+ sourceid := sourceArr[i]
|
|
|
+ isAddExist,index := false,0
|
|
|
+ //根据原sourceid 直接遍历组
|
|
|
+ R: for k,v:=range fusionDataGroupArr{
|
|
|
+ for _,v1:=range v{
|
|
|
+ if v1==sourceid {
|
|
|
+ index = k
|
|
|
+ isAddExist = true
|
|
|
+ break R
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ if isAddExist { //数组截取替换-找到指定
|
|
|
+ arr := make([]string,0)
|
|
|
+ arr = fusionDataGroupArr[index]
|
|
|
+ arr = append(arr,repeatArr[i])//组拼接当前id
|
|
|
+ fusionDataGroupArr[index] = arr
|
|
|
+ num1++
|
|
|
+ }else {//当前段落未找到-需要查询融合表,,遍历融合表
|
|
|
+ arr := make([]string,0)
|
|
|
+ arr = dealWithFindFusionDataArr(sourceid)
|
|
|
+ arr = append(arr,repeatArr[i])//组拼接当前id
|
|
|
+ if len(arr)<1 {
|
|
|
+ log.Println("数据异常,融合表找不到数据",repeatArr[i])
|
|
|
+ }else { //新增
|
|
|
+ log.Println("数据融合新增")
|
|
|
+ fusionDataGroupArr = append(fusionDataGroupArr,arr)
|
|
|
+ addOrUpdateArr = append(addOrUpdateArr,true)
|
|
|
+ }
|
|
|
+ num2++
|
|
|
|
|
|
- tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ //不断改变中
|
|
|
+ log.Println("当前分组数量:",len(fusionDataGroupArr))
|
|
|
}
|
|
|
|
|
|
+ log.Println("分组完毕:","重复新增数量:",num1,"重复更新数量:",num2,len(repeatArr))
|
|
|
+ log.Println("最终带融合分组:",len(fusionDataNewGroupArr))
|
|
|
+ //分组细节需要修改 - 带测试
|
|
|
+ return
|
|
|
|
|
|
- if len(updateExtract) >0 {
|
|
|
- mgo.UpSertBulk(coll_name, updateExtract...)
|
|
|
|
|
|
+ log.Println("开始处理新增分组... ...")
|
|
|
+ start := int(time.Now().Unix())
|
|
|
+ //进行分组融合
|
|
|
+ for i:=0;i<len(fusionDataNewGroupArr);i++ {
|
|
|
+ fusionArr := fusionDataNewGroupArr[i]
|
|
|
+ //构建数据
|
|
|
+ log.Println("构建第一组数据...",fusionArr)
|
|
|
+ weight :=NewWeightData(fusionArr)
|
|
|
+ //整理数据-筛选排名,模板
|
|
|
+ weight.analyzeBuildStandardData()
|
|
|
+ if len(fusionArr)<=1 {
|
|
|
+ //更新数据(融合表) 日志数据(日志记录表)
|
|
|
+ //updateData,_ := weight.dealWithAddFusionStruct()
|
|
|
+ //log.Println("新增:更新数据",len(updateData))
|
|
|
+ //mgo.Save(fusion_coll_name,updateData) //新增
|
|
|
+ }else {
|
|
|
+ //updateData,_ := weight.dealWithMultipleFusionStruct()
|
|
|
+ //log.Println("多组新增:更新数据",len(updateData))
|
|
|
+ //mgo.Save(fusion_coll_name,updateData)
|
|
|
+ }
|
|
|
}
|
|
|
+ log.Println("新增融合over :",len(fusionDataNewGroupArr),"用时:",int(time.Now().Unix())-start)
|
|
|
|
|
|
|
|
|
- log.Println("task fusion over - 总计数量",index)
|
|
|
+ //多组-融合表更新
|
|
|
+ //UpdateFusion.updatePool <- []map[string]interface{}{//原始数据打标签
|
|
|
+ // map[string]interface{}{},
|
|
|
+ // updateData,
|
|
|
+ //}
|
|
|
+
|
|
|
|
|
|
time.Sleep(30 * time.Second)
|
|
|
|
|
|
//任务完成,开始发送广播通知下面节点
|
|
|
+ taskSendFusionUdp(mapInfo)
|
|
|
|
|
|
- sendUdp(mapInfo)
|
|
|
+}
|
|
|
+
|
|
|
+//查询融合表数据-找到对应组id
|
|
|
+func dealWithFindFusionDataArr(sourceid string) []string {
|
|
|
+ arr := make([]string,0)
|
|
|
+ q := map[string]interface{}{}
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
+ it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
|
+ fusion_allids := tmp["fusion_allids"].([]string)
|
|
|
+ for _,v:=range fusion_allids {
|
|
|
+ if v==sourceid {
|
|
|
+ //找到目标组-
|
|
|
+ arr = fusion_allids
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ return arr
|
|
|
+}
|
|
|
|
|
|
|
|
|
|
|
|
+//udp 监听
|
|
|
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
+ switch act {
|
|
|
+ case mu.OP_TYPE_DATA: //上个节点的数据
|
|
|
+ //从表中开始处理
|
|
|
+ var mapInfo map[string]interface{}
|
|
|
+ err := json.Unmarshal(data, &mapInfo)
|
|
|
+ log.Println("err:", err, "mapInfo:", mapInfo)
|
|
|
+ if err != nil {
|
|
|
+ udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
|
|
|
+ } else if mapInfo != nil {
|
|
|
+ taskType := qu.ObjToString(mapInfo["stype"])
|
|
|
+ if taskType == "fusion" {
|
|
|
+ go startTask(data, mapInfo)
|
|
|
+ } else {
|
|
|
+ log.Println("未知类型:融合异常... ...")
|
|
|
+ }
|
|
|
+ key, _ := mapInfo["key"].(string)
|
|
|
+ if key == "" {
|
|
|
+ key = "udpok"
|
|
|
+ }
|
|
|
+ udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
|
|
|
+ }
|
|
|
+ case mu.OP_NOOP: //下个节点回应
|
|
|
+ ok := string(data)
|
|
|
+ if ok != "" {
|
|
|
+ log.Println("ok:", ok)
|
|
|
+ udptaskmap.Delete(ok)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func sendUdp(mapinfo map[string]interface{}) {
|
|
|
+
|
|
|
+func taskSendFusionUdp(mapinfo map[string]interface{}) {
|
|
|
|
|
|
//log.Println("信息融合结束-发送udp")
|
|
|
for _, to := range nextNode {
|
|
@@ -197,4 +304,24 @@ func sendUdp(mapinfo map[string]interface{}) {
|
|
|
udptaskmap.Store(key, node)
|
|
|
udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
}
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+//判断是否在当前id段落
|
|
|
+func judgeIsCurIds (gtid string,lteid string,curid string) bool {
|
|
|
+
|
|
|
+ gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
|
|
|
+ lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
|
|
|
+ cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
|
|
|
+ if cur_time>gt_time&&cur_time<=lte_time {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ return false
|
|
|
}
|