Browse Source

备份-融合

apple 4 years ago
parent
commit
c50322efac
2 changed files with 18 additions and 10 deletions
  1. 5 0
      udpfilterdup/src/main.go
  2. 13 10
      udpfusion/src/fusionFullData.go

+ 5 - 0
udpfilterdup/src/main.go

@@ -60,6 +60,7 @@ var (
 
 func init() {
 
+	return
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
 	flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")	//历史
 	flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
@@ -176,6 +177,10 @@ func mainT() {
 func main() {
 
 
+	exportFusionRecordExcleData()
+
+	return
+
 	if TimingTask {
 		go historyTaskDay()
 		time.Sleep(99999 * time.Hour)

+ 13 - 10
udpfusion/src/fusionFullData.go

@@ -85,7 +85,9 @@ func startFusionData()  {
 					saveid := mgo.Save(fusion_coll_name, saveFusionData)
 					//新增-Record
 					saveRecordData["_id"] = saveid
-					UpdateRecord.add_pool <- saveRecordData
+					//UpdateRecord.add_pool <- saveRecordData
+					mgo.Save(record_coll_name,saveRecordData)
+
 					//批量更新Es
 					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
 					updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
@@ -99,22 +101,23 @@ func startFusionData()  {
 					saveid := mgo.Save(fusion_coll_name, saveFusionData)
 					//新增-Record
 					saveRecordData["_id"] = saveid
-					UpdateRecord.add_pool <- saveRecordData
+					mgo.Save(record_coll_name,saveRecordData)
+					//UpdateRecord.add_pool <- saveRecordData
 					//批量更新Es
-					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
-					updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
-					updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
-					UpdateElastic.update_pool <- map[string]string{
-						"id":sourceid,
-						"updateStr":updateStr1+updateStr2,
-					}
+					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
+					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
+					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
+					//UpdateElastic.update_pool <- map[string]string{
+					//	"id":sourceid,
+					//	"updateStr":updateStr1+updateStr2,
+					//}
 
 				}
 			}(sourceid, fusionArr)
 		}
 
 	}
-
+	log.Println("遍历Es结束......")
 	wg_mgo.Wait()
 
 	if pages <= 0 {