apple 4 anos atrás
pai
commit
725e05b7ed
1 arquivos alterados com 20 adições e 25 exclusões
  1. 20 25
      udpfusion/src/fusionFullData.go

+ 20 - 25
udpfusion/src/fusionFullData.go

@@ -25,7 +25,7 @@ func startFusionData()  {
 	cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
 		Size(200).Do()
 	if err != nil {
-		log.Fatal(err)
+		log.Fatal("cursor",err)
 	}
 	if cursor.Results == nil {
 		log.Fatalf("results != nil; got nil")
@@ -33,7 +33,9 @@ func startFusionData()  {
 	if cursor.Results.Hits == nil {
 		log.Fatalf("expected results.Hits != nil; got nil")
 	}
-	log.Println("查询总数:",cursor.TotalHits())
+
+
+	log.Println("查询正常-总数:",cursor.TotalHits())
 
 
 	//多线程 - 处理数据
@@ -43,23 +45,21 @@ func startFusionData()  {
 
 	for {
 		searchResult, err := cursor.Next()
-		if err == es_elastic.EOS {
-			break
-		}
 		if err != nil {
-			log.Fatal(err)
+			if err.Error() == "EOS" {
+				break
+			}else {
+				log.Fatal("cursor searchResult",err)
+			}
 		}
-
 		pages++
 		isLog := false
 		for _, hit := range searchResult.Hits.Hits {
-			if hit.Index != esIndex {
-				log.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", esIndex, hit.Index)
-			}
 			tmp := make(map[string]interface{})
 			err := json.Unmarshal(*hit.Source, &tmp)
 			if err != nil {
-				log.Fatalln(err)
+				log.Println("json Unmarshal error")
+				continue
 			}
 			if !isLog && numDocs%10000==0 {
 				log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
@@ -85,24 +85,24 @@ func startFusionData()  {
 					saveid := mgo.Save(fusion_coll_name, saveFusionData)
 					//新增-Record
 					saveRecordData["_id"] = saveid
-					//UpdateRecord.add_pool <- saveRecordData
 					mgo.Save(record_coll_name,saveRecordData)
+					//UpdateRecord.add_pool <- saveRecordData  //批量新增
 
 					//批量更新Es
-					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
-					updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
-					updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
-					UpdateElastic.update_pool <- map[string]string{
-						"id":sourceid,
-						"updateStr":updateStr1+updateStr2,
-					}
+					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
+					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
+					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
+					//UpdateElastic.update_pool <- map[string]string{
+					//	"id":sourceid,
+					//	"updateStr":updateStr1+updateStr2,
+					//}
 				}else { //多组数据
 					saveFusionData, saveRecordData := weight.dealWithMultipleAddFusionStruct()
 					saveid := mgo.Save(fusion_coll_name, saveFusionData)
 					//新增-Record
 					saveRecordData["_id"] = saveid
 					mgo.Save(record_coll_name,saveRecordData)
-					//UpdateRecord.add_pool <- saveRecordData
+					//UpdateRecord.add_pool <- saveRecordData //批量新增
 					//批量更新Es
 					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
 					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
@@ -120,11 +120,6 @@ func startFusionData()  {
 	log.Println("遍历Es结束......")
 	wg_mgo.Wait()
 
-	if pages <= 0 {
-		log.Fatalf("expected to retrieve at least 1 page; got %d", pages)
-	}
-
-
 	log.Println("fusion is over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
 
 }