|
@@ -2,13 +2,14 @@ package main
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
+ "go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
"log"
|
|
|
mu "mfw/util"
|
|
|
"net"
|
|
|
"os"
|
|
|
"qfw/common/src/qfw/util"
|
|
|
qu "qfw/util"
|
|
|
- "strconv"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
@@ -24,11 +25,12 @@ var (
|
|
|
NoNeedFusionKey map[string]interface{} //不需要融合的key
|
|
|
UpdateFusion *updateFusionInfo
|
|
|
UpdateRecord *updateRecordInfo //更新池
|
|
|
+ siteJsonData map[string]string //站点池
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
-func initMgo() {
|
|
|
+func initMgoAndSite() {
|
|
|
mconf := sysconfig["mongodb"].(map[string]interface{})
|
|
|
log.Println(mconf)
|
|
|
mgo = &MongodbSim{
|
|
@@ -45,13 +47,25 @@ func initMgo() {
|
|
|
NoNeedFusionKey = sysconfig["notFusionKey"].(map[string]interface{})
|
|
|
|
|
|
|
|
|
+ site := mconf["site"].(map[string]interface{})
|
|
|
+ siteJsonData = make(map[string]string, 0)
|
|
|
+ start := int(time.Now().Unix())
|
|
|
+ sess_site := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess_site)
|
|
|
+ res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
|
|
|
+ for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
|
|
|
+ siteJsonData[util.ObjToString(site_dict["site"])] = util.ObjToString(site_dict["sitetype"])
|
|
|
+ }
|
|
|
+ log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(siteJsonData))
|
|
|
}
|
|
|
|
|
|
|
|
|
+
|
|
|
+
|
|
|
func init() {
|
|
|
//加载配置文件
|
|
|
qu.ReadConfig(&sysconfig)
|
|
|
- initMgo()
|
|
|
+ initMgoAndSite()
|
|
|
|
|
|
//更新池
|
|
|
UpdateFusion = newUpdateFusionPool()
|
|
@@ -62,6 +76,8 @@ func init() {
|
|
|
|
|
|
|
|
|
|
|
|
+
|
|
|
+
|
|
|
log.Println("采用udp模式")
|
|
|
}
|
|
|
|
|
@@ -78,8 +94,9 @@ func mainT() {
|
|
|
//快速测试使用
|
|
|
func main() {
|
|
|
|
|
|
- sid := "1f0000000000000000000000"
|
|
|
- eid := "9f0000000000000000000000"
|
|
|
+
|
|
|
+ sid := "100000000000000000000000"
|
|
|
+ eid := "900000000000000000000000"
|
|
|
//log.Println(sid, "---", eid)
|
|
|
mapinfo := map[string]interface{}{}
|
|
|
if sid == "" || eid == "" {
|
|
@@ -100,12 +117,7 @@ func main() {
|
|
|
|
|
|
//融合具体方法
|
|
|
func startTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
-
|
|
|
- //遍历数据
|
|
|
log.Println("开始融合流程")
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
defer qu.Catch()
|
|
|
//区间id
|
|
|
q := map[string]interface{}{
|
|
@@ -119,8 +131,14 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
|
it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
//编译不同的融合组,如何划分组
|
|
|
- fusionDataGroupArr := make([][]string,0) //待融合组
|
|
|
- addOrUpdateArr := make([]bool,0) //新增-bool-记录
|
|
|
+ /***********************/
|
|
|
+ /***********************/
|
|
|
+ /***y
|
|
|
+ ********************/
|
|
|
+ /***********************/
|
|
|
+ fusionDataGroupArr := make([][]string,0) //待融合组
|
|
|
+ addOrUpdateArr := make([]bool,0) //新增-bool-记录-组新增,组更新
|
|
|
+ infoFusionArr := make([]map[string]interface{},0) //记录取融合表的数据
|
|
|
|
|
|
repeatArr,sourceArr,index := make([]string,0),make([]string,0),0 //重复数据组
|
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
@@ -136,14 +154,14 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
}else {
|
|
|
fusionDataGroupArr = append(fusionDataGroupArr,[]string{tmpId})
|
|
|
addOrUpdateArr = append(addOrUpdateArr,false)
|
|
|
+ infoFusionArr = append(infoFusionArr, map[string]interface{}{})
|
|
|
}
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
|
|
|
log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
|
|
|
- log.Println("状态记录:",len(addOrUpdateArr))
|
|
|
+
|
|
|
//根据重复组,重新划分新的组别
|
|
|
- num1,num2:=0,0
|
|
|
for i:=0;i<len(repeatArr);i++ {
|
|
|
sourceid := sourceArr[i]
|
|
|
isAddExist,index := false,0
|
|
@@ -163,92 +181,172 @@ func startTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
arr = fusionDataGroupArr[index]
|
|
|
arr = append(arr,repeatArr[i])//组拼接当前id
|
|
|
fusionDataGroupArr[index] = arr
|
|
|
- num1++
|
|
|
}else {//当前段落未找到-需要查询融合表,,遍历融合表
|
|
|
- arr := make([]string,0)
|
|
|
- arr = dealWithFindFusionDataArr(sourceid)
|
|
|
+ arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
|
|
|
+ arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
|
|
|
+
|
|
|
arr = append(arr,repeatArr[i])//组拼接当前id
|
|
|
- if len(arr)<1 {
|
|
|
- log.Println("数据异常,融合表找不到数据",repeatArr[i])
|
|
|
- }else { //新增
|
|
|
- log.Println("数据融合新增")
|
|
|
+ if len(arr)<1 { //异常错误,新增
|
|
|
+ log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
|
|
|
+ arr_error := make([]string,0)
|
|
|
+ arr_error = append(arr_error,repeatArr[i])//组拼接当前id
|
|
|
+ fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
|
|
|
+ addOrUpdateArr = append(addOrUpdateArr,false)
|
|
|
+ infoFusionArr = append(infoFusionArr, map[string]interface{}{})
|
|
|
+ }else { //正常更新
|
|
|
fusionDataGroupArr = append(fusionDataGroupArr,arr)
|
|
|
addOrUpdateArr = append(addOrUpdateArr,true)
|
|
|
+ infoFusionArr = append(infoFusionArr,fusionTmpData)
|
|
|
}
|
|
|
- num2++
|
|
|
|
|
|
}
|
|
|
//不断改变中
|
|
|
- log.Println("当前分组数量:",len(fusionDataGroupArr))
|
|
|
+ //log.Println("当前分组数量:",len(fusionDataGroupArr))
|
|
|
}
|
|
|
+ log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
|
|
|
+ log.Println("********************分割线********************")
|
|
|
+ log.Println("********************分割线********************")
|
|
|
+ log.Println("********************分割线********************")
|
|
|
+ log.Println("开始处理新增分组... ...")
|
|
|
|
|
|
- log.Println("分组完毕:","重复新增数量:",num1,"重复更新数量:",num2,len(repeatArr))
|
|
|
- log.Println("最终带融合分组:",len(fusionDataNewGroupArr))
|
|
|
- //分组细节需要修改 - 带测试
|
|
|
- return
|
|
|
|
|
|
|
|
|
- log.Println("开始处理新增分组... ...")
|
|
|
start := int(time.Now().Unix())
|
|
|
- //进行分组融合
|
|
|
- for i:=0;i<len(fusionDataNewGroupArr);i++ {
|
|
|
- fusionArr := fusionDataNewGroupArr[i]
|
|
|
- //构建数据
|
|
|
- log.Println("构建第一组数据...",fusionArr)
|
|
|
- weight :=NewWeightData(fusionArr)
|
|
|
- //整理数据-筛选排名,模板
|
|
|
- weight.analyzeBuildStandardData()
|
|
|
- if len(fusionArr)<=1 {
|
|
|
- //更新数据(融合表) 日志数据(日志记录表)
|
|
|
- //updateData,_ := weight.dealWithAddFusionStruct()
|
|
|
- //log.Println("新增:更新数据",len(updateData))
|
|
|
- //mgo.Save(fusion_coll_name,updateData) //新增
|
|
|
- }else {
|
|
|
- //updateData,_ := weight.dealWithMultipleFusionStruct()
|
|
|
- //log.Println("多组新增:更新数据",len(updateData))
|
|
|
- //mgo.Save(fusion_coll_name,updateData)
|
|
|
- }
|
|
|
- }
|
|
|
- log.Println("新增融合over :",len(fusionDataNewGroupArr),"用时:",int(time.Now().Unix())-start)
|
|
|
|
|
|
+ //多线程 - 处理数据
|
|
|
+ pool := make(chan bool, 3)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+
|
|
|
+ for i:=0;i<len(fusionDataGroupArr);i++ {
|
|
|
+ fusionArr := fusionDataGroupArr[i]
|
|
|
+ pool <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(fusionArr []string,i int) {
|
|
|
+ defer func() {
|
|
|
+ <-pool
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ //构建数据
|
|
|
+ log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
|
|
|
+ weight :=NewWeightData(fusionArr)
|
|
|
+ ////整理数据-筛选排名,模板
|
|
|
+ weight.analyzeBuildStandardData()
|
|
|
+
|
|
|
+ if len(fusionArr)<=1 {
|
|
|
+ //log.Println("单组生成... ...")
|
|
|
+ saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
|
|
|
+ saveid:=mgo.Save(fusion_coll_name,saveFusionData)
|
|
|
+ saveRecordData["_id"] = saveid
|
|
|
+ mgo.Save(record_coll_name,saveRecordData)
|
|
|
+ }else {
|
|
|
+ if addOrUpdateArr[i] {
|
|
|
+ //log.Println("多组更新... ...")
|
|
|
+ tmpdata:=infoFusionArr[i]
|
|
|
+ updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
|
|
|
+
|
|
|
+ UpdateFusion.updatePool <- []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmpdata["_id"],
|
|
|
+ },
|
|
|
+ updateFusionData,
|
|
|
+ }
|
|
|
+ UpdateRecord.updatePool <- []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmpdata["_id"],
|
|
|
+ },
|
|
|
+ updateRecordData,
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ //log.Println("多组生成... ...")
|
|
|
+ saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
|
|
|
+ saveid:=mgo.Save(fusion_coll_name,saveFusionData)
|
|
|
+ saveRecordData["_id"] = saveid
|
|
|
+ mgo.Save(record_coll_name,saveRecordData)
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- //多组-融合表更新
|
|
|
- //UpdateFusion.updatePool <- []map[string]interface{}{//原始数据打标签
|
|
|
- // map[string]interface{}{},
|
|
|
- // updateData,
|
|
|
- //}
|
|
|
|
|
|
+ }(fusionArr,i)
|
|
|
|
|
|
- time.Sleep(30 * time.Second)
|
|
|
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+
|
|
|
+ log.Println("fusion is over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
|
|
|
+ log.Println("睡眠30秒,然后在发广播")
|
|
|
+ time.Sleep(30 * time.Second)
|
|
|
//任务完成,开始发送广播通知下面节点
|
|
|
taskSendFusionUdp(mapInfo)
|
|
|
|
|
|
}
|
|
|
|
|
|
//查询融合表数据-找到对应组id
|
|
|
-func dealWithFindFusionDataArr(sourceid string) []string {
|
|
|
- arr := make([]string,0)
|
|
|
+func dealWithFindFusionDataArr(sourceid string) ([]string,map[string]interface{}) {
|
|
|
+ newArr ,arr := make([]string,0),make(primitive.A,0)
|
|
|
+ tmpData:=make(map[string]interface{},0)
|
|
|
q := map[string]interface{}{}
|
|
|
sess := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
+ it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
|
|
|
+
|
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
|
- fusion_allids := tmp["fusion_allids"].([]string)
|
|
|
- for _,v:=range fusion_allids {
|
|
|
- if v==sourceid {
|
|
|
- //找到目标组-
|
|
|
- arr = fusion_allids
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- break
|
|
|
+ //log.Println(reflect.TypeOf(tmp["fusion_allids"]))
|
|
|
+ if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b {
|
|
|
+ for _,v:=range fusion_allids {
|
|
|
+ if v==sourceid {
|
|
|
+ //找到目标组-
|
|
|
+ arr = fusion_allids
|
|
|
+ tmpData = tmp
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ break
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
- return arr
|
|
|
+
|
|
|
+ for _,v:=range arr{
|
|
|
+ newArr = append(newArr,qu.ObjToString(v))
|
|
|
+ }
|
|
|
+
|
|
|
+ return newArr,tmpData
|
|
|
}
|
|
|
|
|
|
+//查询记录1表数据-找到对应的id , 更新用到
|
|
|
+func dealWithFindRecordData(sourceid string) string {
|
|
|
+ newArr ,arr := make([]string,0),make(primitive.A,0)
|
|
|
+ //tmpData:=make(map[string]interface{},0)
|
|
|
+ q := map[string]interface{}{}
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
+ it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
|
|
|
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
|
+ //log.Println(reflect.TypeOf(tmp["fusion_allids"]))
|
|
|
+ if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b {
|
|
|
+ for _,v:=range fusion_allids {
|
|
|
+ if v==sourceid {
|
|
|
+ //找到目标组-
|
|
|
+ arr = fusion_allids
|
|
|
+ //tmpData = tmp
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+
|
|
|
+ for _,v:=range arr{
|
|
|
+ newArr = append(newArr,qu.ObjToString(v))
|
|
|
+ }
|
|
|
+
|
|
|
+ return ""
|
|
|
+}
|
|
|
|
|
|
//udp 监听
|
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
@@ -282,7 +380,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+//结束发送udp
|
|
|
func taskSendFusionUdp(mapinfo map[string]interface{}) {
|
|
|
|
|
|
//log.Println("信息融合结束-发送udp")
|
|
@@ -314,14 +412,5 @@ func taskSendFusionUdp(mapinfo map[string]interface{}) {
|
|
|
|
|
|
|
|
|
|
|
|
-//判断是否在当前id段落
|
|
|
-func judgeIsCurIds (gtid string,lteid string,curid string) bool {
|
|
|
|
|
|
- gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
|
|
|
- lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
|
|
|
- cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
|
|
|
- if cur_time>gt_time&&cur_time<=lte_time {
|
|
|
- return true
|
|
|
- }
|
|
|
- return false
|
|
|
-}
|
|
|
+
|