
Spider supplement-collection module changes

mxs · 1 year ago · commit d889b53aa1
4 changed files with 208 additions and 53 deletions
  1. src/config.json (+1 -1)
  2. src/spider/spider.go (+67 -44)
  3. src/spider/store.go (+3 -1)
  4. src/spider/supplement.go (+137 -7)

+ 1 - 1
src/config.json

@@ -18,7 +18,7 @@
     "working": 0,
     "chansize": 4,
     "detailchansize": 20,
-    "uploadevent": 7100,
+    "uploadevent": 7001,
     "logLevel": 1,
     "daynum": 6,
     "modal": 1,

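Editor's note: for context, a minimal sketch of how this config block might be loaded. The Config struct, field names, and loadConfig helper are illustrative assumptions rather than this project's actual loader; only the JSON keys come from the hunk above. The new uploadevent value 7001 lines up with the "event": 7001 filter that SupplementDataCount applies in src/spider/supplement.go below.

// config_sketch.go: hypothetical loader for the keys visible in src/config.json.
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// Config mirrors the fields shown in this hunk; names and types are assumed.
type Config struct {
	Working        int `json:"working"`
	ChanSize       int `json:"chansize"`
	DetailChanSize int `json:"detailchansize"`
	UploadEvent    int `json:"uploadevent"` // 7001 after this commit (was 7100)
	LogLevel       int `json:"logLevel"`
	DayNum         int `json:"daynum"`
	Modal          int `json:"modal"`
}

func loadConfig(path string) (*Config, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	cfg := &Config{}
	if err := json.Unmarshal(data, cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig("src/config.json")
	if err != nil {
		fmt.Println("load config:", err)
		return
	}
	fmt.Println("upload event:", cfg.UploadEvent) // expect 7001
}
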
+ 67 - 44
src/spider/spider.go

@@ -940,19 +940,27 @@ func (s *Spider) DownListPageItemByThreadsBack() (errs interface{}) {
 func (s *Spider) SupplementDownListPageItem() (errs interface{}) {
 	defer qu.Catch()
 	var (
-		errtimes       int      //consecutive collection error count (tentatively 10)
-		errPageNum     int      //page number of the current collection error
-		downtimes      int      //retry count for a given page (tentatively 3)
-		downloadAllNum int      //total items collected in this run
-		saveAllNum     int      //total items supplement-saved in this run
-		repeatAllNum   int      //total duplicate items in this run
-		pageTitleHash  string   //hash of all title text on the current page
-		finishText     = "正常退出" //("normal exit")
-		start          = 1      //start page
+		errtimes                int    //consecutive collection error count (tentatively 10)
+		errPageNum              int    //page number of the current collection error
+		downtimes               int    //retry count for a given page (tentatively 3)
+		downloadAllNum          int    //total items collected in this run
+		saveAllNum              int    //total items supplement-saved in this run
+		repeatAllNum            int    //total duplicate items in this run
+		pageTitleHash           string //hash of all title text on the current page
+		finish                  int    //completion status: 0 unfinished, 1 finished, -1 abnormal exit
+		start                   = 1    //start page
+		publishtimeAllZeroTimes int    //consecutive pages whose publish times were all zero
 	)
+	Supplement_SaveData[s.Code] = &SupplementSpider{
+		Site:       s.Name,
+		Channel:    s.Channel,
+		Spidercode: s.Code,
+		Modifyuser: s.MUserName,
+		Finish:     finish,
+	}
 	for {
-		if errtimes >= Supplement_MaxErrorTimes { //after 10 consecutive error pages the spider stops paging
-			finishText = "异常退出"
+		if errtimes >= Supplement_MaxErrorTimes || publishtimeAllZeroTimes > Supplement_Publishtime_ZeroTimes { //after 10 consecutive error pages the spider stops paging
+			finish = -1
 			logger.Info(s.Code + "连续10页采集异常")
 			break
 		}
@@ -979,11 +987,12 @@ func (s *Spider) SupplementDownListPageItem() (errs interface{}) {
 		if tbl, ok := lv.(*lua.LTable); ok {
 			if tabLen := tbl.Len(); tabLen > 0 { //the list page has data; download detail pages based on it
 				var (
-					publishtimeErrTimes int
-					text                string
-					repeatListNum       int // duplicate links on the current list page
-					num                 = 1
-					isBreak             = false
+					publishtimeErrTimes  int
+					publishtimeZeroTimes int
+					text                 string
+					repeatListNum        int // duplicate links on the current list page
+					num                  = 1
+					isBreak              = false
 				)
 				for ; num <= tabLen; num++ {
 					v := tbl.RawGetInt(num).(*lua.LTable)
@@ -998,26 +1007,36 @@ func (s *Spider) SupplementDownListPageItem() (errs interface{}) {
 					if publishtime > 1000000000 && publishtime < Supplement_Publishtime { //normal exit
 						isBreak = true
 						//break
-					} else if publishtime <= 1000000000 { //abnormal publish time
+					} else if (publishtime > 0 && publishtime <= 1000000000) || publishtime > time.Now().Unix() { //abnormal publish time
 						publishtimeErrTimes++
+					} else if publishtime <= 0 {
+						publishtimeZeroTimes++
 					}
 				}
 				logger.Info(s.Code, start, tabLen, repeatListNum)
-				downloadAllNum += tabLen                                                //accumulate total collected
-				repeatAllNum += repeatListNum                                           //accumulate total duplicates
-				saveAllNum += num - 1 - repeatListNum                                   //accumulate total saved
-				tmpPageTitleHash := pageTitleHash                                       //
-				pageTitleHash = util.HexText(text)                                      //
-				if tabLen == publishtimeErrTimes || tmpPageTitleHash == pageTitleHash { //every publish time on this page is abnormal, or this page matches the previous one
-					//if errtimes == 0 || start == errPageNum+1  {
-					errtimes++
-					errPageNum = start
+				downloadAllNum += tabLen              //accumulate total collected
+				repeatAllNum += repeatListNum         //accumulate total duplicates
+				saveAllNum += num - 1 - repeatListNum //accumulate total saved
+				tmpPageTitleHash := pageTitleHash     //
+				pageTitleHash = util.HexText(text)    //
+				if tabLen == publishtimeZeroTimes {   //no item on this list page has a publish time
+					publishtimeAllZeroTimes++
 					start++
-					//}
 					continue
-				} else if isBreak { //interrupted; stop collecting
-					start++
-					break
+				} else {
+					publishtimeAllZeroTimes = 0
+					if tabLen == publishtimeErrTimes || tmpPageTitleHash == pageTitleHash { //every publish time on this page is abnormal, or this page matches the previous one
+						//if errtimes == 0 || start == errPageNum+1  {
+						errtimes++
+						errPageNum = start
+						start++
+						//}
+						continue
+					} else if isBreak { //interrupted; stop collecting
+						finish = 1
+						start++
+						break
+					}
+				}
 				}
 			} else {
 				if downtimes < 3 {
@@ -1047,20 +1066,15 @@ func (s *Spider) SupplementDownListPageItem() (errs interface{}) {
 		errPageNum = 0
 		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
 	}
-	logger.Info(s.Code, "本轮列表页采集详情:", downloadAllNum, repeatAllNum, saveAllNum, finishText)
-	save := map[string]interface{}{
-		"site":       s.Name,
-		"channel":    s.Channel,
-		"spidercode": s.Code,
-		"comeintime": time.Now().Unix(),
-		"modifyuser": s.MUserName,
-		"endpage":    start,
-		"finish":     finishText,
-		"savenum":    saveAllNum,
-		"count":      downloadAllNum,
-		"repeat":     repeatAllNum,
-	}
-	MgoS.Save("spider_supplement", save)
+	logger.Info(s.Code, "本轮列表页采集详情:", downloadAllNum, repeatAllNum, saveAllNum, finish)
+	//record the supplement-collection summary
+	ss := Supplement_SaveData[s.Code]
+	ss.Finish = finish
+	//ss.SaveNum = saveAllNum
+	ss.EndPage = start
+	ss.DownNum = downloadAllNum
+	ss.RepeatNum = repeatAllNum
+	ss.Comeintime = time.Now().Unix()
 	return errs
 }

@@ -1710,6 +1724,15 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 					spLock.Lock()
 					updateArr = append(updateArr, update)
 					spLock.Unlock()
+					/*
+						Historical data comes in two kinds: 1. historical data re-collected by the 7000 node; 2. incremental data re-collected by the supplement program (daily supplement).
+						For the first kind, the href is already written to the incremental list-page redis during collection; the second kind is not written to redis at that point.
+						Hence the second kind must be written to the incremental list-page redis once its detail page downloads successfully.
+					*/
+					if isHistory {
+						hashHref := util.HexText(href)
+						util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
+					}
 					//data download is complete at this point
 				}(tmp, spTmp)
 			}

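Editor's note: the new exit logic is easier to see outside the diff. Below is a minimal, runnable sketch of the revised paging loop in SupplementDownListPageItem. The item type, the fetchPage helper, and the error-streak reset on a clean page are hypothetical stand-ins; the three publishtime cases, the all-zero-page counter, and the two exit conditions mirror the hunks above. The page-title-hash duplicate check is omitted for brevity.

// paging_sketch.go: an editor's distilled model of the revised exit logic.
package main

import (
	"fmt"
	"time"
)

type item struct{ publishtime int64 }

const (
	maxErrorTimes        = 10  // cf. Supplement_MaxErrorTimes
	publishtimeZeroLimit = 100 // cf. Supplement_Publishtime_ZeroTimes
)

// fetchPage fakes a list page: pages 1 and 2 hold recent items (one of them
// missing its publish time), page 3 reaches the supplement cutoff. The real
// code runs a lua downloader here.
func fetchPage(page int, minPublishtime int64) []item {
	if page >= 3 {
		return []item{{minPublishtime - 1}} // older than the window
	}
	return []item{{time.Now().Unix() - 3600}, {0}}
}

// crawl returns 1 for a normal finish, -1 for an abnormal exit.
func crawl(minPublishtime, now int64) int {
	errtimes, allZeroPages := 0, 0
	for page := 1; ; page++ {
		if errtimes >= maxErrorTimes || allZeroPages > publishtimeZeroLimit {
			return -1 // too many bad pages in a row
		}
		items := fetchPage(page, minPublishtime)
		errCount, zeroCount, reachedCutoff := 0, 0, false
		for _, it := range items {
			switch {
			case it.publishtime > 1000000000 && it.publishtime < minPublishtime:
				reachedCutoff = true // hit the supplement time window
			case (it.publishtime > 0 && it.publishtime <= 1000000000) || it.publishtime > now:
				errCount++ // implausible publish time
			case it.publishtime <= 0:
				zeroCount++ // missing publish time
			}
		}
		if zeroCount == len(items) { // the whole page lacks publish times
			allZeroPages++
			continue
		}
		allZeroPages = 0 // any usable page breaks the all-zero streak
		if errCount == len(items) { // every publish time implausible
			errtimes++
			continue
		}
		errtimes = 0 // assumed: a clean page resets the error streak
		if reachedCutoff {
			return 1 // normal finish at the window boundary
		}
	}
}

func main() {
	now := time.Now().Unix()
	fmt.Println("finish:", crawl(now-6*86400, now)) // expect: finish: 1
}
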
+ 3 - 1
src/spider/store.go

@@ -198,7 +198,9 @@ func SaveHighListPageData(tmp map[string]interface{}, hashHref string, num *int)
 	} else {
 		MgoS.Save("spider_highlistdata", tmp)
 	}
-	lu.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
+	if !Supplement {
+		lu.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
+	}
 }

 // save the list-page info collected by nodes 7410, 7500, 7510, 7520, 7700

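Editor's note: together with the isHistory branch added to DownloadDetail above, this gate shifts the redis de-duplication mark for supplement runs from list-save time to detail-success time, presumably so a failed download stays re-collectable. A minimal sketch of that lifecycle, with an in-memory map and hypothetical helpers standing in for the real redis and mongo calls:

// dedup_sketch.go: models when the "list_<hash>" dedup key gets written.
package main

import "fmt"

var (
	supplement = true              // the -s flag: is this a supplement node?
	redis      = map[string]bool{} // stand-in for the "list" redis store
)

func seen(hashHref string) bool { return redis["list_"+hashHref] }
func markSeen(hashHref string)  { redis["list_"+hashHref] = true }

// saveListItem mirrors SaveHighListPageData: a normal node marks the href
// as seen immediately; a supplement node skips the mark.
func saveListItem(hashHref string) {
	// ... persist the list-page record to mongo (omitted) ...
	if !supplement {
		markSeen(hashHref)
	}
}

// onDetailSuccess mirrors the DownloadDetail change: supplement/history
// data is marked only after its detail page downloads successfully.
func onDetailSuccess(hashHref string, isHistory bool) {
	if isHistory {
		markSeen(hashHref)
	}
}

func main() {
	h := "abc123" // stand-in for util.HexText(href)
	saveListItem(h)
	fmt.Println("marked after list save:", seen(h)) // false on a supplement node
	onDetailSuccess(h, true)
	fmt.Println("marked after detail success:", seen(h)) // true
}
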
+ 137 - 7
src/spider/supplement.go

@@ -4,7 +4,11 @@ import (
 	"flag"
 	"flag"
 	"github.com/cron"
 	"github.com/cron"
 	"github.com/donnie4w/go-logger/logger"
 	"github.com/donnie4w/go-logger/logger"
+	"gopkg.in/mgo.v2/bson"
 	"os"
 	"os"
+	qu "qfw/util"
+	"sync"
+	"time"
 )
 )
 
 
 /*
 /*
@@ -12,15 +16,33 @@ import (
 */

 var (
-	Supplement               bool   //whether this run is a scheduled re-collection (supplement) node
-	Supplement_Cycle         string //run cycle (day: run daily at a fixed time; week: run weekly at a fixed time)
-	Supplement_Day           int    //how many days of data to supplement
-	Supplement_Publishtime   int64  //minimum publish time for supplemented data
-	Supplement_StartCron     string //start cron
-	Supplement_EndCron       string //stop cron
-	Supplement_MaxErrorTimes int    //consecutive error count that aborts collection
+	Supplement                       bool   //whether this run is a scheduled re-collection (supplement) node
+	Supplement_Cycle                 string //run cycle (day: run daily at a fixed time; week: run weekly at a fixed time)
+	Supplement_Day                   int    //how many days of data to supplement
+	Supplement_Publishtime           int64  //minimum publish time for supplemented data
+	Supplement_Publishtime_ZeroTimes = 100  //list pages without publish times before collection exits
+	Supplement_StartCron             string //start cron
+	Supplement_EndCron               string //stop cron
+	Supplement_MaxErrorTimes         int    //consecutive error count that aborts collection
+	Supplement_SaveData              map[string]*SupplementSpider
 )

+type SupplementSpider struct {
+	Site               string `bson:"site"`
+	Channel            string `bson:"channel"`
+	Spidercode         string `bson:"spidercode"`
+	Modifyuser         string `bson:"modifyuser"`
+	Finish             int    `bson:"finish"`
+	SaveNum            int    `bson:"savenum"`
+	EndPage            int    `bson:"endpage"`
+	DownNum            int    `bson:"downnum"`
+	RepeatNum          int    `bson:"repeatnum"`
+	Comeintime         int64  `bson:"comeintime"`
+	Success            int    `bson:"success"`
+	Failed             int    `bson:"failed"`
+	PublishtimeZeroNum int    `bson:"ptimezeronum"`
+}
+
 func InitSupplement() {
 	flag.BoolVar(&Supplement, "s", false, "是否为补采节点")
 	flag.StringVar(&Supplement_Cycle, "c", "day", "day:每天定点执行;week:每周定点执行")
@@ -29,6 +51,7 @@ func InitSupplement() {
 	flag.Parse()
 	logger.Debug("Supplement:", "-s=", Supplement, "-c=", Supplement_Cycle, "-d=", Supplement_Day, "-e=", Supplement_MaxErrorTimes)
 	if Supplement {
+		Supplement_SaveData = map[string]*SupplementSpider{}
 		Supplement_Publishtime = GetTime(-Supplement_Day)
 		if Supplement_Cycle == "day" {
 			Supplement_StartCron = "0 0 22 ? * *"
@@ -52,5 +75,112 @@ func SupplementStart() {
 }

 func SupplementEnd() {
+	SupplementDataCount() //count and aggregate the supplemented data
+	SupplementDataSave()
 	os.Exit(-1) //shut down the application
 }
+
+func SupplementDataCount() {
+	logger.Info("补采数据统计开始...")
+	sess := MgoS.GetMgoConn()
+	defer MgoS.DestoryMongoConn(sess)
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	startTime := time.Now().Unix() - 3600*12
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": startTime,
+		},
+		"event": 7001,
+	}
+	field := map[string]interface{}{
+		"state":       1,
+		"spidercode":  1,
+		"publishtime": 1,
+	}
+	count1 := MgoS.Count("spider_historydata_back", query)
+	logger.Info("spider_historydata_back count:", count1, startTime)
+	it := sess.DB(MgoS.DbName).C("spider_historydata_back").Find(&query).Select(&field).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			state := qu.IntAll(tmp["state"])
+			code := qu.ObjToString(tmp["spidercode"])
+			publishtime := qu.ObjToString(tmp["publishtime"])
+			lock.Lock()
+			if ss := Supplement_SaveData[code]; ss != nil { //the spider has finished running
+				ss.SaveNum++
+				if state == 1 {
+					ss.Success++
+				} else {
+					ss.Failed++
+				}
+				if publishtime == "0" || publishtime == "" {
+					ss.PublishtimeZeroNum++
+				}
+			}
+			lock.Unlock()
+		}(tmp)
+		tmp = map[string]interface{}{}
+	}
+	count2 := MgoS.Count("spider_historydata", query)
+	logger.Info("spider_historydata count:", count2)
+	it1 := sess.DB(MgoS.DbName).C("spider_historydata").Find(&query).Select(&field).Iter()
+	n1 := 0
+	for tmp := make(map[string]interface{}); it1.Next(tmp); n1++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			state := qu.IntAll(tmp["state"])
+			code := qu.ObjToString(tmp["spidercode"])
+			publishtime := qu.ObjToString(tmp["publishtime"])
+			lock.Lock()
+			if ss := Supplement_SaveData[code]; ss != nil { //the spider has finished running
+				ss.SaveNum++
+				if state == 1 {
+					ss.Success++
+				} else {
+					ss.Failed++
+				}
+				if publishtime == "0" || publishtime == "" {
+					ss.PublishtimeZeroNum++
+				}
+			}
+			lock.Unlock()
+		}(tmp)
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	logger.Info("补采数据统计完毕...")
+}
+
+func SupplementDataSave() {
+	var saveArr []map[string]interface{}
+	for code, ss := range Supplement_SaveData {
+		bt, err := bson.Marshal(ss)
+		if err != nil {
+			logger.Info("supplement marshal err:", code)
+			continue
+		}
+		save := map[string]interface{}{}
+		if bson.Unmarshal(bt, &save) == nil {
+			saveArr = append(saveArr, save)
+		} else {
+			logger.Info("supplement unmarshal err:", code)
+		}
+	}
+	if len(saveArr) > 0 {
+		MgoS.SaveBulk("spider_supplement", saveArr...)
+	}
+}
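
Editor's note: SupplementDataSave leans on a bson marshal/unmarshal round trip to turn each tagged struct into the map that SaveBulk expects. A minimal, self-contained sketch of that conversion, assuming gopkg.in/mgo.v2/bson as imported above; the sample values are made up and only three of the struct's fields are shown.

// bson_roundtrip_sketch.go: shows how bson.Marshal/Unmarshal turns a
// tagged struct into a map[string]interface{} ready for a bulk save.
package main

import (
	"fmt"

	"gopkg.in/mgo.v2/bson"
)

type SupplementSpider struct {
	Spidercode string `bson:"spidercode"`
	Finish     int    `bson:"finish"`
	SaveNum    int    `bson:"savenum"`
}

func main() {
	ss := &SupplementSpider{Spidercode: "a_demo_code", Finish: 1, SaveNum: 42}
	bt, err := bson.Marshal(ss)
	if err != nil {
		fmt.Println("marshal err:", err)
		return
	}
	save := map[string]interface{}{}
	if err := bson.Unmarshal(bt, &save); err != nil {
		fmt.Println("unmarshal err:", err)
		return
	}
	// Keys follow the bson tags, so the map matches the old hand-built
	// document shape: map[spidercode:a_demo_code finish:1 savenum:42]
	fmt.Println(save)
}

Deriving the map from the bson tags keeps the saved document shape defined in one place (the struct), instead of the hand-built field list the old SupplementDownListPageItem maintained.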