
List-page redis duplicate check

maxiaoshan 3 years ago
parent
commit
309a85ab2d
3 changed files with 7 additions and 159 deletions
  1. + 0 - 2   src/main.go
  2. + 1 - 1   src/spider/spider.go
  3. + 6 - 156 src/spider/store.go

+ 0 - 2
src/main.go

@@ -118,8 +118,6 @@ func main() {
 	go heapprint()
 	//read list-page records and collect the third-level (detail) pages
 	go spider.DetailData()
-	//scheduled task (now handled when the editor creates the task)
-	//go spider.TimeTask()
 	//batch-save error data
 	go spider.UpdateErrDataMgo()
 	//batch-save heartbeat data

+ 1 - 1
src/spider/spider.go

@@ -534,7 +534,7 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 			*num++ //already collected
 			return
 		}
-		SaveHighListPageData(paramdata, href, num)
+		SaveHighListPageData(paramdata, s.SCode, href, num)
 		return
 	} else {
 		if !s.Stop {

+ 6 - 156
src/spider/store.go

@@ -6,9 +6,6 @@ import (
 	mu "mfw/util"
 	"qfw/util"
 	mgu "qfw/util/mongodbutil"
-	"sync"
-
-	"github.com/cron"
 
 	//"qfw/util/redis"
 	lu "spiderutil"
@@ -239,15 +236,15 @@ func SaveErrorData(modifyuser string, pd map[string]interface{}, err interface{}
 	}
 }
 
-//save list-page data collected in high-performance mode
-func SaveHighListPageData(tmp map[string]interface{}, href string, num *int) {
+//save list-page data collected in modal=1 mode
+func SaveHighListPageData(tmp map[string]interface{}, code, href string, num *int) {
 	//check redis first to avoid duplicate records
-	isExist, _ := lu.ExistRedis("title_repeat_listpagehref", 0, href)
-	if isExist {
+	redisCode, _ := lu.GetRedisStr("title_repeat_listpagehref", 0, href)
+	if redisCode != "" && strings.Contains(redisCode, code) { //same spider code and same href, so treat as a duplicate
 		*num++
 		return
-	} else {
-		lu.PutRedis("title_repeat_listpagehref", 0, href, "", 3600*24*30*24)
+	} else { //store in redis
+		lu.PutRedis("title_repeat_listpagehref", 0, href, code+"+"+redisCode, 3600*24*30*24)
 	}
 	tmp["state"] = 0
 	tmp["event"] = lu.Config.Uploadevent
@@ -268,153 +265,6 @@ func SaveListPageData(tmp map[string]interface{}, id *string, isEsRepeat bool) {
 	*id = Mgo.Save("spider_listdata", tmp)
 }
 
-//scheduled tasks
-func TimeTask() {
-	c := cron.New()
-	if lu.Config.Uploadevent == 7100 { //only run on event 7100
-		c.AddFunc("0 0 1 * * ?", DownloadErrorData) //create editor tasks
-		c.AddFunc("0 0 6 * * ?", ResetDataState)    //reset state
-	}
-	c.Start()
-}
-
-//reset data that failed to download within the past week (data that fails 3 times in a day keeps being retried for up to a week)
-func ResetDataState() {
-	defer util.Catch()
-	logger.Info("-----更新数据状态-----")
-	sess := Mgo.GetMgoConn()
-	defer Mgo.DestoryMongoConn(sess)
-	ch := make(chan bool, 10)
-	wg := &sync.WaitGroup{}
-	lock := &sync.Mutex{}
-	query := map[string]interface{}{
-		"comeintime": map[string]interface{}{
-			"$gte": GetTime(-lu.Config.DayNum),
-		},
-		"state": -1,
-	}
-	field := map[string]interface{}{
-		"_id": 1,
-	}
-	it := sess.DB(Mgo.DbName).C("spider_highlistdata").Find(&query).Select(&field).Iter()
-	count, _ := sess.DB(Mgo.DbName).C("spider_highlistdata").Find(&query).Count()
-	logger.Info("更新数据状态数量:", count)
-	n := 0
-	arr := [][]map[string]interface{}{}
-	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
-		ch <- true
-		wg.Add(1)
-		go func(tmp map[string]interface{}) {
-			defer func() {
-				<-ch
-				wg.Done()
-			}()
-			update := []map[string]interface{}{}
-			update = append(update, map[string]interface{}{"_id": tmp["_id"]})
-			update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}})
-			lock.Lock()
-			arr = append(arr, update)
-			if len(arr) > 500 {
-				tmps := arr
-				Mgo.UpdateBulk("spider_highlistdata", tmps...)
-				arr = [][]map[string]interface{}{}
-			}
-			lock.Unlock()
-		}(tmp)
-		tmp = map[string]interface{}{}
-	}
-	wg.Wait()
-	lock.Lock()
-	if len(arr) > 0 {
-		Mgo.UpdateBulk("spider_highlistdata", arr...)
-		arr = [][]map[string]interface{}{}
-	}
-	lock.Unlock()
-	logger.Info("-----更新数据状态完毕-----")
-}
-
-//create editor tasks for the previous day's failed downloads so the spiders can be fixed
-func DownloadErrorData() {
-	defer util.Catch()
-	logger.Info("-----Editor新建状态-----")
-	yesterday := GetTime(-1)
-	query := map[string]interface{}{
-		"state": -1,
-		"comeintime": map[string]interface{}{
-			"$gte": yesterday,
-			"$lte": yesterday + 86400,
-		},
-	}
-	fields := map[string]interface{}{
-		"spidercode": 1,
-		"href":       1,
-	}
-	list, _ := Mgo.Find("spider_highlistdata", query, nil, fields, false, -1, -1)
-	codeMap := map[string][]string{}
-	for _, l := range *list {
-		code := util.ObjToString(l["spidercode"])
-		href := util.ObjToString(l["href"])
-		if tmp := codeMap[code]; tmp == nil {
-			codeMap[code] = []string{href}
-		} else {
-			tmp = append(tmp, href)
-			codeMap[code] = tmp
-		}
-	}
-	for code, hrefs := range codeMap {
-		lua := *mgu.FindOne("luaconfig", "editor", "editor", map[string]interface{}{"code": code})
-		if len(lua) > 0 {
-			i_state := 0
-			if len(hrefs) > 5 { //more than 5 records: mark the task as pending
-				i_state = 1
-				hrefs = hrefs[:5]
-			}
-			event := util.IntAll(lua["event"])
-			modifyuser := util.ObjToString(lua["modifyuser"])
-			modifyuserid := util.ObjToString(lua["modifyuserid"])
-			param := lua["param_common"].([]interface{})
-			channel := ""
-			site := ""
-			if len(param) >= 3 {
-				channel = util.ObjToString(param[2])
-				site = util.ObjToString(param[1])
-			}
-			task := map[string]interface{}{
-				"s_site":       site,
-				"s_channel":    channel,
-				"s_code":       code,
-				"i_state":      i_state,
-				"s_modify":     modifyuser,
-				"s_modifyid":   modifyuserid,
-				"i_urgency":    "4",
-				"event":        event,
-				"l_comeintime": time.Now().Unix(),
-				"l_complete":   GetCompleteTime(),
-				"s_descript":   "下载异常,连接:" + strings.Join(hrefs, "\n"),
-				"i_times":      0,
-				"type":         "download_err",
-			}
-			mgu.Save("task", "editor", "editor", task)
-		} else {
-			logger.Info("Find Lua Error:", code)
-		}
-	}
-	logger.Info("-----Editor新建状态完成-----")
-}
-
-//calculate the latest completion time
-func GetCompleteTime() (completetime int64) {
-	wd := time.Now().Weekday().String()
-	if wd == "Saturday" { //Saturday
-		completetime = GetTime(2) + 50400
-	} else if wd == "Sunday" { //Sunday
-		completetime = GetTime(1) + 50400
-	} else { //Monday to Friday
-		completetime = GetTime(0) + 50400
-	}
-	return
-}
-
 //get the timestamp of midnight `day` days from today
 func GetTime(day int) int64 {
 	nowTime := time.Now().AddDate(0, 0, day)