123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- package main
- import (
- "log"
- qu "qfw/util"
- "strings"
- "sync"
- "time"
- )
- //增量-融合一小段
- func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
- log.Println("开始增量融合流程")
-
- defer qu.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println("查询条件:",q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
- start := int(time.Now().Unix())
- tmpFusionMap := make(map[string]string,0)
- index,isOK:=0,0
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- if index%10000 == 0 {
- log.Println("current index",index,tmp["_id"])
- }
-
- repeat := qu.IntAll(tmp["repeat"])
- repeatid,sourceid := BsonTOStringId(tmp["_id"]),BsonTOStringId(tmp["_id"])
- if repeat==1 {
- sourceid = qu.ObjToString(tmp["repeat_id"])
- }
- if sourceid!="" {
- if tmpFusionMap[sourceid]!="" {
- ids := tmpFusionMap[sourceid]
- ids = ids+","+repeatid
- tmpFusionMap[sourceid] = ids
- }else {
- tmpFusionMap[sourceid] = repeatid
- }
- }else {
- //log.Println("异常: sourceid 为空 ")
- }
- tmp = make(map[string]interface{})
- }
- log.Println("task mongo first:",index,len(tmpFusionMap),"开始确认最后增量组数据...")
- isUpdateMap:=make(map[string]string)
- for sourceid,value:=range tmpFusionMap {
- isOK++
- if isOK % 1000 ==0 {
- log.Println("当前数量:",isOK,sourceid)
- }
- data:=mgo.FindById(group_coll_name,sourceid)
- if data!=nil && len(data)>2 { //更新组
- allids:= qu.ObjToString(data["allids"])
- tmpFusionMap[sourceid] = allids+","+value
- isUpdateMap[sourceid] = qu.ObjToString(data["fusion_id"])
- }
- }
- log.Println("分组数据总用时:",int(time.Now().Unix())-start,"秒")
- stareFusionData(tmpFusionMap,isUpdateMap)
- log.Println("此段落结束,发送udp","...睡眠30s")
- time.Sleep(30 * time.Second)
- taskSendFusionUdp(mapInfo)
- }
- func stareFusionData(tmpFusionMap map[string]string,isUpdateMap map[string]string) {
- //根据重复组,重新划分新的组别
- log.Println("开始融合操作......")
- index,start :=0, int(time.Now().Unix())
- //多线程保存数据
- pool_mgo := make(chan bool, mgo_pool)
- wg_mgo := &sync.WaitGroup{}
- for sourceid,v:=range tmpFusionMap {
- fusionArr := strings.Split(v, ",")
- fusionid := ""
- if isUpdateMap[sourceid]!="" {
- fusionid = qu.ObjToString(isUpdateMap[sourceid])
- }
- pool_mgo <- true
- wg_mgo.Add(1)
- go func(sourceid string, fusionArr []string,fusionid string) {
- defer func() {
- <-pool_mgo
- wg_mgo.Done()
- }()
- weight := NewWeightData(fusionArr,sourceid)
- weight.analyzeBuildStandardData()
- saveFusionData, saveRecordData:= map[string]interface{}{},map[string]interface{}{}
- if fusionid!="" {//更新
- saveFusionData, saveRecordData:=weight.dealWithMultipleUpdateFusionStruct(fusionid)
- //更新融合表
- UpdateFusionPool.pool <- []map[string]interface{}{
- map[string]interface{}{
- "_id": StringTOBsonId(fusionid),
- },
- saveFusionData,
- }
- //更新日志表
- UpdateRecordPool.pool <- []map[string]interface{}{
- map[string]interface{}{
- "_id": StringTOBsonId(fusionid),
- },
- map[string]interface{}{
- "$set": saveRecordData,
- },
- }
- //更新分组表
- UpdateGroupPool.pool <- []map[string]interface{}{
- map[string]interface{}{
- "_id": StringTOBsonId(sourceid),
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "allids":strings.Join(fusionArr, ","),
- },
- },
- }
- }else {
- if len(fusionArr) <= 1 {
- saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
- }else {
- saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
- }
- //新增融合表
- saveid := mgo.Save(fusion_coll_name, saveFusionData)
- saveRecordData["_id"] = saveid
- //批量新增日志表
- AddRecordPool.pool <- saveRecordData
- //批量新增分组表
- AddGroupPool.pool <- map[string]interface{}{
- "_id":StringTOBsonId(sourceid),
- "allids":strings.Join(fusionArr, ","),
- "fusion_id": BsonTOStringId(saveid),
- "template_id":qu.ObjToString(saveFusionData["fusion_templateid"]),
- }
- }
- }(sourceid, fusionArr,fusionid)
- }
- wg_mgo.Wait()
- log.Println("add fusion is over:",index,"总用时:",int(time.Now().Unix())-start,"秒")
- }
|