123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- package main
- import (
- "encoding/json"
- "log"
- qu "qfw/util"
- "qfw/util/elastic"
- "strings"
- "sync"
- "time"
- es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
- )
- func startFusionData() {
- start := int(time.Now().Unix())
- log.Println("开始遍历索引-进行融合............")
- //遍历索引
- esclient := elastic.GetEsConn()
- defer elastic.DestoryEsConn(esclient)
- if esclient == nil {
- log.Fatalln("连接池异常")
- }
- q :=es_elastic.NewBoolQuery()
- cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
- Size(200).Do()
- if err != nil {
- log.Fatal("cursor",err)
- }
- if cursor.Results == nil {
- log.Fatalf("results != nil; got nil")
- }
- if cursor.Results.Hits == nil {
- log.Fatalf("expected results.Hits != nil; got nil")
- }
- log.Println("查询正常-总数:",cursor.TotalHits())
- //多线程 - 处理数据
- pool_mgo := make(chan bool, mgo_pool)
- wg_mgo := &sync.WaitGroup{}
- pages,numDocs := 0,0
- for {
- searchResult, err := cursor.Next()
- if err != nil {
- if err.Error() == "EOS" {
- break
- }else {
- log.Fatal("cursor searchResult",err)
- }
- }
- pages++
- isLog := false
- for _, hit := range searchResult.Hits.Hits {
- tmp := make(map[string]interface{})
- err := json.Unmarshal(*hit.Source, &tmp)
- if err != nil {
- log.Println("json Unmarshal error")
- continue
- }
- if !isLog && numDocs%10000==0 {
- log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
- isLog = true
- }
- numDocs++
- fusion_ids := qu.ObjToString(tmp["allids"])
- fusionArr := strings.Split(fusion_ids, ",")
- sourceid := qu.ObjToString(tmp["_id"])
- pool_mgo <- true
- wg_mgo.Add(1)
- go func(sourceid string, fusionArr []string) {
- defer func() {
- <-pool_mgo
- wg_mgo.Done()
- }()
- weight := NewWeightData(fusionArr)
- weight.analyzeBuildStandardData()
- if len(fusionArr) <= 1 { //单组数据
- saveFusionData, saveRecordData := weight.dealWithAddFusionStruct()
- saveid := mgo.Save(fusion_coll_name, saveFusionData)
- //新增-Record 批量新增-经测试-批量新增与多线程新增 速度306s-236s 相差20%的耗时
- saveRecordData["_id"] = saveid
- UpdateRecord.add_pool <- saveRecordData
- //批量更新Es -问题耗时
- fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
- updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
- updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
- elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
- UpdateElastic.update_pool <- map[string]string{
- "id":sourceid,
- "updateStr":updateStr1+updateStr2,
- }
- }else {
- saveFusionData, saveRecordData := weight.dealWithMultipleAddFusionStruct()
- saveid := mgo.Save(fusion_coll_name, saveFusionData)
- //新增-Record
- saveRecordData["_id"] = saveid
- UpdateRecord.add_pool <- saveRecordData //批量新增
- //批量更新Es -
- fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
- updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
- updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
- elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
- UpdateElastic.update_pool <- map[string]string{
- "id":sourceid,
- "updateStr":updateStr1+updateStr2,
- }
- }
- }(sourceid, fusionArr)
- }
- }
- log.Println("遍历Es结束......")
- wg_mgo.Wait()
- log.Println("fusion is over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
- }
- func goUpdateEs(sourceid string,updateStr string) {
- UpdateElastic.update_pool <- map[string]string{
- "id":sourceid,
- "updateStr":updateStr,
- }
- }
- func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
- startFusionData()
- return
- log.Println("开始全量融合流程")
- defer qu.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println("查询条件:",q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
- index,isOK:=0,0
- start := int(time.Now().Unix())
- //多线程升索引
- pool_es := make(chan bool, es_pool)
- wg_es := &sync.WaitGroup{}
- tmpEsMap := make(map[string]string,0)
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- //每遍历isgroupfn条 划分组别
- if index%isgroupfn==0 && index!=0 {
- log.Println("current index",index,tmp["_id"])
- //新的一组执行上一组生索引
- for k,v:=range tmpEsMap {
- pool_es <- true
- wg_es.Add(1)
- go func(es_id string,cur_ids string) {
- defer func() {
- <-pool_es
- wg_es.Done()
- }()
- if es_id!="" && cur_ids!="" {
- dataArr := *elastic.GetById(esIndex,esType,es_id)
- if len(dataArr)>0 { //存在-更新
- allids := qu.ObjToString(dataArr[0]["allids"])
- allids = allids+","+cur_ids
- updateStr := `ctx._source.allids=`+ `"`+allids+`"`
- elastic.Update(esIndex,esType,es_id, updateStr)
- }else { //不存在-新增
- savetmp := make(map[string]interface{}, 0)
- savetmp["allids"] = cur_ids
- savetmp["_id"] = StringTOBsonId(es_id)
- savetmp["template_id"] = ""
- savetmp["fusion_id"] = ""
- elastic.Save(esIndex, esType, savetmp)
- }
- }else {
- log.Println("异常",es_id,cur_ids)
- }
- }(k,v)
- }
- wg_es.Wait()
- tmpEsMap = make(map[string]string,0)
- }
- repeat := qu.IntAll(tmp["repeat"])
- sourceid := BsonTOStringId(tmp["_id"])
- repeatid := BsonTOStringId(tmp["_id"])
- if repeat==1 {
- sourceid = qu.ObjToString(tmp["repeat_id"])
- }else {
- isOK++
- }
- if tmpEsMap[sourceid]!="" {
- ids := tmpEsMap[sourceid]
- ids = ids+","+repeatid
- tmpEsMap[sourceid] = ids
- }else {
- tmpEsMap[sourceid] = repeatid
- }
- tmp = make(map[string]interface{})
- }
- log.Println("task first:",index,"不重复数:",isOK,"遍历分组数据用时:",int(time.Now().Unix())-start,"秒")
- //处理剩余数据
- if len(tmpEsMap)>0 {
- log.Println("处理剩余数据:",len(tmpEsMap))
- for k,v:=range tmpEsMap {
- pool_es <- true
- wg_es.Add(1)
- go func(es_id string,cur_ids string) {
- defer func() {
- <-pool_es
- wg_es.Done()
- }()
- if es_id!="" && cur_ids!="" {
- dataArr := *elastic.GetById(esIndex,esType,es_id)
- if len(dataArr)>0 { //存在-更新
- allids := qu.ObjToString(dataArr[0]["allids"])
- allids = allids+","+cur_ids
- updateStr := `ctx._source.allids=`+ `"`+allids+`"`
- elastic.Update(esIndex,esType,es_id, updateStr)
- }else { //不存在-新增
- savetmp := make(map[string]interface{}, 0)
- savetmp["allids"] = cur_ids
- savetmp["_id"] = StringTOBsonId(es_id)
- savetmp["template_id"] = ""
- savetmp["fusion_id"] = ""
- elastic.Save(esIndex, esType, savetmp)
- }
- }else {
- log.Println("异常",es_id,cur_ids)
- }
- }(k,v)
- }
- wg_es.Wait()
- tmpEsMap = make(map[string]string,0)
- }
- log.Println("索引准备完毕睡眠30s......耗时:",int(time.Now().Unix())-start,"秒")
- time.Sleep(30 * time.Second)
- //具体融合数据的方法
- startFusionData()
- log.Println("睡眠30秒,然后在发广播")
- time.Sleep(30 * time.Second)
- taskSendFusionUdp(mapInfo)
- }
|