package main

import (
	"encoding/json"
	"fmt" // fixed(review): fmt.Sprintf is used in dealWithStructData but fmt was missing from the imports
	"log"
	qu "qfw/util"
	"qfw/util/elastic"
	"strings"
	"sync"
	"time"

	es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
	// fixed(review): primitive.A is used in dealWithStructData but no primitive package was
	// imported. The official driver path is used here — confirm against the project's
	// vendoring scheme (other deps live under "qfw/common/src/...").
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// NOTE(review): the identifier `mgo` (GetMgoConn/DestoryMongoConn/DbName/Save) is not imported
// in this file; presumably it is a package-level helper declared in a sibling file — verify.

// flushGroupToEs pushes one batch of sourceid -> comma-joined duplicate-id lists into ES,
// merging with any "allids" value already stored under the same document id.
// Work is fanned out over the bounded worker pool `pool`; the call blocks until the
// whole batch has been written (wg.Wait).
func flushGroupToEs(batch map[string]string, pool chan bool, wg *sync.WaitGroup) {
	for k, v := range batch {
		pool <- true
		wg.Add(1)
		go func(esID, curIDs string) {
			defer func() {
				<-pool
				wg.Done()
			}()
			if esID == "" || curIDs == "" {
				log.Println("异常", esID, curIDs)
				return
			}
			dataArr := *elastic.GetById(esIndex, esType, esID)
			if len(dataArr) > 0 {
				// Document exists: append the new ids to the stored list via an update script.
				allids := qu.ObjToString(dataArr[0]["allids"]) + "," + curIDs
				updateStr := `ctx._source.allids=` + `"` + allids + `"`
				elastic.Update(esIndex, esType, esID, updateStr)
			} else {
				// Document missing: create a fresh group document.
				savetmp := map[string]interface{}{
					"allids":      curIDs,
					"_id":         StringTOBsonId(esID),
					"template_id": "",
					"fusion_id":   "",
				}
				elastic.Save(esIndex, esType, savetmp)
			}
		}(k, v)
	}
	wg.Wait()
}

// startTaskFullData runs the full-volume fusion pipeline over the mongo id range
// (mapInfo["gtid"], mapInfo["lteid"]]: group duplicate documents into ES, export the
// groups back to mongo, fuse each group, then notify via UDP.
//
// TODO(review): the first block below is a leftover "temporary test" short-circuit —
// everything after the bare `return` is currently UNREACHABLE (go vet will flag it).
// Remove the stub to restore the real flow.
func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
	// Temporary test path: export + fuse only, skipping the grouping pass below.
	exportFusionMongoData()
	time.Sleep(60 * time.Second)
	// The actual fusion step.
	startFusionData()
	time.Sleep(60 * time.Second)
	return

	log.Println("开始全量融合流程")
	defer qu.Catch()
	// Mongo query over the half-open id interval (gtid, lteid].
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
		},
	}
	log.Println("查询条件:", q)
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
	index, isOK := 0, 0
	start := int(time.Now().Unix())
	// Bounded worker pool for ES writes.
	pool_es := make(chan bool, es_pool)
	wg_es := &sync.WaitGroup{}
	tmpEsMap := make(map[string]string)
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		// Every isgroupfn documents, flush the accumulated batch into ES.
		if index%isgroupfn == 0 && index != 0 {
			log.Println("current index", index, tmp["_id"])
			flushGroupToEs(tmpEsMap, pool_es, wg_es)
			tmpEsMap = make(map[string]string)
		}
		repeat := qu.IntAll(tmp["repeat"])
		sourceid := BsonTOStringId(tmp["_id"])
		repeatid := BsonTOStringId(tmp["_id"])
		if repeat == 1 {
			// Duplicate document: group it under its master's id.
			sourceid = qu.ObjToString(tmp["repeat_id"])
		} else {
			isOK++ // unique document counter
		}
		if tmpEsMap[sourceid] != "" {
			tmpEsMap[sourceid] = tmpEsMap[sourceid] + "," + repeatid
		} else {
			tmpEsMap[sourceid] = repeatid
		}
		tmp = make(map[string]interface{})
	}
	log.Println("task first:", index, "不重复数:", isOK, "遍历分组数据用时:", int(time.Now().Unix())-start, "秒")
	// Flush whatever is left over from the last partial batch.
	if len(tmpEsMap) > 0 {
		log.Println("处理剩余数据:", len(tmpEsMap))
		flushGroupToEs(tmpEsMap, pool_es, wg_es)
		tmpEsMap = make(map[string]string)
	}
	log.Println("索引准备完毕睡眠30s......耗时:", int(time.Now().Unix())-start, "秒")
	time.Sleep(60 * time.Second)
	// Export the fusion groups to mongo.
	exportFusionMongoData()
	time.Sleep(60 * time.Second)
	// Fuse the grouped data.
	startFusionData()
	time.Sleep(60 * time.Second)
	taskSendFusionUdp(mapInfo)
}

