maxiaoshan 4 жил өмнө
parent
commit
0a1f0b5427

+ 2 - 0
src/main.go

@@ -68,6 +68,8 @@ func main() {
 	go spider.DetailData()
 	//定时任务(现在此任务由编辑器建任务时完成)
 	//go spider.TimeTask()
+	//批量保存错误数据
+	go spider.UpdataErrDataMgo()
 	logger.Debug(Config.Webport)
 	xweb.Run(":" + Config.Webport)
 }

+ 33 - 10
src/spider/msgservice.go

@@ -275,20 +275,43 @@ func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis
 		} else {
 			data["sendflag"] = "true"
 		}
-		id := MgoS.Save("data_bak", data)
-		if !flag && id != "" {
-			href := fmt.Sprint(data["href"])
-			if len(href) > 5 && saveredis { //有效数据
-				db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
-				//增量
+		href := fmt.Sprint(data["href"])
+		if len(href) > 5 && saveredis { //有效数据
+			db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
+			//增量
+			isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
+			id := ""
+			if !isExist {
+				id = mgu.Save("data_bak", "spider", "spider", data)
+			} else { //记录重复数据,spider_repeatdata
+				mgu.Save("spider_repeatdata", "spider", "spider", data)
+			}
+			//保存服务未接收成功的数据会存入data_bak中,确保数据不丢失依赖补发程序
+			if id != "" {
 				util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
-				//全量(判断是否已存在防止覆盖id)
-				isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+href)
-				if !isExist {
-					util.PutRedis("title_repeat_fulljudgement", db, "url_repeat_"+href, "", -1)
+				if !flag { //保存服务发送成功
+					//全量(判断是否已存在防止覆盖id)
+					isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+href)
+					if !isExist {
+						util.PutRedis("title_repeat_fulljudgement", db, "url_repeat_"+href, "", -1)
+					}
 				}
 			}
 		}
+		// id := MgoS.Save("data_bak", data)
+		// if !flag && id != "" {
+		// 	href := fmt.Sprint(data["href"])
+		// 	if len(href) > 5 && saveredis { //有效数据
+		// 		db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
+		// 		//增量
+		// 		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
+		// 		//全量(判断是否已存在防止覆盖id)
+		// 		isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+href)
+		// 		if !isExist {
+		// 			util.PutRedis("title_repeat_fulljudgement", db, "url_repeat_"+href, "", -1)
+		// 		}
+		// 	}
+		// }
 	}
 }
 

+ 6 - 4
src/spider/spider.go

@@ -59,6 +59,8 @@ type Spider struct {
 	IsMustDownload   bool //是否强制下载
 }
 
+var UpdataMgoCache = make(chan []map[string]interface{}, 1000) //更新要重下数据的状态
+var SP = make(chan bool, 5)
 var TimeChan = make(chan bool, 1)
 var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
 
@@ -167,14 +169,14 @@ func DownloadHighDetail(code string) {
 							DownloadErrorData(s.Code, tmp)
 						}*/
 					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
-						log.Println("beforeHref:", href, "afterHref:", tmphref)
+						log.Println("beforeHref:", href, "afterHref:", href)
 						//增量
-						util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+tmphref, tmphref, 3600*24*30)
+						util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
 						//全量
 						db := HexToBigIntMod(tmphref)
-						isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+tmphref)
+						isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+href)
 						if !isExist {
-							util.PutRedis("title_repeat_fulljudgement", db, "url_repeat_"+tmphref, "", -1)
+							util.PutRedis("title_repeat_fulljudgement", db, "url_repeat_"+href, "", -1)
 						}
 					}
 					if !success { //下载失败更新次数和状态

+ 42 - 1
src/spider/store.go

@@ -224,7 +224,11 @@ func SaveErrorData(modifyuser string, pd map[string]interface{}, err interface{}
 		set := map[string]interface{}{
 			"$set": pd,
 		}
-		MgoS.Update("regatherdata", query, set, true, false)
+		update := []map[string]interface{}{}
+		update = append(update, query)
+		update = append(update, set)
+		UpdataMgoCache <- update
+		//MgoS.Update("regatherdata", query, set, true, false)
 	}
 }
 
@@ -243,3 +247,40 @@ func GetTime(day int) int64 {
 	t, _ := time.ParseInLocation(util.Date_Short_Layout, timeStr, time.Local)
 	return t.Unix()
 }
+
+//批量更新错误数据
+func UpdataErrDataMgo() {
+	fmt.Println("Save...")
+	arru := make([][]map[string]interface{}, 50)
+	indexu := 0
+	for {
+		select {
+		case v := <-UpdataMgoCache:
+			arru[indexu] = v
+			indexu++
+			if indexu == 50 {
+				SP <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					Mgo.UpSertBulk("regatherdata", arru...)
+				}(arru)
+				arru = make([][]map[string]interface{}, 50)
+				indexu = 0
+			}
+		case <-time.After(5 * time.Second):
+			if indexu > 0 {
+				SP <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					Mgo.UpSertBulk("regatherdata", arru...)
+				}(arru[:indexu])
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		}
+	}
+}