Bladeren bron

udp抽取

zhangjinkun 6 jaren geleden
bovenliggende
commit
00c4b78ab2
2 gewijzigde bestanden met 59 toevoegingen en 49 verwijderingen
  1. + 5 - 1
      src/jy/extract/extract.go
  2. + 54 - 48
      src/jy/extract/extractudp.go

+ 5 - 1
src/jy/extract/extract.go

@@ -209,7 +209,11 @@ func PreInfo(doc map[string]interface{}) *ju.Job {
 		Result:     map[string][]*ju.ExtField{},
 		BuyerAddr:  qu.ObjToString(doc["buyeraddr"]),
 	}
-	pretreated.AnalyStart(j)
+	qu.Try(func() {
+		pretreated.AnalyStart(j)
+	}, func(err interface{}) {
+		log.Println("pretreated.AnalyStart", err)
+	})
 	return j
 }
 

+ 54 - 48
src/jy/extract/extractudp.go

@@ -106,20 +106,20 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 	go ext.BidSave()
 	ext.IsRun = true
 
-	query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-	count1 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
-	count2 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", query)
-	count := count1 + count2
-	pageNum := (count + PageSize - 1) / PageSize
-	limit := PageSize
-	if count < PageSize {
-		limit = count
-	}
-	log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
-	sidback := sid
-	//接着上次任务执行
-	startI := 0
-	if len(instanceId) > 0 {
+	if len(instanceId) > 0 { //分布式抽取进度
+		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
+		count1 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
+		count2 := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", query)
+		count := count1 + count2
+		pageNum := (count + PageSize - 1) / PageSize
+		limit := PageSize
+		if count < PageSize {
+			limit = count
+		}
+		log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
+
+		startI := 0 //接着上次任务执行
+		sidback := sid
 		esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`)
 		startI = qu.IntAll((*esc)["pagecurrent"])
 		if qu.ObjToString((*esc)["lastId"]) != "" {
@@ -128,48 +128,54 @@ func ExtractByUdp(sid, eid string, instanceId ...string) {
 		if qu.ObjToString((*esc)["lastIdback"]) != "" {
 			sidback = qu.ObjToString((*esc)["lastIdback"])
 		}
-	}
 
-	for i := startI; i < pageNum; i++ {
-		query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-		log.Printf("page=%d,query=%v", i+1, query)
-		if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query) > 0 {
-			list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
-			for _, v := range *list {
-				//log.Println(v["_id"])
-				j := PreInfo(v)
-				ext.TaskInfo.ProcessPool <- true
-				go ext.ExtractProcess(j)
-				sid = qu.BsonIdToSId(v["_id"])
+		for i := startI; i < pageNum; i++ {
+			query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
+			log.Printf("page=%d,query=%v", i+1, query)
+			if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query) > 0 {
+				list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
+				for _, v := range *list {
+					//log.Println(v["_id"])
+					j := PreInfo(v)
+					ext.TaskInfo.ProcessPool <- true
+					go ext.ExtractProcess(j)
+					sid = qu.BsonIdToSId(v["_id"])
+				}
+				db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
+					map[string]interface{}{"$set": map[string]interface{}{
+						"lastId": sid,
+					}}, true, false)
 			}
-			db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
-				map[string]interface{}{"$set": map[string]interface{}{
-					"lastId": sid,
-				}}, true, false)
-		}
-		queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
-		log.Printf("page=%d,queryback=%v", i+1, queryback)
-		if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
-			list2, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
-			for _, v := range *list2 {
-				//log.Println(v["_id"])
-				j := PreInfo(v)
-				ext.TaskInfo.ProcessPool <- true
-				go ext.ExtractProcess(j)
-				sidback = qu.BsonIdToSId(v["_id"])
+			queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
+			log.Printf("page=%d,queryback=%v", i+1, queryback)
+			if ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
+				list2, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
+				for _, v := range *list2 {
+					//log.Println(v["_id"])
+					j := PreInfo(v)
+					ext.TaskInfo.ProcessPool <- true
+					go ext.ExtractProcess(j)
+					sidback = qu.BsonIdToSId(v["_id"])
+				}
+				db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
+					map[string]interface{}{"$set": map[string]interface{}{
+						"lastIdback": sidback,
+					}}, true, false)
 			}
-			db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
-				map[string]interface{}{"$set": map[string]interface{}{
-					"lastIdback": sidback,
-				}}, true, false)
-		}
-		//分布式抽取进度
-		if len(instanceId) > 0 {
 			db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
 				map[string]interface{}{"$set": map[string]interface{}{
 					"pagetotal":   pageNum,
 					"pagecurrent": i + 1,
 				}}, true, false)
 		}
+	} else { //普通抽取
+		query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
+		list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
+		for _, v := range *list {
+			//log.Println(v["_id"])
+			j := PreInfo(v)
+			ext.TaskInfo.ProcessPool <- true
+			go ext.ExtractProcess(j)
+		}
 	}
 }