|
@@ -11,153 +11,17 @@ import (
|
|
|
es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
|
|
|
)
|
|
|
|
|
|
-func exportFusionMongoData() {
|
|
|
-
|
|
|
- start := int(time.Now().Unix())
|
|
|
- log.Println("开始导出融合组数据......")
|
|
|
- //遍历索引
|
|
|
- esclient := elastic.GetEsConn()
|
|
|
- defer elastic.DestoryEsConn(esclient)
|
|
|
-
|
|
|
- if esclient == nil {
|
|
|
- log.Println("连接池异常")
|
|
|
- }
|
|
|
- q :=es_elastic.NewBoolQuery()
|
|
|
- cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
|
|
|
- Size(200).Do()
|
|
|
- if err != nil {
|
|
|
- log.Println("cursor",err)
|
|
|
- }
|
|
|
- if cursor.Results == nil {
|
|
|
- log.Println("results != nil; got nil")
|
|
|
- }
|
|
|
- if cursor.Results.Hits == nil {
|
|
|
- log.Println("expected results.Hits != nil; got nil")
|
|
|
- }
|
|
|
- log.Println("查询正常-总数:",cursor.TotalHits())
|
|
|
- //多线程 - 处理数据
|
|
|
- pool_es := make(chan bool, es_pool)
|
|
|
- wg_es := &sync.WaitGroup{}
|
|
|
- pages,numDocs := 0,0
|
|
|
- for {
|
|
|
- searchResult, err := cursor.Next()
|
|
|
- if err != nil {
|
|
|
- if err.Error() == "EOS" {
|
|
|
- break
|
|
|
- }else {
|
|
|
- log.Println("cursor searchResult",err)
|
|
|
- }
|
|
|
- }
|
|
|
- pages++
|
|
|
- isLog := false
|
|
|
- for _, hit := range searchResult.Hits.Hits {
|
|
|
- tmp := make(map[string]interface{})
|
|
|
- err := json.Unmarshal(*hit.Source, &tmp)
|
|
|
- if err != nil {
|
|
|
- log.Println("json Unmarshal error")
|
|
|
- continue
|
|
|
- }
|
|
|
- if !isLog && numDocs%10000==0 {
|
|
|
- log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
|
|
|
- isLog = true
|
|
|
- }
|
|
|
-
|
|
|
- numDocs++
|
|
|
- fusion_ids := qu.ObjToString(tmp["allids"])
|
|
|
- sourceid := qu.ObjToString(tmp["_id"])
|
|
|
- pool_es <- true
|
|
|
- wg_es.Add(1)
|
|
|
- go func(sourceid string, fusionArr string) {
|
|
|
- defer func() {
|
|
|
- <-pool_es
|
|
|
- wg_es.Done()
|
|
|
- }()
|
|
|
- AddGroupPool.pool <- map[string]interface{}{
|
|
|
- "_id":StringTOBsonId(sourceid),
|
|
|
- "allids":fusion_ids,
|
|
|
- }
|
|
|
- }(sourceid, fusion_ids)
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- log.Println("遍历Es结束......")
|
|
|
- wg_es.Wait()
|
|
|
-
|
|
|
- log.Println("fusion group over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
|
|
|
-}
|
|
|
-
|
|
|
-func startFusionData() {
|
|
|
- log.Println("开始全量融合流程...")
|
|
|
- defer qu.Catch()
|
|
|
- //可以开多程序-不同id段执行融合
|
|
|
- q := map[string]interface{}{}
|
|
|
- sess := mgo.GetMgoConn()
|
|
|
- defer mgo.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(mgo.DbName).C(group_coll_name).Find(&q).Iter()
|
|
|
- index,start :=0, int(time.Now().Unix())
|
|
|
- //多线程保存数据
|
|
|
- pool_mgo := make(chan bool, mgo_pool)
|
|
|
- wg_mgo := &sync.WaitGroup{}
|
|
|
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
|
- if index%10000==0 {
|
|
|
- log.Println("current index",index,tmp["_id"])
|
|
|
- }
|
|
|
- fusion_ids := qu.ObjToString(tmp["allids"])
|
|
|
- fusionArr := strings.Split(fusion_ids, ",")
|
|
|
- sourceid := BsonTOStringId(tmp["_id"])
|
|
|
- pool_mgo <- true
|
|
|
- wg_mgo.Add(1)
|
|
|
- go func(sourceid string, fusionArr []string) {
|
|
|
- defer func() {
|
|
|
- <-pool_mgo
|
|
|
- wg_mgo.Done()
|
|
|
- }()
|
|
|
- weight := NewWeightData(fusionArr)
|
|
|
- weight.analyzeBuildStandardData()
|
|
|
-
|
|
|
- saveFusionData, saveRecordData:= map[string]interface{}{},map[string]interface{}{}
|
|
|
- if len(fusionArr) <= 1 {
|
|
|
- saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
|
|
|
- }else {
|
|
|
- saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
|
|
|
- }
|
|
|
- //新增融合表
|
|
|
- saveid := mgo.Save(fusion_coll_name, saveFusionData)
|
|
|
- saveRecordData["_id"] = saveid
|
|
|
- //批量新增日志表
|
|
|
- AddRecordPool.pool <- saveRecordData
|
|
|
- //批量更新分组表
|
|
|
- UpdateGroupPool.pool <- []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": StringTOBsonId(sourceid),
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "fusion_id": BsonTOStringId(saveid),
|
|
|
- "template_id":qu.ObjToString(saveFusionData["fusion_templateid"]),
|
|
|
- },
|
|
|
- },
|
|
|
- }
|
|
|
- }(sourceid, fusionArr)
|
|
|
-
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
- wg_mgo.Wait()
|
|
|
-
|
|
|
- log.Println("fusion is over:",index,"总用时:",int(time.Now().Unix())-start,"秒")
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
|
|
|
func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
|
|
|
|
|
|
+ //临时测试
|
|
|
//先到处具体需要融合组数据-存mongo
|
|
|
exportFusionMongoData()
|
|
|
- time.Sleep(30 * time.Second)
|
|
|
+ time.Sleep(60 * time.Second)
|
|
|
|
|
|
//具体融合数据的方法
|
|
|
startFusionData()
|
|
|
- time.Sleep(30 * time.Second)
|
|
|
+ time.Sleep(60 * time.Second)
|
|
|
return
|
|
|
|
|
|
|
|
@@ -276,14 +140,14 @@ func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
|
|
|
|
|
|
}
|
|
|
log.Println("索引准备完毕睡眠30s......耗时:",int(time.Now().Unix())-start,"秒")
|
|
|
- time.Sleep(30 * time.Second)
|
|
|
+ time.Sleep(60 * time.Second)
|
|
|
|
|
|
//先到处具体需要融合组数据-存mongo
|
|
|
exportFusionMongoData()
|
|
|
- time.Sleep(30 * time.Second)
|
|
|
+ time.Sleep(60 * time.Second)
|
|
|
//具体融合数据的方法
|
|
|
startFusionData()
|
|
|
- time.Sleep(30 * time.Second)
|
|
|
+ time.Sleep(60 * time.Second)
|
|
|
|
|
|
taskSendFusionUdp(mapInfo)
|
|
|
}
|
|
@@ -291,3 +155,139 @@ func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
|
|
|
|
|
|
|
|
|
|
|
|
+func exportFusionMongoData() {
|
|
|
+
|
|
|
+ start := int(time.Now().Unix())
|
|
|
+ log.Println("开始导出融合组数据......")
|
|
|
+ //遍历索引
|
|
|
+ esclient := elastic.GetEsConn()
|
|
|
+ defer elastic.DestoryEsConn(esclient)
|
|
|
+
|
|
|
+ if esclient == nil {
|
|
|
+ log.Println("连接池异常")
|
|
|
+ }
|
|
|
+ q :=es_elastic.NewBoolQuery()
|
|
|
+ cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
|
|
|
+ Size(200).Do()
|
|
|
+ if err != nil {
|
|
|
+ log.Println("cursor",err)
|
|
|
+ }
|
|
|
+ if cursor.Results == nil {
|
|
|
+ log.Println("results != nil; got nil")
|
|
|
+ }
|
|
|
+ if cursor.Results.Hits == nil {
|
|
|
+ log.Println("expected results.Hits != nil; got nil")
|
|
|
+ }
|
|
|
+ log.Println("查询正常-总数:",cursor.TotalHits())
|
|
|
+ //多线程 - 处理数据
|
|
|
+ pool_es := make(chan bool, es_pool)
|
|
|
+ wg_es := &sync.WaitGroup{}
|
|
|
+ pages,numDocs := 0,0
|
|
|
+ for {
|
|
|
+ searchResult, err := cursor.Next()
|
|
|
+ if err != nil {
|
|
|
+ if err.Error() == "EOS" {
|
|
|
+ break
|
|
|
+ }else {
|
|
|
+ log.Println("cursor searchResult",err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ pages++
|
|
|
+ isLog := false
|
|
|
+ for _, hit := range searchResult.Hits.Hits {
|
|
|
+ tmp := make(map[string]interface{})
|
|
|
+ err := json.Unmarshal(*hit.Source, &tmp)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("json Unmarshal error")
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if !isLog && numDocs%10000==0 {
|
|
|
+ log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
|
|
|
+ isLog = true
|
|
|
+ }
|
|
|
+
|
|
|
+ numDocs++
|
|
|
+ fusion_ids := qu.ObjToString(tmp["allids"])
|
|
|
+ sourceid := qu.ObjToString(tmp["_id"])
|
|
|
+ pool_es <- true
|
|
|
+ wg_es.Add(1)
|
|
|
+ go func(sourceid string, fusionArr string) {
|
|
|
+ defer func() {
|
|
|
+ <-pool_es
|
|
|
+ wg_es.Done()
|
|
|
+ }()
|
|
|
+ AddGroupPool.pool <- map[string]interface{}{
|
|
|
+ "_id":StringTOBsonId(sourceid),
|
|
|
+ "allids":fusion_ids,
|
|
|
+ }
|
|
|
+ }(sourceid, fusion_ids)
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ log.Println("遍历Es结束......")
|
|
|
+ wg_es.Wait()
|
|
|
+
|
|
|
+ log.Println("fusion group over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
|
|
|
+}
|
|
|
+
|
|
|
+func startFusionData() {
|
|
|
+ log.Println("开始全量融合流程...")
|
|
|
+ defer qu.Catch()
|
|
|
+ //可以开多程序-不同id段执行融合
|
|
|
+ q := map[string]interface{}{}
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
+ it := sess.DB(mgo.DbName).C(group_coll_name).Find(&q).Iter()
|
|
|
+ index,start :=0, int(time.Now().Unix())
|
|
|
+ //多线程保存数据
|
|
|
+ pool_mgo := make(chan bool, mgo_pool)
|
|
|
+ wg_mgo := &sync.WaitGroup{}
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
|
+ if index%10000==0 {
|
|
|
+ log.Println("current index",index,tmp["_id"])
|
|
|
+ }
|
|
|
+ fusion_ids := qu.ObjToString(tmp["allids"])
|
|
|
+ fusionArr := strings.Split(fusion_ids, ",")
|
|
|
+ sourceid := BsonTOStringId(tmp["_id"])
|
|
|
+ pool_mgo <- true
|
|
|
+ wg_mgo.Add(1)
|
|
|
+ go func(sourceid string, fusionArr []string) {
|
|
|
+ defer func() {
|
|
|
+ <-pool_mgo
|
|
|
+ wg_mgo.Done()
|
|
|
+ }()
|
|
|
+ weight := NewWeightData(fusionArr)
|
|
|
+ weight.analyzeBuildStandardData()
|
|
|
+
|
|
|
+ saveFusionData, saveRecordData:= map[string]interface{}{},map[string]interface{}{}
|
|
|
+ if len(fusionArr) <= 1 {
|
|
|
+ saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
|
|
|
+ }else {
|
|
|
+ saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
|
|
|
+ }
|
|
|
+ //新增融合表
|
|
|
+ saveid := mgo.Save(fusion_coll_name, saveFusionData)
|
|
|
+ saveRecordData["_id"] = saveid
|
|
|
+ //批量新增日志表
|
|
|
+ AddRecordPool.pool <- saveRecordData
|
|
|
+ //批量更新分组表
|
|
|
+ UpdateGroupPool.pool <- []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": StringTOBsonId(sourceid),
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "fusion_id": BsonTOStringId(saveid),
|
|
|
+ "template_id":qu.ObjToString(saveFusionData["fusion_templateid"]),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }(sourceid, fusionArr)
|
|
|
+
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ wg_mgo.Wait()
|
|
|
+
|
|
|
+ log.Println("fusion is over:",index,"总用时:",int(time.Now().Unix())-start,"秒")
|
|
|
+
|
|
|
+}
|