123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- package main
- import (
- "encoding/json"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "log"
- mu "mfw/util"
- "net"
- "os"
- "qfw/common/src/qfw/util"
- qu "qfw/util"
- "strconv"
- "sync"
- "time"
- )
// Package-level state shared by the whole fusion service.
var (
	sysconfig        map[string]interface{}   // configuration, filled by qu.ReadConfig in init
	mgo              *MongodbSim              // MongoDB access object, built in initMgo
	udpclient        mu.UdpClient             // UDP client used to listen for tasks and to write replies/notifications
	nextNode         []map[string]interface{} // downstream nodes that taskSendFusionUdp notifies
	coll_name        string                   // source collection read by startTask
	fusion_coll_name string                   // collection the fusion documents are saved to
	record_coll_name string                   // collection the fusion records are saved to
	NoNeedFusionKey  map[string]interface{}   // keys that do not need fusion (config entry "notFusionKey")
	UpdateFusion     *updateFusionInfo        // update pool receiving fusion-document updates
	UpdateRecord     *updateRecordInfo        // update pool receiving record updates
)
- func initMgo() {
- mconf := sysconfig["mongodb"].(map[string]interface{})
- log.Println(mconf)
- mgo = &MongodbSim{
- MongodbAddr: mconf["addrName"].(string),
- DbName: mconf["dbName"].(string),
- Size: qu.IntAllDef(mconf["pool"], 10),
- }
- mgo.InitPool()
- coll_name = mconf["collName"].(string)
- fusion_coll_name = sysconfig["fusion_coll_name"].(string)
- record_coll_name = sysconfig["record_coll_name"].(string)
- NoNeedFusionKey = sysconfig["notFusionKey"].(map[string]interface{})
- }
// init loads the configuration, initialises MongoDB and starts the two
// background update pools before main runs.
func init() {
	// Load the configuration file into sysconfig.
	qu.ReadConfig(&sysconfig)
	initMgo()
	// Create the update pools and start their worker goroutines.
	UpdateFusion = newUpdateFusionPool()
	go UpdateFusion.updateFusionData()
	UpdateRecord = newUpdateRecordPool()
	go UpdateRecord.updateRecordData()
	log.Println("采用udp模式")
}
- func mainT() {
- go checkMapJob()
- updport := sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- time.Sleep(99999 * time.Hour)
- }
- //快速测试使用
- func main() {
- sid := "100000000000000000000000"
- eid := "900000000000000000000000"
- //log.Println(sid, "---", eid)
- mapinfo := map[string]interface{}{}
- if sid == "" || eid == "" {
- log.Println("sid,eid参数不能为空")
- os.Exit(0)
- }
- mapinfo["gtid"] = sid
- mapinfo["lteid"] = eid
- startTask([]byte{}, mapinfo)
- time.Sleep(99999 * time.Hour)
- }
- //融合具体方法
- func startTask(data []byte, mapInfo map[string]interface{}) {
- log.Println("开始融合流程")
- defer qu.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println("查询条件:",q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
- //编译不同的融合组,如何划分组
- /***********************/
- /***********************/
- /***y
- ********************/
- /***********************/
- fusionDataGroupArr := make([][]string,0) //待融合组
- addOrUpdateArr := make([]bool,0) //新增-bool-记录-组新增,组更新
- infoFusionArr := make([]map[string]interface{},0) //记录取融合表的数据
- repeatArr,sourceArr,index := make([]string,0),make([]string,0),0 //重复数据组
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- if index%1000 == 0 {
- log.Println("current index",index,tmp["_id"])
- }
- tmpId:=BsonTOStringId(tmp["_id"])
- repeat:=qu.IntAll(tmp["repeat"])
- sourceid:=qu.ObjToString(tmp["repeat_id"])
- if repeat==1 {
- repeatArr = append(repeatArr,tmpId)
- sourceArr = append(sourceArr,sourceid)
- }else {
- fusionDataGroupArr = append(fusionDataGroupArr,[]string{tmpId})
- addOrUpdateArr = append(addOrUpdateArr,false)
- infoFusionArr = append(infoFusionArr, map[string]interface{}{})
- }
- tmp = make(map[string]interface{})
- }
- log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
- //根据重复组,重新划分新的组别
- for i:=0;i<len(repeatArr);i++ {
- sourceid := sourceArr[i]
- isAddExist,index := false,0
- //根据原sourceid 直接遍历组
- R: for k,v:=range fusionDataGroupArr{
- for _,v1:=range v{
- if v1==sourceid {
- index = k
- isAddExist = true
- break R
- }
- }
- }
- if isAddExist { //数组截取替换-找到指定
- arr := make([]string,0)
- arr = fusionDataGroupArr[index]
- arr = append(arr,repeatArr[i])//组拼接当前id
- fusionDataGroupArr[index] = arr
- }else {//当前段落未找到-需要查询融合表,,遍历融合表
- arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
- arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
- arr = append(arr,repeatArr[i])//组拼接当前id
- if len(arr)<1 { //异常错误,新增
- log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
- arr_error := make([]string,0)
- arr_error = append(arr_error,repeatArr[i])//组拼接当前id
- fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
- addOrUpdateArr = append(addOrUpdateArr,false)
- infoFusionArr = append(infoFusionArr, map[string]interface{}{})
- }else { //正常更新
- fusionDataGroupArr = append(fusionDataGroupArr,arr)
- addOrUpdateArr = append(addOrUpdateArr,true)
- infoFusionArr = append(infoFusionArr,fusionTmpData)
- }
- }
- //不断改变中
- //log.Println("当前分组数量:",len(fusionDataGroupArr))
- }
- log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
- log.Println("********************分割线********************")
- log.Println("********************分割线********************")
- log.Println("********************分割线********************")
- log.Println("开始处理新增分组... ...")
- start := int(time.Now().Unix())
- //进行分组融合
- pool := make(chan bool, 1)
- wg := &sync.WaitGroup{}
- for i:=0;i<len(fusionDataGroupArr);i++ {
- fusionArr := fusionDataGroupArr[i]
- //构建数据
- log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
- //多线程 - 处理数据
- pool <- true
- wg.Add(1)
- go func(fusionArr []string,i int) {
- defer func() {
- <-pool
- wg.Done()
- }()
- weight :=NewWeightData(fusionArr)
- ////整理数据-筛选排名,模板
- weight.analyzeBuildStandardData()
- if len(fusionArr)<=1 {
- //log.Println("单组生成... ...")
- saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
- saveid:=mgo.Save(fusion_coll_name,saveFusionData)
- saveRecordData["_id"] = saveid
- mgo.Save(record_coll_name,saveRecordData)
- }else {
- if addOrUpdateArr[i] {
- //log.Println("多组更新... ...")
- tmpdata:=infoFusionArr[i]
- updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
- UpdateFusion.updatePool <- []map[string]interface{}{
- map[string]interface{}{
- "_id": tmpdata["_id"],
- },
- updateFusionData,
- }
- UpdateRecord.updatePool <- []map[string]interface{}{
- map[string]interface{}{
- "_id": tmpdata["_id"],
- },
- updateRecordData,
- }
- }else {
- //log.Println("多组生成... ...")
- saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
- saveid:=mgo.Save(fusion_coll_name,saveFusionData)
- saveRecordData["_id"] = saveid
- mgo.Save(record_coll_name,saveRecordData)
- }
- }
- }(fusionArr,i)
- }
- wg.Wait()
- log.Println("新增融合over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
- time.Sleep(30 * time.Second)
- //任务完成,开始发送广播通知下面节点
- taskSendFusionUdp(mapInfo)
- }
- //查询融合表数据-找到对应组id
- func dealWithFindFusionDataArr(sourceid string) ([]string,map[string]interface{}) {
- newArr ,arr := make([]string,0),make(primitive.A,0)
- tmpData:=make(map[string]interface{},0)
- q := map[string]interface{}{}
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); {
- //log.Println(reflect.TypeOf(tmp["fusion_allids"]))
- if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b {
- for _,v:=range fusion_allids {
- if v==sourceid {
- //找到目标组-
- arr = fusion_allids
- tmpData = tmp
- tmp = make(map[string]interface{})
- break
- }
- }
- }
- tmp = make(map[string]interface{})
- }
- for _,v:=range arr{
- newArr = append(newArr,qu.ObjToString(v))
- }
- return newArr,tmpData
- }
- //查询记录1表数据-找到对应的id , 更新用到
- func dealWithFindRecordData(sourceid string) string {
- newArr ,arr := make([]string,0),make(primitive.A,0)
- //tmpData:=make(map[string]interface{},0)
- q := map[string]interface{}{}
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); {
- //log.Println(reflect.TypeOf(tmp["fusion_allids"]))
- if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b {
- for _,v:=range fusion_allids {
- if v==sourceid {
- //找到目标组-
- arr = fusion_allids
- //tmpData = tmp
- tmp = make(map[string]interface{})
- break
- }
- }
- }
- tmp = make(map[string]interface{})
- }
- for _,v:=range arr{
- newArr = append(newArr,qu.ObjToString(v))
- }
- return ""
- }
- //udp 监听
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- //从表中开始处理
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Println("err:", err, "mapInfo:", mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- taskType := qu.ObjToString(mapInfo["stype"])
- if taskType == "fusion" {
- go startTask(data, mapInfo)
- } else {
- log.Println("未知类型:融合异常... ...")
- }
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- }
- case mu.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Println("ok:", ok)
- udptaskmap.Delete(ok)
- }
- }
- }
- //结束发送udp
- func taskSendFusionUdp(mapinfo map[string]interface{}) {
- //log.Println("信息融合结束-发送udp")
- for _, to := range nextNode {
- sid, _ := mapinfo["gtid"].(string)
- eid, _ := mapinfo["lteid"].(string)
- key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": util.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: util.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
// judgeIsCurIds reports whether curid lies in the id range (gtid, lteid].
//
// Only the first 8 hex characters of each id are compared (for a MongoDB
// ObjectID hex string that prefix is the 4-byte creation timestamp), so the
// lower bound is exclusive and the upper bound inclusive at that granularity.
// An id that is shorter than 8 characters, or whose prefix is not valid hex,
// cannot be placed in the range, so the function returns false for it.
func judgeIsCurIds(gtid string, lteid string, curid string) bool {
	// prefix parses the leading 8 hex characters of id.
	prefix := func(id string) (uint64, bool) {
		if len(id) < 8 {
			return 0, false
		}
		v, err := strconv.ParseUint(id[:8], 16, 32)
		return v, err == nil
	}

	gtTime, ok := prefix(gtid)
	if !ok {
		return false
	}
	lteTime, ok := prefix(lteid)
	if !ok {
		return false
	}
	curTime, ok := prefix(curid)
	if !ok {
		return false
	}
	return curTime > gtTime && curTime <= lteTime
}
|