|
@@ -2,70 +2,81 @@ package main
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
- "go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
"log"
|
|
|
mu "mfw/util"
|
|
|
"net"
|
|
|
"os"
|
|
|
- "qfw/common/src/qfw/util"
|
|
|
qu "qfw/util"
|
|
|
"sync"
|
|
|
"time"
|
|
|
+ "qfw/util/elastic"
|
|
|
)
|
|
|
|
|
|
|
|
|
var (
|
|
|
- sysconfig map[string]interface{} //配置文件
|
|
|
- mgo *MongodbSim //mongodb操作对象
|
|
|
- udpclient mu.UdpClient //udp对象
|
|
|
- nextNode []map[string]interface{} //下节点数组
|
|
|
- coll_name string
|
|
|
- fusion_coll_name string
|
|
|
- record_coll_name string //表名
|
|
|
- NoNeedFusionKey map[string]interface{} //不需要融合的key
|
|
|
- UpdateFusion *updateFusionInfo
|
|
|
- UpdateRecord *updateRecordInfo //更新池
|
|
|
- siteJsonData map[string]string //站点池
|
|
|
+ sysconfig map[string]interface{} //配置文件
|
|
|
+ mgo *MongodbSim //mongodb操作对象
|
|
|
+ udpclient mu.UdpClient //udp对象
|
|
|
+ nextNode []map[string]interface{} //下节点数组
|
|
|
+ coll_name string
|
|
|
+ fusion_coll_name,record_coll_name string //新增表名
|
|
|
+ NoNeedFusionKey map[string]interface{} //不需要融合的key
|
|
|
+ UpdateFusion *updateFusionInfo
|
|
|
+ UpdateRecord *updateRecordInfo //更新池
|
|
|
+ siteJsonData map[string]string //站点池
|
|
|
+ esIndex,esType string //索引-类型
|
|
|
+ mgo_pool,es_pool int
|
|
|
+ updatelock sync.Mutex
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
func initMgoAndSite() {
|
|
|
- mconf := sysconfig["mongodb"].(map[string]interface{})
|
|
|
- log.Println(mconf)
|
|
|
+ mgoconf := sysconfig["mongodb"].(map[string]interface{})
|
|
|
mgo = &MongodbSim{
|
|
|
- MongodbAddr: mconf["addrName"].(string),
|
|
|
- DbName: mconf["dbName"].(string),
|
|
|
- Size: qu.IntAllDef(mconf["pool"], 10),
|
|
|
+ MongodbAddr: mgoconf["addrName"].(string),
|
|
|
+ DbName: mgoconf["dbName"].(string),
|
|
|
+ Size: qu.IntAllDef(mgoconf["pool"], 10),
|
|
|
}
|
|
|
mgo.InitPool()
|
|
|
|
|
|
|
|
|
- coll_name = mconf["collName"].(string)
|
|
|
+ coll_name = mgoconf["collName"].(string)
|
|
|
+ mgo_pool = qu.IntAllDef(mgoconf["mgo_pool"], 3)
|
|
|
fusion_coll_name = sysconfig["fusion_coll_name"].(string)
|
|
|
record_coll_name = sysconfig["record_coll_name"].(string)
|
|
|
NoNeedFusionKey = sysconfig["notFusionKey"].(map[string]interface{})
|
|
|
|
|
|
|
|
|
- site := mconf["site"].(map[string]interface{})
|
|
|
+ site := mgoconf["site"].(map[string]interface{})
|
|
|
siteJsonData = make(map[string]string, 0)
|
|
|
start := int(time.Now().Unix())
|
|
|
sess_site := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess_site)
|
|
|
res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
|
|
|
for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
|
|
|
- siteJsonData[util.ObjToString(site_dict["site"])] = util.ObjToString(site_dict["sitetype"])
|
|
|
+ siteJsonData[qu.ObjToString(site_dict["site"])] = qu.ObjToString(site_dict["sitetype"])
|
|
|
}
|
|
|
log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(siteJsonData))
|
|
|
}
|
|
|
|
|
|
+func initEs() {
|
|
|
+ //初始化es
|
|
|
+ esconf := sysconfig["es"].(map[string]interface{})
|
|
|
+ addr:=esconf["addr"].(string)
|
|
|
+ size:=qu.IntAllDef(esconf["size"], 50)
|
|
|
+ elastic.InitElasticSize(addr,size)
|
|
|
+ es_pool = qu.IntAllDef(esconf["es_pool"], 10)
|
|
|
+ esIndex = esconf["index"].(string)
|
|
|
+ esType = esconf["type"].(string)
|
|
|
|
|
|
-
|
|
|
+}
|
|
|
|
|
|
func init() {
|
|
|
//加载配置文件
|
|
|
qu.ReadConfig(&sysconfig)
|
|
|
initMgoAndSite()
|
|
|
+ initEs()
|
|
|
|
|
|
//更新池
|
|
|
UpdateFusion = newUpdateFusionPool()
|
|
@@ -76,8 +87,6 @@ func init() {
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
log.Println("采用udp模式")
|
|
|
}
|
|
|
|
|
@@ -93,10 +102,11 @@ func mainT() {
|
|
|
|
|
|
//快速测试使用
|
|
|
func main() {
|
|
|
-
|
|
|
-
|
|
|
- sid := "100000000000000000000000"
|
|
|
- eid := "900000000000000000000000"
|
|
|
+ //0101-0301
|
|
|
+ //sid := "5fedf5800000000000000000"
|
|
|
+ //eid := "603bbe000000000000000000"
|
|
|
+ sid := "1fedf5800000000000000000"
|
|
|
+ eid := "903bbe000000000000000000"
|
|
|
//log.Println(sid, "---", eid)
|
|
|
mapinfo := map[string]interface{}{}
|
|
|
if sid == "" || eid == "" {
|
|
@@ -105,249 +115,10 @@ func main() {
|
|
|
}
|
|
|
mapinfo["gtid"] = sid
|
|
|
mapinfo["lteid"] = eid
|
|
|
- startTask([]byte{}, mapinfo)
|
|
|
+ startTaskFullData([]byte{}, mapinfo) //全量
|
|
|
time.Sleep(99999 * time.Hour)
|
|
|
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-//融合具体方法
|
|
|
-func startTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
- log.Println("开始融合流程")
|
|
|
- defer qu.Catch()
|
|
|
- //区间id
|
|
|
- q := map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
- },
|
|
|
- }
|
|
|
- log.Println("查询条件:",q)
|
|
|
- sess := mgo.GetMgoConn()
|
|
|
- defer mgo.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
- //编译不同的融合组,如何划分组
|
|
|
- /***********************/
|
|
|
- /***********************/
|
|
|
- /***y
|
|
|
- ********************/
|
|
|
- /***********************/
|
|
|
- fusionDataGroupArr := make([][]string,0) //待融合组
|
|
|
- addOrUpdateArr := make([]bool,0) //新增-bool-记录-组新增,组更新
|
|
|
- infoFusionArr := make([]map[string]interface{},0) //记录取融合表的数据
|
|
|
-
|
|
|
- repeatArr,sourceArr,index := make([]string,0),make([]string,0),0 //重复数据组
|
|
|
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
|
- if index%1000 == 0 {
|
|
|
- log.Println("current index",index,tmp["_id"])
|
|
|
- }
|
|
|
- tmpId:=BsonTOStringId(tmp["_id"])
|
|
|
- repeat:=qu.IntAll(tmp["repeat"])
|
|
|
- sourceid:=qu.ObjToString(tmp["repeat_id"])
|
|
|
- if repeat==1 {
|
|
|
- repeatArr = append(repeatArr,tmpId)
|
|
|
- sourceArr = append(sourceArr,sourceid)
|
|
|
- }else {
|
|
|
- fusionDataGroupArr = append(fusionDataGroupArr,[]string{tmpId})
|
|
|
- addOrUpdateArr = append(addOrUpdateArr,false)
|
|
|
- infoFusionArr = append(infoFusionArr, map[string]interface{}{})
|
|
|
- }
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
-
|
|
|
- log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
|
|
|
-
|
|
|
- //根据重复组,重新划分新的组别
|
|
|
- for i:=0;i<len(repeatArr);i++ {
|
|
|
- sourceid := sourceArr[i]
|
|
|
- isAddExist,index := false,0
|
|
|
- //根据原sourceid 直接遍历组
|
|
|
- R: for k,v:=range fusionDataGroupArr{
|
|
|
- for _,v1:=range v{
|
|
|
- if v1==sourceid {
|
|
|
- index = k
|
|
|
- isAddExist = true
|
|
|
- break R
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if isAddExist { //数组截取替换-找到指定
|
|
|
- arr := make([]string,0)
|
|
|
- arr = fusionDataGroupArr[index]
|
|
|
- arr = append(arr,repeatArr[i])//组拼接当前id
|
|
|
- fusionDataGroupArr[index] = arr
|
|
|
- }else {//当前段落未找到-需要查询融合表,,遍历融合表
|
|
|
- arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
|
|
|
- arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
|
|
|
-
|
|
|
- arr = append(arr,repeatArr[i])//组拼接当前id
|
|
|
- if len(arr)<1 { //异常错误,新增
|
|
|
- log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
|
|
|
- arr_error := make([]string,0)
|
|
|
- arr_error = append(arr_error,repeatArr[i])//组拼接当前id
|
|
|
- fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
|
|
|
- addOrUpdateArr = append(addOrUpdateArr,false)
|
|
|
- infoFusionArr = append(infoFusionArr, map[string]interface{}{})
|
|
|
- }else { //正常更新
|
|
|
- fusionDataGroupArr = append(fusionDataGroupArr,arr)
|
|
|
- addOrUpdateArr = append(addOrUpdateArr,true)
|
|
|
- infoFusionArr = append(infoFusionArr,fusionTmpData)
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- //不断改变中
|
|
|
- //log.Println("当前分组数量:",len(fusionDataGroupArr))
|
|
|
- }
|
|
|
- log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
|
|
|
- log.Println("********************分割线********************")
|
|
|
- log.Println("********************分割线********************")
|
|
|
- log.Println("********************分割线********************")
|
|
|
- log.Println("开始处理新增分组... ...")
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- start := int(time.Now().Unix())
|
|
|
-
|
|
|
- //多线程 - 处理数据
|
|
|
- pool := make(chan bool, 3)
|
|
|
- wg := &sync.WaitGroup{}
|
|
|
-
|
|
|
- for i:=0;i<len(fusionDataGroupArr);i++ {
|
|
|
- fusionArr := fusionDataGroupArr[i]
|
|
|
- pool <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(fusionArr []string,i int) {
|
|
|
- defer func() {
|
|
|
- <-pool
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- //构建数据
|
|
|
- log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
|
|
|
- weight :=NewWeightData(fusionArr)
|
|
|
- ////整理数据-筛选排名,模板
|
|
|
- weight.analyzeBuildStandardData()
|
|
|
-
|
|
|
- if len(fusionArr)<=1 {
|
|
|
- //log.Println("单组生成... ...")
|
|
|
- saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
|
|
|
- saveid:=mgo.Save(fusion_coll_name,saveFusionData)
|
|
|
- saveRecordData["_id"] = saveid
|
|
|
- mgo.Save(record_coll_name,saveRecordData)
|
|
|
- }else {
|
|
|
- if addOrUpdateArr[i] {
|
|
|
- //log.Println("多组更新... ...")
|
|
|
- tmpdata:=infoFusionArr[i]
|
|
|
- updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
|
|
|
-
|
|
|
- UpdateFusion.updatePool <- []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": tmpdata["_id"],
|
|
|
- },
|
|
|
- updateFusionData,
|
|
|
- }
|
|
|
- UpdateRecord.updatePool <- []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": tmpdata["_id"],
|
|
|
- },
|
|
|
- updateRecordData,
|
|
|
- }
|
|
|
- }else {
|
|
|
- //log.Println("多组生成... ...")
|
|
|
- saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
|
|
|
- saveid:=mgo.Save(fusion_coll_name,saveFusionData)
|
|
|
- saveRecordData["_id"] = saveid
|
|
|
- mgo.Save(record_coll_name,saveRecordData)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- }(fusionArr,i)
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- wg.Wait()
|
|
|
-
|
|
|
- log.Println("fusion is over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
|
|
|
- log.Println("睡眠30秒,然后在发广播")
|
|
|
- time.Sleep(30 * time.Second)
|
|
|
- //任务完成,开始发送广播通知下面节点
|
|
|
- taskSendFusionUdp(mapInfo)
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
-//查询融合表数据-找到对应组id
|
|
|
-func dealWithFindFusionDataArr(sourceid string) ([]string,map[string]interface{}) {
|
|
|
- newArr ,arr := make([]string,0),make(primitive.A,0)
|
|
|
- tmpData:=make(map[string]interface{},0)
|
|
|
- q := map[string]interface{}{}
|
|
|
- sess := mgo.GetMgoConn()
|
|
|
- defer mgo.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
|
|
|
-
|
|
|
- for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
|
- //log.Println(reflect.TypeOf(tmp["fusion_allids"]))
|
|
|
- if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b {
|
|
|
- for _,v:=range fusion_allids {
|
|
|
- if v==sourceid {
|
|
|
- //找到目标组-
|
|
|
- arr = fusion_allids
|
|
|
- tmpData = tmp
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
-
|
|
|
- for _,v:=range arr{
|
|
|
- newArr = append(newArr,qu.ObjToString(v))
|
|
|
- }
|
|
|
-
|
|
|
- return newArr,tmpData
|
|
|
-}
|
|
|
-
|
|
|
-//查询记录1表数据-找到对应的id , 更新用到
|
|
|
-func dealWithFindRecordData(sourceid string) string {
|
|
|
- newArr ,arr := make([]string,0),make(primitive.A,0)
|
|
|
- //tmpData:=make(map[string]interface{},0)
|
|
|
- q := map[string]interface{}{}
|
|
|
- sess := mgo.GetMgoConn()
|
|
|
- defer mgo.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
|
|
|
-
|
|
|
- for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
|
- //log.Println(reflect.TypeOf(tmp["fusion_allids"]))
|
|
|
- if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b {
|
|
|
- for _,v:=range fusion_allids {
|
|
|
- if v==sourceid {
|
|
|
- //找到目标组-
|
|
|
- arr = fusion_allids
|
|
|
- //tmpData = tmp
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
-
|
|
|
- for _,v:=range arr{
|
|
|
- newArr = append(newArr,qu.ObjToString(v))
|
|
|
- }
|
|
|
-
|
|
|
- return ""
|
|
|
-}
|
|
|
-
|
|
|
//udp 监听
|
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
switch act {
|
|
@@ -361,7 +132,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
} else if mapInfo != nil {
|
|
|
taskType := qu.ObjToString(mapInfo["stype"])
|
|
|
if taskType == "fusion" {
|
|
|
- go startTask(data, mapInfo)
|
|
|
+ go startTaskFullData(data, mapInfo)
|
|
|
} else {
|
|
|
log.Println("未知类型:融合异常... ...")
|
|
|
}
|
|
@@ -387,16 +158,16 @@ func taskSendFusionUdp(mapinfo map[string]interface{}) {
|
|
|
for _, to := range nextNode {
|
|
|
sid, _ := mapinfo["gtid"].(string)
|
|
|
eid, _ := mapinfo["lteid"].(string)
|
|
|
- key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
|
|
|
+ key := sid + "-" + eid + "-" + qu.ObjToString(to["stype"])
|
|
|
by, _ := json.Marshal(map[string]interface{}{
|
|
|
"gtid": sid,
|
|
|
"lteid": eid,
|
|
|
- "stype": util.ObjToString(to["stype"]),
|
|
|
+ "stype": qu.ObjToString(to["stype"]),
|
|
|
"key": key,
|
|
|
})
|
|
|
addr := &net.UDPAddr{
|
|
|
IP: net.ParseIP(to["addr"].(string)),
|
|
|
- Port: util.IntAll(to["port"]),
|
|
|
+ Port: qu.IntAll(to["port"]),
|
|
|
}
|
|
|
node := &udpNode{by, addr, time.Now().Unix(), 0}
|
|
|
udptaskmap.Store(key, node)
|