// exportFusionMongoData scans every group document in ES and pushes each
// {_id, allids} pair into AddGroupPool for bulk insertion into mongo.
func exportFusionMongoData() {
	start := int(time.Now().Unix())
	log.Println("开始导出融合组数据......")
	esclient := elastic.GetEsConn()
	defer elastic.DestoryEsConn(esclient)
	if esclient == nil {
		log.Println("连接池异常")
		return // fixed: previously fell through and dereferenced the nil client
	}
	q := es_elastic.NewBoolQuery()
	cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
		Size(200).Do()
	if err != nil {
		log.Println("cursor", err)
		return // fixed: cursor is unusable on error
	}
	if cursor.Results == nil {
		log.Println("results != nil; got nil")
		return // fixed: TotalHits below would panic
	}
	if cursor.Results.Hits == nil {
		log.Println("expected results.Hits != nil; got nil")
		return
	}
	log.Println("查询正常-总数:", cursor.TotalHits())
	// Bounded worker pool for handing results to the batch pool.
	pool_es := make(chan bool, es_pool)
	wg_es := &sync.WaitGroup{}
	pages, numDocs := 0, 0
	for {
		searchResult, err := cursor.Next()
		if err != nil {
			if err.Error() == "EOS" {
				break // end of scroll
			}
			log.Println("cursor searchResult", err)
			break // fixed: previously continued and dereferenced a nil searchResult
		}
		pages++
		isLog := false
		for _, hit := range searchResult.Hits.Hits {
			tmp := make(map[string]interface{})
			if err := json.Unmarshal(*hit.Source, &tmp); err != nil {
				log.Println("json Unmarshal error")
				continue
			}
			// Progress log once per page when crossing a 10000-doc boundary.
			if !isLog && numDocs%10000 == 0 {
				log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
				isLog = true
			}
			numDocs++
			fusion_ids := qu.ObjToString(tmp["allids"])
			sourceid := qu.ObjToString(tmp["_id"])
			pool_es <- true
			wg_es.Add(1)
			go func(srcID, allIDs string) {
				defer func() {
					<-pool_es
					wg_es.Done()
				}()
				AddGroupPool.pool <- map[string]interface{}{
					"_id": StringTOBsonId(srcID),
					// fixed: the original used the captured loop variable fusion_ids
					// instead of the goroutine parameter — a data race across iterations.
					"allids": allIDs,
				}
			}(sourceid, fusion_ids)
		}
	}
	log.Println("遍历Es结束......")
	wg_es.Wait()
	log.Println("fusion group over :", numDocs, "用时:", int(time.Now().Unix())-start, "秒")
}

// startFusionData walks every group document in mongo, fuses each group's member
// documents and writes the fused record plus its audit record through the batch pools.
func startFusionData() {
	log.Println("开始全量融合流程...")
	defer qu.Catch()
	// Several processes may run this concurrently over different id ranges.
	q := map[string]interface{}{}
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	it := sess.DB(mgo.DbName).C(group_coll_name).Find(&q).Iter()
	index, start := 0, int(time.Now().Unix())
	// Bounded worker pool for the mongo writes.
	pool_mgo := make(chan bool, mgo_pool)
	wg_mgo := &sync.WaitGroup{}
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		if index%10000 == 0 {
			log.Println("current index", index, tmp["_id"])
		}
		fusion_ids := qu.ObjToString(tmp["allids"])
		fusionArr := strings.Split(fusion_ids, ",")
		sourceid := BsonTOStringId(tmp["_id"])
		pool_mgo <- true
		wg_mgo.Add(1)
		go func(srcID string, members []string) {
			defer func() {
				<-pool_mgo
				wg_mgo.Done()
			}()
			weight := NewWeightData(members)
			weight.analyzeBuildStandardData()
			var saveFusionData, saveRecordData map[string]interface{}
			if len(members) <= 1 {
				saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
			} else {
				saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
			}
			// Insert into the fusion collection; reuse its id for the audit record.
			saveid := mgo.Save(fusion_coll_name, saveFusionData)
			saveRecordData["_id"] = saveid
			// Batch-insert the audit record.
			AddRecordPool.pool <- saveRecordData
			// Batch-update the group document with the new fusion/template ids.
			UpdateGroupPool.pool <- []map[string]interface{}{
				{
					"_id": StringTOBsonId(srcID),
				},
				{
					"$set": map[string]interface{}{
						"fusion_id":   BsonTOStringId(saveid),
						"template_id": qu.ObjToString(saveFusionData["fusion_templateid"]),
					},
				},
			}
		}(sourceid, fusionArr)
		tmp = make(map[string]interface{})
	}
	wg_mgo.Wait()
	log.Println("fusion is over:", index, "总用时:", int(time.Now().Unix())-start, "秒")
}

