package main

import (
	"encoding/json"
	"log"
	"strings"
	"sync"
	"time"

	es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
	qu "qfw/util"
	"qfw/util/elastic"
)

// startFusionData scans the ES index and fuses every group of ids into a standard record.
func startFusionData() {
	start := int(time.Now().Unix())
	log.Println("start scanning the index for fusion............")
	// scan the index
	esclient := elastic.GetEsConn()
	defer elastic.DestoryEsConn(esclient)
	if esclient == nil {
		log.Fatalln("connection pool error")
	}
	// match-all bool query
	q := es_elastic.NewBoolQuery()
	cursor, err := esclient.Scan(esIndex).Query(q).Size(200).Do()
	if err != nil {
		log.Fatal("cursor", err)
	}
	if cursor.Results == nil {
		log.Fatalf("expected results != nil; got nil")
	}
	if cursor.Results.Hits == nil {
		log.Fatalf("expected results.Hits != nil; got nil")
	}
	log.Println("query OK, total hits:", cursor.TotalHits())

	// process the hits concurrently, capped at mgo_pool goroutines
	pool_mgo := make(chan bool, mgo_pool)
	wg_mgo := &sync.WaitGroup{}
	pages, numDocs := 0, 0
	for {
		searchResult, err := cursor.Next()
		if err != nil {
			if err.Error() == "EOS" {
				break
			}
			log.Fatal("cursor searchResult", err)
		}
		pages++
		isLog := false
		for _, hit := range searchResult.Hits.Hits {
			tmp := make(map[string]interface{})
			if err := json.Unmarshal(*hit.Source, &tmp); err != nil {
				log.Println("json Unmarshal error")
				continue
			}
			if !isLog && numDocs%10000 == 0 {
				log.Println("current count:", numDocs, "ES doc:", tmp["_id"])
				isLog = true
			}
			numDocs++
			fusion_ids := qu.ObjToString(tmp["allids"])
			fusionArr := strings.Split(fusion_ids, ",")
			sourceid := qu.ObjToString(tmp["_id"])
			pool_mgo <- true
			wg_mgo.Add(1)
			go func(sourceid string, fusionArr []string) {
				defer func() {
					<-pool_mgo
					wg_mgo.Done()
				}()
				weight := NewWeightData(fusionArr)
				weight.analyzeBuildStandardData()
				if len(fusionArr) <= 1 { // single-record group
					saveFusionData, saveRecordData := weight.dealWithAddFusionStruct()
					saveid := mgo.Save(fusion_coll_name, saveFusionData)
					// insert the record through the batch pool; tested: batch insert vs.
					// per-goroutine insert measured 306s vs. 236s, about a 20% difference
					saveRecordData["_id"] = saveid
					UpdateRecord.add_pool <- saveRecordData
					// write the fusion/template ids back to ES (the batched ES update is the
					// time-consuming part)
					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
					updateStr1 := `ctx._source.template_id="` + template_id + `";`
					updateStr2 := `ctx._source.fusion_id="` + fusion_id + `"`
					elastic.Update(esIndex, esType, sourceid, updateStr1+updateStr2)
					UpdateElastic.update_pool <- map[string]string{
						"id":        sourceid,
						"updateStr": updateStr1 + updateStr2,
					}
				} else { // multi-record group
					saveFusionData, saveRecordData := weight.dealWithMultipleAddFusionStruct()
					saveid := mgo.Save(fusion_coll_name, saveFusionData)
					// insert the record through the batch pool
					saveRecordData["_id"] = saveid
					UpdateRecord.add_pool <- saveRecordData
					// write the fusion/template ids back to ES
					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
					updateStr1 := `ctx._source.template_id="` + template_id + `";`
					updateStr2 := `ctx._source.fusion_id="` + fusion_id + `"`
					elastic.Update(esIndex, esType, sourceid, updateStr1+updateStr2)
					UpdateElastic.update_pool <- map[string]string{
						"id":        sourceid,
						"updateStr": updateStr1 + updateStr2,
					}
				}
			}(sourceid, fusionArr)
		}
	}
	log.Println("finished scanning ES......")
	wg_mgo.Wait()
	log.Println("fusion is over:", numDocs, "elapsed:", int(time.Now().Unix())-start, "seconds")
}

