package main

import (
	"log"
	"strings"
	"sync"
	"time"

	qu "qfw/util"
)

// startTaskAddData runs one incremental fusion pass over a slice of the
// source collection.
//
// mapInfo must carry string values under "gtid" and "lteid"; documents whose
// _id lies in (gtid, lteid] are read from coll_name. Every document is
// grouped under a source id: its own _id, or its "repeat_id" when its
// "repeat" field equals 1. Groups that already have a record in
// group_coll_name are extended with the ids stored there and marked for
// update. The groups are then handed to stareFusionData and, after a
// 30-second pause, taskSendFusionUdp is called with mapInfo.
//
// The data parameter is not used by this function.
// Panics (for example a failed type assertion on mapInfo) are handed to
// qu.Catch.
func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
	log.Println("开始增量融合流程")
	defer qu.Catch()

	// Id interval: gtid < _id <= lteid.
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
		},
	}
	log.Println("查询条件:", q)

	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()

	start := time.Now()
	// source id -> comma-separated ids of the documents in that group.
	tmpFusionMap := make(map[string]string)
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		if index%10000 == 0 {
			log.Println("current index", index, tmp["_id"])
		}
		repeatid := BsonTOStringId(tmp["_id"])
		sourceid := repeatid
		if qu.IntAll(tmp["repeat"]) == 1 {
			sourceid = qu.ObjToString(tmp["repeat_id"])
		}
		// Documents without a usable source id are skipped.
		if sourceid != "" {
			if ids := tmpFusionMap[sourceid]; ids != "" {
				tmpFusionMap[sourceid] = ids + "," + repeatid
			} else {
				tmpFusionMap[sourceid] = repeatid
			}
		}
		// Fresh map per document so fields of the previous one do not leak
		// into the next decode.
		tmp = make(map[string]interface{})
	}
	// Release the cursor and surface any error that ended the iteration
	// early; processing continues best-effort with what was read.
	if err := it.Close(); err != nil {
		log.Println("task mongo iter close error:", err)
	}
	log.Println("task mongo first:", index, len(tmpFusionMap), "开始确认最后增量组数据...")

	// source id -> fusion id, for groups that already exist.
	isUpdateMap := make(map[string]string)
	checked := 0
	for sourceid, value := range tmpFusionMap {
		checked++
		if checked%1000 == 0 {
			log.Println("当前数量:", checked, sourceid)
		}
		group := mgo.FindById(group_coll_name, sourceid)
		if group != nil && len(group) > 2 {
			// Existing group: prepend the ids already recorded and remember
			// the fusion record that has to be updated.
			allids := qu.ObjToString(group["allids"])
			tmpFusionMap[sourceid] = allids + "," + value
			isUpdateMap[sourceid] = qu.ObjToString(group["fusion_id"])
		}
	}
	log.Println("分组数据总用时:", int(time.Since(start).Seconds()), "秒")

	stareFusionData(tmpFusionMap, isUpdateMap)

	log.Println("此段落结束,发送udp", "...睡眠30s")
	time.Sleep(30 * time.Second)
	taskSendFusionUdp(mapInfo)
}

// stareFusionData fuses every group in tmpFusionMap and persists the result.
//
// tmpFusionMap maps a source id to the comma-separated ids of its group.
// isUpdateMap maps a source id to an existing fusion id; a non-empty entry
// means the group is updated instead of created. At most mgo_pool groups are
// processed concurrently, and the function returns once all of them are done.
func stareFusionData(tmpFusionMap map[string]string, isUpdateMap map[string]string) {
	log.Println("开始融合操作......")
	index, start := 0, time.Now()

	// Buffered channel used as a semaphore bounding concurrent workers.
	pool_mgo := make(chan bool, mgo_pool)
	wg_mgo := &sync.WaitGroup{}
	for sourceid, v := range tmpFusionMap {
		fusionArr := strings.Split(v, ",")
		fusionid := isUpdateMap[sourceid]
		index++ // counted here, in the launching goroutine, so no locking is needed

		pool_mgo <- true
		wg_mgo.Add(1)
		go func(sourceid string, fusionArr []string, fusionid string) {
			defer func() {
				<-pool_mgo
				wg_mgo.Done()
			}()
			// Deferred last so it runs first: a panic in one group is handed
			// to qu.Catch, as in startTaskAddData, instead of killing the
			// whole process.
			defer qu.Catch()

			weight := NewWeightData(fusionArr, sourceid)
			weight.analyzeBuildStandardData()

			if fusionid != "" {
				// Existing group: queue updates for the fusion, record and
				// group collections.
				saveFusionData, saveRecordData := weight.dealWithMultipleUpdateFusionStruct(fusionid)
				UpdateFusionPool.pool <- []map[string]interface{}{
					{"_id": StringTOBsonId(fusionid)},
					saveFusionData,
				}
				UpdateRecordPool.pool <- []map[string]interface{}{
					{"_id": StringTOBsonId(fusionid)},
					{"$set": saveRecordData},
				}
				UpdateGroupPool.pool <- []map[string]interface{}{
					{"_id": StringTOBsonId(sourceid)},
					{"$set": map[string]interface{}{
						"allids": strings.Join(fusionArr, ","),
					}},
				}
				return
			}

			// New group: a single id and a multi-id group are built by
			// different helpers.
			var saveFusionData, saveRecordData map[string]interface{}
			if len(fusionArr) <= 1 {
				saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
			} else {
				saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
			}
			// The fusion document is saved synchronously because its id is
			// reused as the record's _id and stored on the group.
			saveid := mgo.Save(fusion_coll_name, saveFusionData)
			saveRecordData["_id"] = saveid
			// Record and group documents are queued for batched insertion.
			AddRecordPool.pool <- saveRecordData
			AddGroupPool.pool <- map[string]interface{}{
				"_id":         StringTOBsonId(sourceid),
				"allids":      strings.Join(fusionArr, ","),
				"fusion_id":   BsonTOStringId(saveid),
				"template_id": qu.ObjToString(saveFusionData["fusion_templateid"]),
			}
		}(sourceid, fusionArr, fusionid)
	}
	wg_mgo.Wait()
	log.Println("add fusion is over:", index, "总用时:", int(time.Since(start).Seconds()), "秒")
}