// dealWithStructData merges the structured fields (attach_text, winnerorder, package)
// of all grouped documents into the template document: attachments are de-duplicated
// and appended under increasing numeric keys, while for winnerorder/package the member
// with the most entries wins. Every merged value is mirrored into recordDict for the
// audit log. Returns a map holding only the fields that actually changed.
func (weight *weightDataMap) dealWithStructData(recordDict *map[string]interface{}) map[string]interface{} {
	templateid := weight.templateid
	templateTmp := weight.data[templateid].data
	modifyData := make(map[string]interface{})

	// ---- attachments: start from the template's map, if any ----
	attach_text, isAttach := make(map[string]interface{}), false
	if tmp_arr, ok := templateTmp["attach_text"].(map[string]interface{}); ok {
		attach_text = tmp_arr
	}
	// Highest numeric key currently present; new attachments continue from here.
	keyIndex := -1
	for k := range attach_text {
		if key := qu.IntAll(k); key > keyIndex {
			keyIndex = key
		}
	}
	for _, value_id := range weight.saveids {
		if templateid == value_id {
			continue
		}
		rankData := weight.data[value_id].data // the lower-ranked member's fields
		attachData, ok := rankData["attach_text"].(map[string]interface{})
		if !ok || len(attachData) == 0 {
			continue
		}
		for _, v := range attachData {
			attach, ok := v.(map[string]interface{})
			if !ok {
				continue
			}
			if dealWithRepeatAttachData(attach_text, attach) {
				continue // duplicate attachment — skip
			}
			// Unique attachment: append under the next numeric key.
			keyIndex++
			attach_text[fmt.Sprintf("%v", keyIndex)] = attach
			isAttach = true
			// Record the provenance of the merged attachment.
			entry := map[string]interface{}{
				"id":    value_id,
				"value": attach,
			}
			if (*recordDict)["attach_text"] == nil {
				(*recordDict)["attach_text"] = []map[string]interface{}{entry}
			} else {
				arr := (*recordDict)["attach_text"].([]map[string]interface{})
				(*recordDict)["attach_text"] = append(arr, entry)
			}
		}
	}

	// ---- winnerorder / package: keep the longest list among all members ----
	winnerCount := qu.IntAll(0)
	winnerArr, okWinner, isWinner, winnerid := make(primitive.A, 0), false, false, templateid
	if winnerArr, okWinner = templateTmp["winnerorder"].([]interface{}); okWinner {
		winnerCount = qu.IntAll(len(winnerArr))
	}
	packageCount := qu.IntAll(0)
	packageArr, okPackage, isPackage, packageid := make(map[string]interface{}), false, false, templateid
	if packageArr, okPackage = templateTmp["package"].(map[string]interface{}); okPackage {
		packageCount = qu.IntAll(len(packageArr))
	}
	for _, value := range weight.saveids {
		if templateid == value {
			continue
		}
		tmp := weight.data[value].data
		// NOTE(review): members other than the template store winnerorder as primitive.A,
		// while the template stores []interface{} — presumably a mongo-decode artifact; verify.
		if arr, ok := tmp["winnerorder"].(primitive.A); ok {
			if count := qu.IntAll(len(arr)); count > winnerCount {
				winnerCount, winnerArr, isWinner, winnerid = count, arr, true, value
			}
		}
		if arr, ok := tmp["package"].(map[string]interface{}); ok {
			if count := qu.IntAll(len(arr)); count > packageCount {
				packageCount, packageArr, isPackage, packageid = count, arr, true, value
			}
		}
	}

	// Emit only the fields that were actually replaced by a member's value.
	if len(winnerArr) > 0 && isWinner {
		modifyData["winnerorder"] = winnerArr
		(*recordDict)["winnerorder"] = map[string]interface{}{
			"id":    winnerid,
			"value": winnerArr,
		}
	}
	if len(packageArr) > 0 && isPackage {
		modifyData["package"] = packageArr
		(*recordDict)["package"] = map[string]interface{}{
			"id":    packageid,
			"value": packageArr,
		}
	}
	if len(attach_text) > 0 && isAttach {
		modifyData["attach_text"] = attach_text
	}
	return modifyData
}