Explorar o código

错误数据批量保存

maxiaoshan hai 4 anos
pai
achega
3f90ffe0b1
Modificáronse 3 ficheiros con 46 adicións e 1 borrados
  1. 2 0
      src/main.go
  2. 2 0
      src/spider/spider.go
  3. 42 1
      src/spider/store.go

+ 2 - 0
src/main.go

@@ -111,6 +111,8 @@ func main() {
 	go spider.DetailData()
 	//定时任务(现在此任务由编辑器建任务时完成)
 	//go spider.TimeTask()
+	//批量保存错误数据
+	go spider.UpdataErrDataMgo()
 	logger.Debug(Config.Webport)
 	xweb.Run(":" + Config.Webport)
 }

+ 2 - 0
src/spider/spider.go

@@ -65,6 +65,8 @@ type Spider struct {
 	IsMustDownload   bool //是否强制下载
 }
 
+// UpdataMgoCache queues pending updates for data that must be re-downloaded.
+// SaveErrorData sends each update as a two-element slice: the query map
+// followed by the "$set" map. The channel buffers up to 1000 entries.
+var UpdataMgoCache = make(chan []map[string]interface{}, 1000) // update the status of data that has to be re-downloaded
+// SP is used by UpdataErrDataMgo as a semaphore: a value is sent before each
+// bulk-write goroutine starts and received when it finishes, so at most 5
+// such goroutines run at once.
+var SP = make(chan bool, 5)
 var Mgo *mgo.MongodbSim
 var TimeChan = make(chan bool, 1)
 var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)

+ 42 - 1
src/spider/store.go

@@ -230,7 +230,11 @@ func SaveErrorData(modifyuser string, pd map[string]interface{}, err interface{}
 		set := map[string]interface{}{
 			"$set": pd,
 		}
-		Mgo.Update("regatherdata", "spider", "spider", query, set, true, false)
+		update := []map[string]interface{}{}
+		update = append(update, query)
+		update = append(update, set)
+		UpdataMgoCache <- update
+		//Mgo.Update("regatherdata", "spider", "spider", query, set, true, false)
 	}
 }
 
@@ -430,3 +434,40 @@ func UpdateHighListDataByCode(code string) {
 	}
 	Mgo.Update("spider_highlistdata", query, set, false, true)
 }
+
+//批量更新错误数据
+func UpdataErrDataMgo() {
+	fmt.Println("Save...")
+	arru := make([][]map[string]interface{}, 50)
+	indexu := 0
+	for {
+		select {
+		case v := <-UpdataMgoCache:
+			arru[indexu] = v
+			indexu++
+			if indexu == 50 {
+				SP <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					Mgo.UpSertBulk("regatherdata", arru...)
+				}(arru)
+				arru = make([][]map[string]interface{}, 50)
+				indexu = 0
+			}
+		case <-time.After(5 * time.Second):
+			if indexu > 0 {
+				SP <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					Mgo.UpSertBulk("regatherdata", arru...)
+				}(arru[:indexu])
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		}
+	}
+}