浏览代码

爬虫采集非本站点数据

maxiaoshan 3 年之前
父节点
当前提交
6c132d1892
共有 3 个文件被更改,包括 63 次插入和 3 次删除
  1. 2 0
      src/main.go
  2. 22 1
      src/spider/spider.go
  3. 39 2
      src/spider/store.go

+ 2 - 0
src/main.go

@@ -120,6 +120,8 @@ func main() {
 	go spider.DetailData()
 	//批量保存错误数据
 	go spider.UpdateErrDataMgo()
+	//保存爬虫采集非本站点数据
+	go spider.SaveOtherSiteData()
 	//批量保存心跳信息
 	go spider.UpdateHeartInfo()
 	logger.Debug(Config.Webport)

+ 22 - 1
src/spider/spider.go

@@ -82,11 +82,14 @@ var EsIndex string
 var EsType string
 var UpdataMgoCache = make(chan []map[string]interface{}, 1000)   //更新要重下数据的状态
 var UpdataHeartCache = make(chan []map[string]interface{}, 1000) //更新爬虫心跳信息
+var SaveMgoCache = make(chan map[string]interface{}, 1000)       //保存爬虫采集非本站点数据
 var SP = make(chan bool, 5)
 var SPH = make(chan bool, 5)
+var SPS = make(chan bool, 5)
 var Mgo *mgo.MongodbSim
 var TimeChan = make(chan bool, 1)
 var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
+var DomainNameReg = regexp.MustCompile(`(?://).+?(?:)[::/]`)
 
 //心跳
 func UpdateHeart(site, channel, code, user, t string) {
@@ -286,7 +289,7 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 				for i := 1; i <= tabLen; i++ {
 					v := tbl.RawGetInt(i).(*lua.LTable)
 					tmp := util.TableToMap(v)
-					//新增历史补漏
+					s.ThisSiteData(tmp)      //统计当前下载数据是否是本站点数据
 					if !s.IsHistoricalMend { //不是历史补漏
 						tmp["dataging"] = 0 //数据中打标记dataging=0
 						if s.DownDetail {
@@ -415,6 +418,24 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 	return errs
 }
 
+func (s *Spider) ThisSiteData(tmp map[string]interface{}) {
+	defer qu.Catch()
+	href := qu.ObjToString(tmp["href"])
+	url_dn := DomainNameReg.FindString(s.TargetChannelUrl)
+	href_dn := DomainNameReg.FindString(href)
+	if url_dn != href_dn {
+		SaveMgoCache <- map[string]interface{}{
+			"site":       s.Name,
+			"channel":    s.Channel,
+			"spidercode": s.Code,
+			"url":        s.TargetChannelUrl,
+			"href":       href,
+			"modifyuser": s.MUserName,
+			"comeintime": time.Now().Unix(),
+		}
+	}
+}
+
 //遍历,开启三级页下载(历史补漏)
 func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 	//qu.Debug("--------------历史下载-----------------")

+ 39 - 2
src/spider/store.go

@@ -313,7 +313,7 @@ func UpdateErrDataMgo() {
 				arru = make([][]map[string]interface{}, 50)
 				indexu = 0
 			}
-		case <-time.After(5 * time.Second):
+		case <-time.After(1 * time.Minute):
 			if indexu > 0 {
 				SP <- true
 				go func(arru [][]map[string]interface{}) {
@@ -350,7 +350,7 @@ func UpdateHeartInfo() {
 				heartarr = make([][]map[string]interface{}, 200)
 				indexh = 0
 			}
-		case <-time.After(10 * time.Second):
+		case <-time.After(1 * time.Minute):
 			if indexh > 0 {
 				SPH <- true
 				go func(heartarr [][]map[string]interface{}) {
@@ -365,3 +365,40 @@ func UpdateHeartInfo() {
 		}
 	}
 }
+
+//保存爬虫采集非本站点数据
+func SaveOtherSiteData() {
+	fmt.Println("Save Other Site Data...")
+	savearr := make([]map[string]interface{}, 200)
+	indexh := 0
+	for {
+		select {
+		case v := <-SaveMgoCache:
+			savearr[indexh] = v
+			indexh++
+			if indexh == 200 {
+				SPS <- true
+				go func(savearr []map[string]interface{}) {
+					defer func() {
+						<-SPS
+					}()
+					Mgo.SaveBulk("spider_othersite", savearr...)
+				}(savearr)
+				savearr = make([]map[string]interface{}, 200)
+				indexh = 0
+			}
+		case <-time.After(1 * time.Minute):
+			if indexh > 0 {
+				SPS <- true
+				go func(savearr []map[string]interface{}) {
+					defer func() {
+						<-SPS
+					}()
+					Mgo.SaveBulk("spider_othersite", savearr...)
+				}(savearr[:indexh])
+				savearr = make([]map[string]interface{}, 200)
+				indexh = 0
+			}
+		}
+	}
+}