// goUpdateEs queues a single ES update script for the batch updater.
func goUpdateEs(sourceid string, updateStr string) {
	UpdateElastic.update_pool <- map[string]string{
		"id":        sourceid,
		"updateStr": updateStr,
	}
}

// startTaskFullData runs the full fusion flow over the id range (gtid, lteid] given in mapInfo.
func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
	log.Println("starting full fusion process")
	defer qu.Catch()
	// id range
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
		},
	}
log.Println("查询条件:",q) sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter() index,isOK:=0,0 start := int(time.Now().Unix()) //多线程升索引 pool_es := make(chan bool, es_pool) wg_es := &sync.WaitGroup{} tmpEsMap := make(map[string]string,0) for tmp := make(map[string]interface{}); it.Next(&tmp); index++ { //每遍历isgroupfn条 划分组别 if index%isgroupfn==0 && index!=0 { log.Println("current index",index,tmp["_id"]) //新的一组执行上一组生索引 for k,v:=range tmpEsMap { pool_es <- true wg_es.Add(1) go func(es_id string,cur_ids string) { defer func() { <-pool_es wg_es.Done() }() if es_id!="" && cur_ids!="" { dataArr := *elastic.GetById(esIndex,esType,es_id) if len(dataArr)>0 { //存在-更新 allids := qu.ObjToString(dataArr[0]["allids"]) allids = allids+","+cur_ids updateStr := `ctx._source.allids=`+ `"`+allids+`"` elastic.Update(esIndex,esType,es_id, updateStr) }else { //不存在-新增 savetmp := make(map[string]interface{}, 0) savetmp["allids"] = cur_ids savetmp["_id"] = StringTOBsonId(es_id) savetmp["template_id"] = "" savetmp["fusion_id"] = "" elastic.Save(esIndex, esType, savetmp) } }else { log.Println("异常",es_id,cur_ids) } }(k,v) } wg_es.Wait() tmpEsMap = make(map[string]string,0) } repeat := qu.IntAll(tmp["repeat"]) sourceid := BsonTOStringId(tmp["_id"]) repeatid := BsonTOStringId(tmp["_id"]) if repeat==1 { sourceid = qu.ObjToString(tmp["repeat_id"]) }else { isOK++ } if tmpEsMap[sourceid]!="" { ids := tmpEsMap[sourceid] ids = ids+","+repeatid tmpEsMap[sourceid] = ids }else { tmpEsMap[sourceid] = repeatid } tmp = make(map[string]interface{}) } log.Println("task first:",index,"不重复数:",isOK,"遍历分组数据用时:",int(time.Now().Unix())-start,"秒") //处理剩余数据 if len(tmpEsMap)>0 { log.Println("处理剩余数据:",len(tmpEsMap)) for k,v:=range tmpEsMap { pool_es <- true wg_es.Add(1) go func(es_id string,cur_ids string) { defer func() { <-pool_es wg_es.Done() }() if es_id!="" && cur_ids!="" { dataArr := *elastic.GetById(esIndex,esType,es_id) if len(dataArr)>0 { //存在-更新 allids := qu.ObjToString(dataArr[0]["allids"]) allids = allids+","+cur_ids updateStr := `ctx._source.allids=`+ `"`+allids+`"` elastic.Update(esIndex,esType,es_id, updateStr) }else { //不存在-新增 savetmp := make(map[string]interface{}, 0) savetmp["allids"] = cur_ids savetmp["_id"] = StringTOBsonId(es_id) savetmp["template_id"] = "" savetmp["fusion_id"] = "" elastic.Save(esIndex, esType, savetmp) } }else { log.Println("异常",es_id,cur_ids) } }(k,v) } wg_es.Wait() tmpEsMap = make(map[string]string,0) } log.Println("索引准备完毕睡眠30s......耗时:",int(time.Now().Unix())-start,"秒") time.Sleep(30 * time.Second) //具体融合数据的方法 startFusionData() log.Println("睡眠30秒,然后在发广播") time.Sleep(30 * time.Second) taskSendFusionUdp(mapInfo) }