/*
 * Spider / script interface; needs to be extended.
 */
package spider

import (
	"crypto/sha1"
	"fmt"
	elc "github.com/olivere/elastic/v7"
	"io"
	"log"
	mgo "mongodb"
	qu "qfw/util"
	es "qfw/util/elastic.v7"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	util "spiderutil"

	"github.com/donnie4w/go-logger/logger"
	lua "github.com/yuin/gopher-lua"
)

// Heart holds heartbeat information.
type Heart struct {
	DetailHeart        int64  // detail-page execution heartbeat
	DetailExecuteHeart int64  // heartbeat for data actually collected on detail pages
	FindListHeart      int64  // findListHtml execution heartbeat
	ListHeart          int64  // list-page execution heartbeat
	FirstPageHeart     int64  // heartbeat for collecting the first page
	ModifyUser         string // spider maintainer
	Site               string // site
	Channel            string // channel
}

// SpiderFlow holds traffic information.
type SpiderFlow struct {
	Flow       int64  // traffic
	ModifyUser string // spider maintainer
	Site       string // site
	Channel    string // channel
	//Code string
}

// Spider is one crawler instance.
type Spider struct {
	Script
	Code                   string // spider code
	Name                   string // name
	Channel                string // channel
	DownDetail             bool   // whether to download detail pages
	Stop                   bool   // stop flag
	Pass                   bool   // pause flag
	LastPubshTime          int64  // last publish time
	LastHeartbeat          int64  // last heartbeat time
	SpiderRunRate          int64  // execution frequency
	ExecuteOkTime          int64  // time the task finished successfully
	Collection             string // target collection name
	Thread                 int64  // thread count
	LastExecTime           int64  // last execution time
	LastDowncount          int32  // download count of the last run
	TodayDowncount         int32  // download count today
	YesterdayDowncount     int32  // download count yesterday
	TotalDowncount         int32  // total download count
	RoundCount             int32  // round counter
	StoreMode              int    // storage mode
	StoreToMsgEvent        int    // message type
	CoverAttr              string // attribute used for deduplication
	SleepBase              int    // base delay
	SleepRand              int    // random delay
	TargetChannelUrl       string // channel page URL
	UpperLimit, LowerLimit int    // normal upper/lower limits
	//UserName, UserEmail, UploadTime string // developer name, developer email, script upload time
	MUserName, MUserEmail string // maintainer, maintainer email
	//Index int // array index
	// historical mending
	IsHistoricalMend    bool // whether this is a historical-mend spider
	IsMustDownload      bool // whether download is forced
	IsCompete           bool // distinguishes new vs. old spiders
	Infoformat          int  // spider type: 1 tendering; 2 proposed/approval; 3 property rights
	IsMainThread        bool // whether this is the main thread (distinguishes threads in concurrent collection)
	ListParallelTaskNum int  // number of parallel list-page tasks
}

var (
	Es                *es.Elastic
	EsIndex           string
	EsType            string
	MgoS              *mgo.MongodbSim
	MgoEB             *mgo.MongodbSim
	TimeChan          = make(chan bool, 1)
	Reg               = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
	RestrictAccessReg = regexp.MustCompile(`访问被拒绝`)
	AllThreadNum      int64
	ListAllThreadNum  int64
	DelaySiteMap      map[string]*DelaySite                       // sites whose collection is delayed
	UpdataHeartCache  = make(chan []map[string]interface{}, 1000) // spider heartbeat updates
	SPH               = make(chan bool, 5)
	DataBakSaveCache  = make(chan map[string]interface{}, 1000) // detail-page collection records to be saved
	DB_CH             = make(chan bool, 5)
)

type DelaySite struct {
	DelayTime int
	Compete   bool
}

// StartJob starts a task.
func (s *Spider) StartJob() {
	s.Stop = false
	s.Pass = false
	s.RoundCount++
	go s.ExecJob(false)
}

// ExecJob runs a single execution.
func (s *Spider) ExecJob(reload bool) {
	defer func() {
		s.ExecuteOkTime = time.Now().Unix()
		util.TimeSleepFunc(5*time.Second, TimeSleepChan)
		if util.Config.Working == 1 {
			s.Stop = true
			if _, b := Allspiders.Load(s.Code); b {
				Allspiders.Store(s.Code, s)
			}
			// release resources (the off-shelf operation in queue mode does not release resources; they are released automatically after execution)
			s.L.Close()
			CC <- s.L
		}
	}()
	// high-performance mode: reload the script when scheduled by polling
	if reload && util.Config.Working == 0 {
		s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
	}
	logger.Debug(s.Code, s.Name, "频率:", s.SpiderRunRate, ",", s.Timeout)
	s.LastDowncount = 0
	s.LastExecTime = time.Now().Unix()
	s.LastHeartbeat = time.Now().Unix()
	s.ExecuteOkTime = 0
	//err := s.GetLastPublishTime() // get the latest publish time as the last update time
	//if err != nil {
	//	logger.Error(s.Code, err)
	//}
	// decide whether to use high-concurrency detail-page downloading
	var err interface{}
	if Supplement {
		err = s.SupplementDownListPageItem() // supplementary collection: download list pages
	} else if util.Config.PageTurnInfo.ListThreadsNum > 1 {
		err = s.DownListPageItemByThreads() // download list pages concurrently
	} else {
		err = s.DownListPageItem() // download list pages
	}
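	// Note (added commentary): the branch above selects the list-download strategy.
	// A minimal sketch of the decision, assuming a hypothetical configuration with
	// Supplement=false and ListThreadsNum=3:
	//
	//	// Supplement == true                        -> SupplementDownListPageItem()
	//	// Supplement == false, ListThreadsNum > 1   -> DownListPageItemByThreads()
	//	// Supplement == false, ListThreadsNum <= 1  -> DownListPageItem()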
	//if util.Config.Working == 0 && util.Config.Modal == 1 && !util.Config.IsHistoryEvent {
	//	err = s.DownListPageItemByThreads() // download list pages
	//} else {
	//	err = s.DownListPageItem() // download list pages
	//}
	if err != nil {
		logger.Error(s.Code, err)
	}
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // is the spider incremental or historical (currently only node 7000 has historical spiders)
		UpdateSpiderByCodeState(s.Code, "6") // take the spider off-shelf on this node
		SpiderCodeSendToEditor(s.Code)       // a historical spider turned incremental is sent to the editor to switch nodes
		return
	} else {
		if util.Config.Working == 0 && !Supplement { // high-performance mode
			/*
				for !s.Stop && s.Pass {
					util.TimeSleepFunc(2*time.Second, TimeSleepChan)
				}
				if s.Stop {
					return
				}
			*/
			//if s.IsMustDownload { // historical download runs only one round
			if s.IsHistoricalMend && util.Config.IsHistoryEvent { // history node 7000, high-performance mode: historical mending runs only one round
				UpdateSpiderByCodeState(s.Code, "6") // take the spider off-shelf on this node
				b := MgoEB.Update("luaconfig",
					map[string]interface{}{"code": s.Code},
					map[string]interface{}{"$set": map[string]interface{}{"state": 6}},
					false, false)
				logger.Info("Delete History Code:", s.Code, b)
			} else {
				if !s.Stop { // not off-shelf: schedule the next run
					util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
						s.ExecJob(true)
					}, TimeChan)
				} else { // off-shelf: the child goroutine exits
					return
				}
			}
		} else { // queue mode or supplementary collection
			return
		}
	}
}

// GetLastPublishTime gets the latest publish time, used as the last update time.
func (s *Spider) GetLastPublishTime() (errs interface{}) {
	defer qu.Catch()
	var lastpublishtime string
	// fetch the last update time
	if err := s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("getLastPublishTime"),
		NRet:    1,
		Protect: true,
	}); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		errs = err.Error()
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return errs
	}
	ret := s.L.Get(-1)
	s.L.Pop(1)
	if str, ok := ret.(lua.LString); ok {
		lastpublishtime = string(str)
	}
	if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) {
		// guard against publish times in the future
		if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() {
			s.LastPubshTime = time.Now().Unix()
		} else {
			s.LastPubshTime = util.ParseDate2Int64(lastpublishtime)
		}
	}
	return nil
}

// DownListPageItem downloads list pages (compared with DownListPageItemBack it drops the retry on empty pages and the duplicate-page bookkeeping).
func (s *Spider) DownListPageItem() (errs interface{}) {
	defer qu.Catch()
	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") // start page, max page
	s.MaxPage = max
	repeatAllNum := 0   // total number of duplicates collected in this round
	downloadAllNum := 0 // total number collected in this round
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // node 7000: the spider collects history
		max = s.GetIntVar("spiderHistoryMaxPage")
	}
	downtimes := 0           // retry count for a page (currently 3)
	repeatPageTimes := 0     // number of consecutive fully-duplicate pages (stop paging after 5)
	isRunRepeatList := false // whether to run consecutive list-page dedup
	if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { // on all nodes except sequential-collection mode and the history node, dedup consecutive list pages
		isRunRepeatList = true
		max = util.Config.PageTurnInfo.TurnPageMaxLimit // high-performance mode caps the max page at 100; queue mode at 50
	}
	// child-task check
	if s.ContinueDownListChildTask {
		start = util.Config.PageTurnInfo.TurnPageMaxLimit + 1                                       // child-task start page
		max = util.Config.PageTurnInfo.TurnPageMaxLimit + util.Config.PageTurnInfo.NextPageMaxLimit // child-task max page
	}
	for ; start <= max && !s.Stop; start++ {
		if !s.Stop && !s.ContinueDownListChildTask { // if the spider went off-shelf while downloading detail pages, stop recording heartbeats
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) // record list-page heartbeats on all nodes
		}
		//qu.Debug("spider:", s.Code, " configured max page:", s.MaxPage, " final max page:", max, " current page:", start, "repeat times:", repeatPageTimes)
		if isRunRepeatList && repeatPageTimes >= util.Config.PageTurnInfo.RepeatPageTimesLimit { // consecutive-duplicate limit exceeded: stop paging
			break
		}
		if err := s.L.CallByParam(lua.P{
			Fn:      s.L.GetGlobal("downloadAndParseListPage"),
			NRet:    1,
			Protect: true,
		}, lua.LNumber(start)); err != nil {
			//panic(s.Code + "," + err.Error())
			logger.Error("列表页采集报错", start, s.Code+","+err.Error())
			errs = err.Error()
			atomic.AddInt32(&s.Script.ErrorNum, 1)
			// retry on a list-page error; past the retry limit the page counts as collected (script errors return directly without internal retries)
			if downtimes < 3 {
				downtimes++
				start--
			} else if isRunRepeatList { // retry limit exceeded: treat this page as duplicate
				repeatPageTimes++ // increase the counter
				downtimes = 0
			}
			continue
		}
		lv := s.L.Get(-1)
		s.L.Pop(1)
		if tbl, ok := lv.(*lua.LTable); ok {
			//list := []map[string]interface{}{}
			//qu.Debug("rows on current page:", tbl.Len())
			if tabLen := tbl.Len(); tabLen > 0 { // the list page has data; download detail pages from it
				repeatListNum := 0 // number of duplicate links on the current list page
				for i := 1; i <= tabLen; i++ {
					v := tbl.RawGetInt(i).(*lua.LTable)
					tmp := util.TableToMap(v)
					if !s.IsHistoricalMend { // not historical mending
						tmp["dataging"] = 0 // mark the data with dataging=0
						if s.DownDetail {
							s.DownloadDetailItem(tmp, &repeatListNum)
						}
					} else { // historical mending
						s.HistoricalMendDownloadDetailItem(tmp) // download detail pages for historical mending
					}
				}
				repeatAllNum += repeatListNum
				downloadAllNum += tabLen
				if isRunRepeatList { // consecutive page dedup
					if repeatListNum >= tabLen { // every row on page start has already been collected
						repeatPageTimes++ // increase the counter
					} else { // page start has new data: reset the counter
						repeatPageTimes = 0
					}
				}
			} else if isRunRepeatList {
				repeatPageTimes++ // increase the counter
			}
		} else if isRunRepeatList {
			repeatPageTimes++ // increase the counter
		}
		downtimes = 0 // the current page downloaded fine: reset the retry counter
		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
	}
	logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, start, s.Stop)
	if !util.Config.IsHistoryEvent && !s.Stop { // non-history nodes record the download rate
		nowTime := time.Now()
		sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
		set := map[string]interface{}{
			"site":       s.Name,
			"channel":    s.Channel,
			"spidercode": s.Code,
			"updatetime": nowTime.Unix(),
			"event":      util.Config.Uploadevent,
			"modifyuser": s.MUserName,
			"maxpage":    s.MaxPage,
			"runrate":    s.SpiderRunRate,
			"endpage":    start,
			"date":       sDate,
		}
		inc := map[string]interface{}{
			"alltimes": 1,
		}
		// record whether paging succeeded
		if s.MaxPage > 1 {
			if s.PageOneTextHash != "" {
				if s.PageTwoTextHash != "" {
					if s.PageOneTextHash != s.PageTwoTextHash {
						inc["page_success"] = 1
					} else {
						inc["page_fail"] = 1
					}
				} else {
					inc["page_fail"] = 1
				}
			} else if s.PageTwoTextHash != "" {
				inc["page_onefail"] = 1
			}
		}
		if downloadAllNum > 0 {
			rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
			rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
			if rate == 1.0 {
				if downloadAllNum == 1 { // list-page filtering left only one new record
					inc["oh_percent_onenum"] = 1
				} else {
					inc["oh_percent"] = 1
				}
				//} else if rate >= 0.9 {
				//	inc["nt_percent"] = 1
				//} else if rate >= 0.8 {
				//	inc["et_percent"] = 1
				//} else {
				//	inc["other_percent"] = 1
			}
			if isRunRepeatList && start > max { // consecutive paging exceeded the upper limit
				if !s.ContinueDownListChildTask {
					go ContinueDownListPageItem(s) // start a child task to keep collecting
				} else {
					inc["uplimit"] = 1
				}
			}
		} else {
			inc["zero"] = 1
		}
		query := map[string]interface{}{
			"date":       sDate,
			"spidercode": s.Code,
		}
		MgoS.Update("spider_downloadrate", query, map[string]interface{}{
			"$set": set,
			"$inc": inc,
		}, true, false)
	}
	// reset state
	s.PageOneTextHash = ""
	s.PageTwoTextHash = ""
	return errs
}

func (s *Spider) DownListPageItemBack() (errs interface{}) {
	defer qu.Catch()
	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") // start page, max page
	s.MaxPage = max
	//tmpMax := max // temporarily record the max page
	repeatAllNum := 0   // total number of duplicates collected in this round
	downloadAllNum := 0 // total number collected in this round
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // node 7000: the spider collects history
		max = s.GetIntVar("spiderHistoryMaxPage")
	}
	downtimes := 0           // retry count for a page (currently 3)
	repeatPageNum := 0       // page number on which every link was a duplicate
	repeatPageTimes := 0     // number of consecutive fully-duplicate pages (stop paging after 5)
	isRunRepeatList := false // whether to run consecutive list-page dedup
	if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { // on all nodes except sequential-collection mode and the history node, dedup consecutive list pages
		isRunRepeatList = true
		max = util.Config.PageTurnInfo.TurnPageMaxLimit // high-performance mode caps the max page at 100; queue mode at 50
	}
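	// Note (added commentary): repeatPageNum/repeatPageTimes below implement the
	// consecutive-duplicate stop: a page whose links are all duplicates only bumps
	// the counter when it directly follows the previous fully-duplicate page, and
	// paging stops once the counter reaches RepeatPageTimesLimit. An illustrative
	// trace, assuming RepeatPageTimesLimit is 5:
	//
	//	page 3 all duplicates -> repeatPageNum=3, repeatPageTimes=1
	//	page 4 all duplicates -> repeatPageNum=4, repeatPageTimes=2
	//	page 5 has new rows   -> repeatPageNum=0, repeatPageTimes=0
	//	five fully-duplicate pages in a row -> the for loop breaks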
	// child-task check
	if s.ContinueDownListChildTask {
		start = util.Config.PageTurnInfo.TurnPageMaxLimit + 1
		max = util.Config.PageTurnInfo.TurnPageMaxLimit + util.Config.PageTurnInfo.NextPageMaxLimit
	}
	for ; start <= max && !s.Stop; start++ {
		if !s.Stop && !s.ContinueDownListChildTask { // if the spider went off-shelf while downloading detail pages, stop recording heartbeats
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) // record list-page heartbeats on all nodes
		}
		//qu.Debug("spider:", s.Code, "duplicate page:", repeatPageNum, " configured max page:", s.MaxPage, " final max page:", max, " current page:", start, "repeat times:", repeatPageTimes)
		//if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { // more than 5 duplicate pages: stop paging
		//	break
		//}
		if isRunRepeatList && repeatPageTimes >= util.Config.PageTurnInfo.RepeatPageTimesLimit { // consecutive-duplicate limit exceeded: stop paging
			break
		}
		if err := s.L.CallByParam(lua.P{
			Fn:      s.L.GetGlobal("downloadAndParseListPage"),
			NRet:    1,
			Protect: true,
		}, lua.LNumber(start)); err != nil {
			//panic(s.Code + "," + err.Error())
			logger.Error("列表页采集报错", start, s.Code+","+err.Error())
			errs = err.Error()
			atomic.AddInt32(&s.Script.ErrorNum, 1)
			// retry on a list-page error; past the retry limit the page counts as collected
			if downtimes < 2 {
				downtimes++
				start--
			} else if isRunRepeatList { // retry limit exceeded: treat this page as duplicate
				if repeatPageNum+1 == start {
					repeatPageTimes++ // increase the counter
				} else {
					repeatPageTimes = 0 // reset the counter
				}
				repeatPageNum = start // remember the page number
				downtimes = 0
			}
			continue
		}
		lv := s.L.Get(-1)
		s.L.Pop(1)
		if tbl, ok := lv.(*lua.LTable); ok {
			//list := []map[string]interface{}{}
			//qu.Debug("rows on current page:", tbl.Len())
			if tabLen := tbl.Len(); tabLen > 0 { // the list page has data; download detail pages from it
				repeatListNum := 0 // number of duplicate links on the current list page
				for i := 1; i <= tabLen; i++ {
					v := tbl.RawGetInt(i).(*lua.LTable)
					tmp := util.TableToMap(v)
					if !s.IsHistoricalMend { // not historical mending
						tmp["dataging"] = 0 // mark the data with dataging=0
						if s.DownDetail {
							s.DownloadDetailItem(tmp, &repeatListNum)
						}
						/*else { // no such spiders at the moment
							tmp["comeintime"] = time.Now().Unix()
							//atomic.AddInt32(&s.LastDowncount, 1)
							//atomic.AddInt32(&s.TodayDowncount, 1)
							//atomic.AddInt32(&s.TotalDowncount, 1)
							href := fmt.Sprint(tmp["href"])
							if len(href) > 5 { // valid data
								hashHref := util.HexText(href)
								util.RedisClusterSet(hashHref, "", -1) // full redis
								list = append(list, tmp)
							}
						}*/
					} else { // historical mending
						s.HistoricalMendDownloadDetailItem(tmp) // download detail pages for historical mending
					}
				}
				repeatAllNum += repeatListNum
				downloadAllNum += tabLen
				if isRunRepeatList { // consecutive page dedup
					if repeatListNum >= tabLen { // every row on page start has already been collected
						//qu.Debug("duplicate page:", repeatPageNum, "current page:", start)
						if repeatPageNum+1 == start || repeatPageNum == 0 {
							repeatPageTimes++ // increase the counter
						} else {
							repeatPageTimes = 0 // reset the counter
						}
						repeatPageNum = start // remember the page number
					} else { // page start still has unseen data
						repeatPageTimes = 0
						repeatPageNum = 0
					}
				}
				//if !s.IsHistoricalMend && !s.DownDetail {
				//	if len(list) > 0 { // save the records
				//		StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
				//	}
				//}
			} else {
				// the page may be empty because of network problems, so request it again (the page may also have been filtered out entirely)
				if downtimes < 2 {
					downtimes++
					start--
					continue
				} else if isRunRepeatList { // retry limit exceeded: treat this page as duplicate
					if repeatPageNum+1 == start {
						repeatPageTimes++ // increase the counter
					} else {
						repeatPageTimes = 0 // reset the counter
					}
					repeatPageNum = start // remember the page number
				}
			}
		} else { // requesting the current list page failed
			if downtimes < 2 {
				downtimes++
				start--
				continue
			} else if isRunRepeatList { // retry limit exceeded: treat this page as duplicate
				if repeatPageNum+1 == start {
					repeatPageTimes++ // increase the counter
				} else {
					repeatPageTimes = 0 // reset the counter
				}
				repeatPageNum = start // remember the page number
			}
		}
		downtimes = 0 // the current page downloaded fine: reset the retry counter
		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
	}
	logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, start, s.Stop)
	if !util.Config.IsHistoryEvent && !s.Stop { // non-history nodes record the download rate
		nowTime := time.Now()
		sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
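		// Note (added commentary): the $set/$inc document built below is upserted
		// into spider_downloadrate keyed by (date, spidercode), so one document per
		// spider per day accumulates counters such as alltimes, page_success and
		// oh_percent. A sketch of the resulting document, with made-up values:
		//
		//	{"date": sDate, "spidercode": "a_example_code", "maxpage": 3,
		//	 "endpage": 4, "alltimes": 12, "page_success": 11, "oh_percent": 2}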
		set := map[string]interface{}{
			"site":       s.Name,
			"channel":    s.Channel,
			"spidercode": s.Code,
			"updatetime": nowTime.Unix(),
			"event":      util.Config.Uploadevent,
			"modifyuser": s.MUserName,
			"maxpage":    s.MaxPage,
			"runrate":    s.SpiderRunRate,
			"endpage":    start,
			"date":       sDate,
		}
		inc := map[string]interface{}{
			"alltimes": 1,
		}
		// record whether paging succeeded
		if s.MaxPage > 1 {
			if s.PageOneTextHash != "" {
				if s.PageTwoTextHash != "" {
					if s.PageOneTextHash != s.PageTwoTextHash {
						inc["page_success"] = 1
					} else {
						inc["page_fail"] = 1
					}
				} else {
					inc["page_fail"] = 1
				}
			} else if s.PageTwoTextHash != "" {
				inc["page_onefail"] = 1
			}
		}
		if downloadAllNum > 0 {
			rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
			rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
			if rate == 1.0 {
				if downloadAllNum == 1 { // list-page filtering left only one new record
					inc["oh_percent_onenum"] = 1
				} else {
					inc["oh_percent"] = 1
				}
				//} else if rate >= 0.9 {
				//	inc["nt_percent"] = 1
				//} else if rate >= 0.8 {
				//	inc["et_percent"] = 1
				//} else {
				//	inc["other_percent"] = 1
			}
			if isRunRepeatList && start > max { // consecutive paging exceeded the upper limit
				if !s.ContinueDownListChildTask {
					go ContinueDownListPageItem(s) // start a child task to keep collecting
				} else {
					inc["uplimit"] = 1
				}
			}
		} else {
			inc["zero"] = 1
		}
		query := map[string]interface{}{
			"date":       sDate,
			"spidercode": s.Code,
		}
		MgoS.Update("spider_downloadrate", query, map[string]interface{}{
			"$set": set,
			"$inc": inc,
		}, true, false)
	}
	// reset state
	s.PageOneTextHash = ""
	s.PageTwoTextHash = ""
	return errs
}

// DownListPageItemByThreads downloads list pages concurrently.
func (s *Spider) DownListPageItemByThreads() (errs interface{}) {
	defer qu.Catch()
	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") // start page, max page
	s.MaxPage = max            // record the configured max page
	repeatAllNum := int64(0)   // total number of duplicates collected in this round
	downloadAllNum := int64(0) // total number collected in this round
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // node 7000: pick the max page by spider type
		max = s.GetIntVar("spiderHistoryMaxPage") // historical spiders use the spiderHistoryMaxPage setting
	}
	isRunRepeatList := false // whether to run consecutive list-page dedup
	// decide whether to check consecutive pages and adjust the max page
	if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { // on all nodes except sequential-collection mode and the history node, dedup consecutive list pages
		isRunRepeatList = true
		max = util.Config.PageTurnInfo.TurnPageMaxLimit // high-performance mode caps the max page at 100; queue mode at 50
	}
	// child-task check
	//if s.ContinueDownListChildTask {
	//	start = util.Config.PageTurnInfo.TurnPageMaxLimit + 1
	//	max = util.Config.PageTurnInfo.TurnPageMaxLimit + util.Config.PageTurnInfo.NextPageMaxLimit
	//}
	// create the concurrent Spider objects
	spChan := make(chan *Spider, 1)
	if isRunRepeatList && util.Config.PageTurnInfo.ListThreadsNum > 1 { // unlimited-paging mode sizes spChan by the concurrency
		spChan = make(chan *Spider, util.Config.PageTurnInfo.ListThreadsNum)
		spChan <- s                                                                                // put the main Spider into the channel
		NewSpiderByScript(util.Config.PageTurnInfo.ListThreadsNum-1, s.Code, s.ScriptFile, spChan) // create additional Spider objects
	} else {
		spChan <- s // put the main Spider into the channel
	}
	endPage := 0     // final page
	repeatTimes := 0 // consecutive-duplicate counter
	for ; start <= max && !s.Stop; start += util.Config.PageTurnInfo.ListThreadsNum {
		if !s.Stop && !s.ContinueDownListChildTask { // if the spider went off-shelf while downloading detail pages, stop recording heartbeats
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) // record list-page heartbeats on all nodes
		}
		listWg := &sync.WaitGroup{}
		pageMap := map[int]bool{} // whether every row on a page was already collected
		pageArr := []int{}
		lock := &sync.Mutex{}
		// download list pages concurrently
		for listThreadNum := 0; listThreadNum < util.Config.PageTurnInfo.ListThreadsNum; listThreadNum++ {
			pagenum := start + listThreadNum // actual page number to collect
			if pagenum > max { // each batch starts ListThreadsNum goroutines, which may run past the max limit
				break
			}
			spTmp := <-spChan // take a Spider from the channel
			listWg.Add(1)
			atomic.AddInt64(&ListAllThreadNum, 1)
			endPage = pagenum + 1
			go func(sp *Spider, pagenum int) {
				defer func() {
					spChan <- sp // put the Spider back after the work is done
					listWg.Done()
					atomic.AddInt64(&ListAllThreadNum, -1)
				}()
				// download one page of data
				downnum, repeatnum := sp.DownListOnePage(pagenum)
				//logger.Info(sp.Code, "pagenum", pagenum, "repeat", downnum == repeatnum, downnum, repeatnum, &sp)
				// aggregate the download counts
				atomic.AddInt64(&downloadAllNum, int64(downnum))
				atomic.AddInt64(&repeatAllNum, int64(repeatnum))
				lock.Lock()
				pageMap[pagenum] = downnum == repeatnum // whether page pagenum was already fully collected
				pageArr = append(pageArr, pagenum)
				lock.Unlock()
				if downnum > 0 { // with concurrent list collection the spider objects differ, so stats are copied back afterwards
					if pagenum == 1 { // copy the first page's content hash back to s
						s.PageOneTextHash = sp.PageOneTextHash
					} else if pagenum == 2 { // copy the second page's content hash back to s
						s.PageTwoTextHash = sp.PageTwoTextHash
					}
				}
			}(spTmp, pagenum)
		}
		listWg.Wait()
		if isRunRepeatList {
			sort.Ints(pageArr) // sort the page numbers ascending
			for _, page := range pageArr {
				if pageMap[page] {
					repeatTimes++
				} else {
					repeatTimes = 0
				}
			}
			if repeatTimes >= util.Config.PageTurnInfo.RepeatPageTimesLimit { // consecutive-duplicate limit exceeded: stop paging
				break
			}
		}
	}
	//close(spChan) // close the channel and release resources
	logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, endPage, s.Stop)
	if !util.Config.IsHistoryEvent && !s.Stop { // non-history nodes record the download rate
		nowTime := time.Now()
		sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
		set := map[string]interface{}{
			"site":       s.Name,
			"channel":    s.Channel,
			"spidercode": s.Code,
			"updatetime": nowTime.Unix(),
			"event":      util.Config.Uploadevent,
			"modifyuser": s.MUserName,
			"maxpage":    s.MaxPage,
			"runrate":    s.SpiderRunRate,
			"endpage":    endPage,
			"date":       sDate,
		}
		inc := map[string]interface{}{
			"alltimes": 1,
		}
		// record whether paging succeeded
		if s.MaxPage > 1 { // when the max page is 1, list-page health reflects how the spider is running
			if s.PageOneTextHash != "" {
				if s.PageTwoTextHash != "" {
					if s.PageOneTextHash != s.PageTwoTextHash {
						inc["page_success"] = 1
					} else {
						inc["page_fail"] = 1
					}
				} else {
					inc["page_fail"] = 1
				}
			} else if s.PageTwoTextHash != "" {
				inc["page_onefail"] = 1
			}
		}
		if downloadAllNum > 0 {
			rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
			rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
			if rate == 1.0 {
				if downloadAllNum == 1 { // list-page filtering left only one new record
					inc["oh_percent_onenum"] = 1
				} else {
					inc["oh_percent"] = 1
				}
				//} else if rate >= 0.9 {
				//	inc["nt_percent"] = 1
				//} else if rate >= 0.8 {
				//	inc["et_percent"] = 1
				//} else {
				//	inc["other_percent"] = 1
			}
			if isRunRepeatList && endPage > max { // consecutive paging exceeded the upper limit
				if !s.ContinueDownListChildTask {
					go ContinueDownListPageItem(s) // start a child task to keep collecting
				} else {
					inc["uplimit"] = 1
				}
			}
		} else {
			inc["zero"] = 1
		}
		query := map[string]interface{}{
			"date":       sDate,
			"spidercode": s.Code,
		}
		MgoS.Update("spider_downloadrate", query, map[string]interface{}{
			"$set": set,
			"$inc": inc,
		}, true, false)
	}
	// reset state
	s.PageOneTextHash = ""
	s.PageTwoTextHash = ""
	return errs
}

// DownListPageItemByThreadsBack downloads list pages concurrently (old version).
func (s *Spider) DownListPageItemByThreadsBack() (errs interface{}) {
	defer qu.Catch()
	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") // start page, max page
	s.MaxPage = max            // record the configured max page
	repeatAllNum := int64(0)   // total number of duplicates collected in this round
	downloadAllNum := int64(0) // total number collected in this round
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // node 7000: pick the max page by spider type
		max = s.GetIntVar("spiderHistoryMaxPage") // historical spiders use the spiderHistoryMaxPage setting
	}
	repeatPageTimesLimit := 1 // upper limit of consecutive duplicate pages (default 1: equivalent to plain paging; the for step is at least +1)
	isRunRepeatList := false  // whether to run consecutive list-page dedup
	// decide whether to check consecutive pages and adjust the max page
	if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { // on all nodes except sequential-collection mode and the history node, dedup consecutive list pages
		isRunRepeatList = true
		repeatPageTimesLimit = util.Config.PageTurnInfo.RepeatPageTimesLimit
		max = util.Config.PageTurnInfo.TurnPageMaxLimit // high-performance mode caps the max page at 100; queue mode at 50
	}
	// child-task check
	if s.ContinueDownListChildTask {
		start = util.Config.PageTurnInfo.TurnPageMaxLimit + 1
		max = util.Config.PageTurnInfo.TurnPageMaxLimit + util.Config.PageTurnInfo.NextPageMaxLimit
	}
	// create the concurrent Spider objects
	spChan := make(chan *Spider, 1)
	if isRunRepeatList { // unlimited-paging mode sizes spChan by the concurrency
		spChan = make(chan *Spider, repeatPageTimesLimit) // the concurrency equals the consecutive-dedup page count
		spChan <- s                                       // put the main Spider into the channel
		NewSpiderByScript(repeatPageTimesLimit-1, s.Code, s.ScriptFile, spChan) // create additional Spider objects
	} else {
		spChan <- s // put the main Spider into the channel
	}
	endPage := 0 // final page
	for ; start <= max && !s.Stop; start += repeatPageTimesLimit {
		if !s.Stop && !s.ContinueDownListChildTask { // if the spider went off-shelf while downloading detail pages, stop recording heartbeats
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) // record list-page heartbeats on all nodes
		}
		listWg := &sync.WaitGroup{}
		isContinue := false // whether to keep collecting
		// download list pages concurrently
		for listThreadNum := 0; listThreadNum < repeatPageTimesLimit; listThreadNum++ {
			pagenum := start + listThreadNum // actual page number to collect
			if pagenum > max { // each batch starts repeatPageTimesLimit goroutines, which may run past the max limit
				break
			}
			spTmp := <-spChan // take a Spider from the channel
			listWg.Add(1)
			atomic.AddInt64(&ListAllThreadNum, 1)
			endPage = pagenum + 1
			go func(sp *Spider, pagenum int) {
				defer func() {
					spChan <- sp // put the Spider back after the work is done
					listWg.Done()
					atomic.AddInt64(&ListAllThreadNum, -1)
				}()
				// download one page of data
				downnum, repeatnum := sp.DownListOnePage(pagenum)
				// aggregate the download counts
				atomic.AddInt64(&downloadAllNum, int64(downnum))
				atomic.AddInt64(&repeatAllNum, int64(repeatnum))
				if downnum > 0 {
					if downnum-repeatnum > 0 { // the page has new data
						isContinue = true
					}
					// with concurrent list collection the spider objects differ, so stats are copied back afterwards
					if pagenum == 1 { // copy the first page's content hash back to s
						s.PageOneTextHash = sp.PageOneTextHash
					} else if pagenum == 2 { // copy the second page's content hash back to s
						s.PageTwoTextHash = sp.PageTwoTextHash
					}
				}
				//qu.Debug("page", pagenum, "collected:", downnum, repeatnum)
			}(spTmp, pagenum)
		}
		listWg.Wait()
		if !isContinue { // keep collecting, up to the page limit, only while some page in the batch had new data
			break
		}
	}
	close(spChan) // close the channel and release resources
	logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, endPage, s.Stop)
	if !util.Config.IsHistoryEvent && !s.Stop { // non-history nodes record the download rate
		nowTime := time.Now()
		sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
		set := map[string]interface{}{
			"site":       s.Name,
			"channel":    s.Channel,
			"spidercode": s.Code,
			"updatetime": nowTime.Unix(),
			"event":      util.Config.Uploadevent,
			"modifyuser": s.MUserName,
			"maxpage":    s.MaxPage,
			"runrate":    s.SpiderRunRate,
			"endpage":    endPage,
			"date":       sDate,
		}
		inc := map[string]interface{}{
			"alltimes": 1,
		}
		// record whether paging succeeded
		if s.MaxPage > 1 { // when the max page is 1, list-page health reflects how the spider is running
			if s.PageOneTextHash != "" {
				if s.PageTwoTextHash != "" {
					if s.PageOneTextHash != s.PageTwoTextHash {
						inc["page_success"] = 1
					} else {
						inc["page_fail"] = 1
					}
				} else {
					inc["page_fail"] = 1
				}
			} else if s.PageTwoTextHash != "" {
				inc["page_onefail"] = 1
			}
		}
		if downloadAllNum > 0 {
			rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
			rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
			if rate == 1.0 {
				if downloadAllNum == 1 { // list-page filtering left only one new record
					inc["oh_percent_onenum"] = 1
				} else {
					inc["oh_percent"] = 1
				}
				//} else if rate >= 0.9 {
				//	inc["nt_percent"] = 1
				//} else if rate >= 0.8 {
				//	inc["et_percent"] = 1
				//} else {
				//	inc["other_percent"] = 1
			}
			if isRunRepeatList && endPage > max { // consecutive paging exceeded the upper limit
				if !s.ContinueDownListChildTask {
					go ContinueDownListPageItem(s) // start a child task to keep collecting
				} else {
					inc["uplimit"] = 1
				}
			}
		} else {
			inc["zero"] = 1
		}
		query := map[string]interface{}{
			"date":       sDate,
			"spidercode": s.Code,
		}
		MgoS.Update("spider_downloadrate", query, map[string]interface{}{
			"$set": set,
			"$inc": inc,
		}, true, false)
	}
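	// Note (added commentary): PageOneTextHash/PageTwoTextHash are hashes of the
	// text collected from list pages 1 and 2; the stats block above turns them into
	// a paging-health signal. Sketch of the mapping:
	//
	//	hash1 != "" && hash2 != "" && hash1 != hash2 -> inc["page_success"]
	//	hash1 != "" && (hash2 == "" || hash1 == hash2) -> inc["page_fail"]
	//	hash1 == "" && hash2 != ""                     -> inc["page_onefail"]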
	// reset state
	s.PageOneTextHash = ""
	s.PageTwoTextHash = ""
	return errs
}

// SupplementDownListPageItem downloads list pages for supplementary (gap-filling) collection.
func (s *Spider) SupplementDownListPageItem() (errs interface{}) {
	defer qu.Catch()
	var (
		errtimes       int    // number of abnormal pages (currently capped at 10)
		errPageNum     int    // page number of the current abnormal page
		downtimes      int    // retry count for a page (currently 3)
		downloadAllNum int    // total number of records collected in this run
		saveAllNum     int    // total number of records supplemented in this run
		repeatAllNum   int    // total number of duplicate records in this run
		pageTitleHash  string // hash of all title texts on the current page
		finishText     = "正常退出"
		start          = 1 // start page
	)
	for {
		if errtimes >= Supplement_MaxErrorTimes { // more than 10 consecutive abnormal pages: stop paging
			finishText = "异常退出"
			logger.Info(s.Code + "连续10页采集异常")
			break
		}
		if err := s.L.CallByParam(lua.P{
			Fn:      s.L.GetGlobal("downloadAndParseListPage"),
			NRet:    1,
			Protect: true,
		}, lua.LNumber(start)); err != nil {
			//panic(s.Code + "," + err.Error())
			logger.Error("列表页采集报错", start, s.Code+","+err.Error())
			errs = err.Error()
			if downtimes < 3 {
				downtimes++
			} else if errtimes == 0 || start == errPageNum+1 {
				errtimes++
				errPageNum = start
				start++
				downtimes = 0
			}
			continue
		}
		lv := s.L.Get(-1)
		s.L.Pop(1)
		if tbl, ok := lv.(*lua.LTable); ok {
			if tabLen := tbl.Len(); tabLen > 0 { // the list page has data; download detail pages from it
				var (
					publishtimeErrTimes int
					text                string
					repeatListNum       int // number of duplicate links on the current list page
					num                 = 1
					isBreak             = false
				)
				for ; num <= tabLen; num++ {
					v := tbl.RawGetInt(num).(*lua.LTable)
					tmp := util.TableToMap(v)
					tmp["dataging"] = 0 // mark the data with dataging=0
					s.DownloadDetailItem(tmp, &repeatListNum)
					pTmp := qu.ObjToString(tmp["publishtime"])
					title := qu.ObjToString(tmp["title"])
					text += title
					pTime, _ := time.ParseInLocation(qu.Date_Full_Layout, pTmp, time.Local)
					publishtime := pTime.Unix()
					if publishtime > 1000000000 && publishtime < Supplement_Publishtime { // normal exit
						isBreak = true
						//break
					} else if publishtime <= 1000000000 { // abnormal publish time
						publishtimeErrTimes++
					}
				}
				logger.Info(s.Code, start, tabLen, repeatListNum)
				downloadAllNum += tabLen              // accumulate the total collected
				repeatAllNum += repeatListNum         // accumulate the duplicates
				saveAllNum += num - 1 - repeatListNum // accumulate the saved records
				tmpPageTitleHash := pageTitleHash
				pageTitleHash = util.HexText(text)
				if tabLen == publishtimeErrTimes || tmpPageTitleHash == pageTitleHash { // every publish time on the page is abnormal, or the page content equals the previous page
					//if errtimes == 0 || start == errPageNum+1 {
					errtimes++
					errPageNum = start
					start++
					//}
					continue
				} else if isBreak { // stop collecting
					start++
					break
				}
			} else {
				if downtimes < 3 {
					downtimes++
				} else if errtimes == 0 || start == errPageNum+1 {
					errtimes++
					errPageNum = start
					start++
					downtimes = 0
				}
				continue
			}
		} else {
			if downtimes < 3 {
				downtimes++
			} else if errtimes == 0 || start == errPageNum+1 {
				errtimes++
				errPageNum = start
				start++
				downtimes = 0
			}
			continue
		}
		start++
		downtimes = 0
		errtimes = 0
		errPageNum = 0
		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
	}
	logger.Info(s.Code, "本轮列表页采集详情:", downloadAllNum, repeatAllNum, saveAllNum, finishText)
	save := map[string]interface{}{
		"site":       s.Name,
		"channel":    s.Channel,
		"spidercode": s.Code,
		"comeintime": time.Now().Unix(),
		"modifyuser": s.MUserName,
		"endpage":    start,
		"finish":     finishText,
		"savenum":    saveAllNum,
		"count":      downloadAllNum,
		"repeat":     repeatAllNum,
	}
	MgoS.Save("spider_supplement", save)
	return errs
}

// DownListOnePage downloads a single list page.
func (s *Spider) DownListOnePage(pagenum int) (downnum, repeatnum int) {
	defer qu.Catch()
	downtimes := 0
	for downtimes < 3 { // retry up to 3 times on error
		if err := s.L.CallByParam(lua.P{
			Fn:      s.L.GetGlobal("downloadAndParseListPage"),
			NRet:    1,
			Protect: true,
		}, lua.LNumber(pagenum)); err != nil {
			//panic(s.Code + "," + err.Error())
			logger.Error("列表页采集报错", pagenum, s.Code+","+err.Error())
			atomic.AddInt32(&s.Script.ErrorNum, 1)
			// retry on a list-page error
			downtimes++
			continue
		}
		lv := s.L.Get(-1)
		s.L.Pop(1)
		if tbl, ok := lv.(*lua.LTable); ok {
			//list := []map[string]interface{}{}
			//qu.Debug("rows on current page:", tbl.Len())
			if tabLen := tbl.Len(); tabLen > 0 { // the list page has data; download detail pages from it
				repeatListNum := 0 // number of duplicate links on the current list page
				for i := 1; i <= tabLen; i++ {
					v := tbl.RawGetInt(i).(*lua.LTable)
					tmp := util.TableToMap(v)
					if !s.IsHistoricalMend { // not historical mending
						tmp["dataging"] = 0 // mark the data with dataging=0
						if s.DownDetail {
							s.DownloadDetailItem(tmp, &repeatListNum)
						}
					} else { // historical mending
						s.HistoricalMendDownloadDetailItem(tmp) // download detail pages for historical mending
					}
				}
				repeatnum = repeatListNum
				downnum = tabLen
				return
				//if !s.IsHistoricalMend && !s.DownDetail {
				//	if len(list) > 0 { // save the records
				//		StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
				//	}
				//}
			} else {
				// the page may be empty because of network problems, so request it again (the page may also have been filtered out entirely)
				downtimes++
				continue
			}
		} else { // requesting the current list page failed
			downtimes++
			continue
		}
	}
	return
}

// ContinueDownListPageItem keeps collecting list pages in a separate goroutine.
func ContinueDownListPageItem(s *Spider) {
	defer qu.Catch()
	spTmp, errstr := CreateSpider(s.SCode, s.ScriptFile, true, true) // build a new spider
	logger.Info(s.SCode, "补充连续翻页开始...")
	if errstr == "" && spTmp != nil && spTmp.Code != "nil" { // the script loaded successfully
		spTmp.ContinueDownListChildTask = true
		defer spTmp.L.Close()
		err := spTmp.DownListPageItem() // download list pages
		logger.Info(s.SCode, "补充连续翻页结束...")
		if err != nil {
			logger.Error(spTmp.Code, err)
		}
	}
}

// HistoricalMendDownloadDetailItem iterates the list rows and downloads detail pages (historical mending).
func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
	//qu.Debug("-------------- historical download -----------------")
	defer qu.Catch()
	var err interface{}
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { // invalid data
		return
	}
	hashHref := util.HexText(href)
	isExist := util.RedisExist("list", "list_"+hashHref)
	//logger.Debug("full href:", href, " isExist:", isExist)
	if !s.IsMustDownload { // download is not forced
		if isExist { // the record already exists: return directly
			return
		} else if util.Config.IsHistoryEvent { // 1. historical mending on node 7000 (history node): store the record in spider_historydata
			num := 0
			SaveHighListPageData(paramdata, hashHref, &num)
			return
		}
	} else { // forced download is currently unsupported
		return
	}
	// 2. historical mending off the history node: collect details right after the list, then take the spider off-shelf (no such spiders at the moment)
	id := ""
	isEsRepeat := false
	if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
		title := qu.ObjToString(paramdata["title"])
		eTime := time.Now().Unix()
		sTime := eTime - int64(7*86400)
		//esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
		esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
		if Es.Count(EsIndex, EsType, esQuery) > 0 { // es already has this title: do not collect it, just update the list record state
			isEsRepeat = true
		}
	}
	SaveListPageData(paramdata, &id, isEsRepeat) // save the collection record
	if isEsRepeat { // quasi-competitor data deduplicated by title goes into redis
		util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
		util.AddBloomRedis("href", href)
		return
	}
	//qu.Debug("---------------- download, parse, store --------------------")
	// download the detail page
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil { // download failed: stop
		if err != nil {
			logger.Error(s.Code, err, paramdata)
		}
		// mark the spider_listdata record as failed
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
		return
	}
	util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) // collection succeeded: add to the list-page redis
	// dedup check based on publish time
	tmphref := qu.ObjToString(data["href"]) // use tmphref because detail-page href replacement can change the href
	publishtime := qu.Int64All(data["l_np_publishtime"])
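	// Note (added commentary): the check below treats anything published more than
	// a year ago as historical and deduplicates it against the full "href" bloom
	// filter in redis before storing; newer records rely on the list-page redis set
	// alone. Illustrative threshold, assuming "now" is 2023-06-01:
	//
	//	publishtime < Unix(2022-06-01) -> util.ExistsBloomRedis("href", tmphref)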
	if publishtime < time.Now().AddDate(-1, 0, 0).Unix() { // data older than a year is deduplicated against the full bloom-redis href set
		isExist, _ = util.ExistsBloomRedis("href", tmphref)
		if isExist {
			MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
			return
		}
	}
	// the detail page filtered the record out
	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
	if data["delete"] != nil {
		//util.AddBloomRedis("href", tmphref) // "delete" may mark redirecting records; adding them to the full redis could prevent the site from being collected
		set["exist"] = "delete"
		//MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
		return
	}
	// mark the spider_listdata record as downloaded (updated by href; it may be updated again by a later successful download)
	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
	// detail-page href replacement can change the href; after success the original href would go into the full redis
	//if tmphref := qu.ObjToString(data["href"]); tmphref != href {
	//	util.AddBloomRedis("href", href)
	//}
	flag := true
	//publishtime := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) // publishtime
	if s.IsMustDownload { // forced download
		if isExist && publishtime < time.Now().AddDate(0, 0, -5).Unix() {
			//qu.Debug("forced download, present in redis")
			data["dataging"] = 1 // dataging=1 used to make the save service update by the id stored in redis for this href (redis no longer stores the id, so this has no effect)
			flag = false
		} else {
			//qu.Debug("forced download, absent from redis")
			data["dataging"] = 0
		}
	} else { // download not forced
		if !isExist {
			//qu.Debug("not forced, absent from redis")
			data["dataging"] = 0
		}
	}
	//if publishtime > time.Now().Unix() { // guard against publish times in the future
	//	data["publishtime"] = time.Now().Unix()
	//}
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	data["spidercode"] = s.Code
	//qu.Debug("-------------- start saving ---------------")
	data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the original link (checked by the save service)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag)
	//qu.Debug("-------------- saving done ---------------")
}

// DownloadDetailItem iterates the list rows and downloads detail pages (incremental).
func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
	defer qu.Catch()
	var err interface{}
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { // invalid data
		*num++ // treat it as collected
		return
	}
	hashHref := util.HexText(href)
	// list-page redis dedup
	isExist := util.RedisExist("list", "list_"+hashHref)
	if Supplement && !isExist { // supplementary collection also checks the full redis
		isExist, _ = util.ExistsBloomRedis("href", href)
	}
	if isExist {
		*num++ // already collected
		return
	}
	id := "" // id of the record saved in spider_listdata, used to update its state after a successful download
	//if util.Config.Modal == 1 || (util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history") { // split list/detail collection nodes and data collected by new spiders on node 7000
	if util.Config.Modal == 1 || util.Config.IsHistoryEvent || Supplement { // split collection mode and the history node (7000)
		SaveHighListPageData(paramdata, hashHref, num) // save to the table
		return
	} else {
		if !s.Stop {
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail", false) // record the detail-page heartbeat for the old modal=0 mode
		}
		isEsRepeat := false
		if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
			title := qu.ObjToString(paramdata["title"])
			eTime := time.Now().Unix()
			sTime := eTime - int64(7*86400)
			//esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
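			// Note (added commentary): the commented-out string above is the old
			// hand-built "filtered" query; the line below builds the equivalent with
			// the olivere/elastic DSL: a bool query whose two must clauses restrict
			// comeintime to the last seven days and match the title exactly on the
			// title.mtitle field. Sketch of the JSON it produces:
			//
			//	{"bool":{"must":[{"range":{"comeintime":{"gte":sTime,"lte":eTime}}},
			//	                 {"term":{"title.mtitle":title}}]}}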
			esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
			if Es.Count(EsIndex, EsType, esQuery) > 0 { // es already has this title: do not collect it, just update the list record state
				isEsRepeat = true
			}
		}
		SaveListPageData(paramdata, &id, isEsRepeat) // save list-page records collected on nodes 7000, 7410, 7500, 7510, 7520, 7700
		if isEsRepeat { // quasi-competitor data deduplicated by title goes into redis
			util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
			util.AddBloomRedis("href", href)
			return
		}
	}
	// download the detail page
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil {
		*num++ // in sequential mode a failed download counts as a duplicate (otherwise failing records would never be deduplicated and would inflate the full-collection counter every round)
		if err != nil {
			logger.Error(s.Code, err, paramdata)
			//if len(paramdata) > 0 {
			//	SaveErrorData(s.MUserName, paramdata, err) // save the error
			//}
		}
		// mark the spider_listdata record as failed
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}})
		return
	}
	/*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // detail-page href replacement changed the href
		util.RedisClusterSet(hashHref, "", -1) // store the list-page href in the full redis
	}*/
	util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) // add to the list-page redis
	// dedup check based on publish time
	tmphref := qu.ObjToString(data["href"])
	publishtime := qu.Int64All(data["l_np_publishtime"])
	// node 7410 (href-changing node) and data older than a year are deduplicated against the full bloom-redis href set
	if util.Config.Uploadevent == 7410 || publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
		isExist, _ = util.ExistsBloomRedis("href", tmphref)
		if isExist {
			//MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
			MgoS.Update("spider_listdata", map[string]interface{}{"href": tmphref}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "byid": id, "tmphref": tmphref, "updatetime": time.Now().Unix()}}, false, true)
			return
		}
	}
	// heartbeat for a successful detail-page download
	if !s.Stop {
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute", false) // record the data heartbeat for the old modal=0 mode
	}
	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
	// the detail page filtered the record out
	if data["delete"] != nil {
		//util.AddBloomRedis("href", tmphref) // "delete" may mark redirecting records; adding them to the full redis could prevent the site from being collected
		set["exist"] = "delete"
		//MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
		return
	}
	set["byid"] = id
	// mark the spider_listdata record as downloaded (updated by href; it may be updated again by a later successful download)
	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
	// detail-page href replacement can change the href; after success the original href would go into the full redis
	//if tmphref := qu.ObjToString(data["href"]); tmphref != href {
	//	util.AddBloomRedis("href", href)
	//}
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	data["spidercode"] = s.Code
	data["iscompete"] = s.IsCompete   // spiders added after 2021-11-01 no longer show the original link (checked by the save service)
	data["infoformat"] = s.Infoformat // spider type
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}

// DownloadDetailByNames iterates and downloads directory entries.
func (s *Spider) DownloadDetailByNames(p interface{}) {
	defer qu.Catch()
	var err interface{}
	/*
		if s.Stop {
			return
		}
		for s.Pass {
			util.TimeSleepFunc(2*time.Second, TimeSleepChan)
		}
	*/
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	if s.DownDetail {
		href := qu.ObjToString(data["href"])
		if href == "" || len(href) < 5 { // invalid data
			return
		}
		// download, parse, store
		data, err = s.DownloadDetailPage(paramdata, data)
		if err != nil {
			logger.Error(s.Code, paramdata, err)
			return
		}
	}
	data["comeintime"] = time.Now().Unix()
	//atomic.AddInt32(&s.LastDowncount, 1)
	//atomic.AddInt32(&s.TodayDowncount, 1)
	//atomic.AddInt32(&s.TotalDowncount, 1)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
}

// DownloadDetailPage downloads and parses a detail page.
func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
	defer qu.Catch()
	s.LastHeartbeat = time.Now().Unix()
	util.TimeSleepFunc((time.Duration(s.SleepBase+util.GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
	tab := s.L.NewTable()
	for k, v := range param {
		if val, ok := v.(string); ok {
			tab.RawSet(lua.LString(k), lua.LString(val))
		} else if val, ok := v.(int64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(int32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float64); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(float32); ok {
			tab.RawSet(lua.LString(k), lua.LNumber(val))
		} else if val, ok := v.(bool); ok {
			tab.RawSet(lua.LString(k), lua.LBool(val))
		}
	}
	var err error
	if err = s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("downloadDetailPage"),
		NRet:    1,
		Protect: true,
	}, tab); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return data, err
	}
	lv := s.L.Get(-1)
	s.L.Pop(1)
	// assemble the result map
	if v3, ok := lv.(*lua.LTable); ok {
		v3.ForEach(func(k, v lua.LValue) {
			if tmp, ok := k.(lua.LString); ok {
				key := string(tmp)
				if value, ok := v.(lua.LString); ok {
					data[key] = string(value)
				} else if value, ok := v.(lua.LNumber); ok {
					data[key] = int64(value)
				} else if value, ok := v.(*lua.LTable); ok {
					tmp := util.TableToMap(value)
					data[key] = tmp
				}
			}
		})
		return data, err
	} else {
		return nil, err
	}
}

// DetailData collects detail pages on a schedule in high-performance mode.
func DetailData() {
	defer qu.Catch()
	<-InitAllLuaOver // run after all scripts have loaded
	if util.Config.Working == 0 && !util.Config.IsHistoryEvent { // high-performance mode and not node 7000 (only node 7000 has util.Config.IsHistoryEvent == true)
		GetListDataDownloadDetail()
	}
}

func GetListDataDownloadDetail() {
	defer qu.Catch()
	logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
	Allspiders2.Range(func(k, v interface{}) bool {
		sp := v.(*Spider)
		go sp.DownloadHighDetail(true)
		time.Sleep(200 * time.Millisecond)
		return true
	})
}

// DownloadHighDetail downloads detail pages from list-page records in high-performance mode.
func (s *Spider) DownloadHighDetail(reload bool) {
	defer qu.Catch()
	for {
		logger.Info("Detail Running Code:", s.Code, " Stop:", s.Stop)
		if !s.Stop { // the spider is running
			s.DownloadDetail(reload, false)
		} else {
			break
		}
	}
}

// DownloadListDetail downloads detail pages from list-page records in queue mode.
func (s *Spider) DownloadListDetail(reload bool) {
	defer qu.Catch()
	s.DownloadDetail(reload, false)
	// in queue mode the spider closes after all detail pages are downloaded or there is nothing to download
	s.Stop = true
	if _, b := Allspiders2.Load(s.Code); b {
		Allspiders2.Store(s.Code, s)
	}
	s.L.Close()
	CC2 <- s.L
}

// DownloadDetail downloads detail pages.
func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
	defer qu.Catch()
	coll := "spider_highlistdata"
	isEsRepeat := false // whether to deduplicate against es
	q := map[string]interface{}{
		"spidercode": s.Code,
		"state":      0, // 0: stored; -1: collection failed; 1: success
	}
	o := map[string]interface{}{"_id": -1}
	if !isHistory { // not a historical download: add the comeintime filter
		comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} // collect data from the last DayNum days so records that keep failing do not pile up
		if delaySite := DelaySiteMap[s.Name]; delaySite != nil {
			isEsRepeat = delaySite.Compete
			if delaySite.DelayTime <= util.Config.DayNum*24 { // the site is delayed: its data is collected DelayTime hours late (7410, 7500 and 7700 collect sequentially and cannot be delayed)
				//comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
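				// Note (added commentary): for a delayed site the upper bound below
				// is "now minus DelayTime hours", so detail pages are only picked up
				// once they have aged past the configured delay. Example, assuming
				// DelayTime = 48 and DayNum = 7:
				//
				//	comeintime in [now - 7 days, now - 48h]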
				comeintimeQuery["$lte"] = time.Now().Unix() - int64(3600*delaySite.DelayTime)
			}
		}
		q["comeintime"] = comeintimeQuery
	} else {
		coll = "spider_historydata"
		o["_id"] = 1 // historical data in ascending order
	}
	f := map[string]interface{}{
		"state":      0,
		"comeintime": 0,
		"event":      0,
	}
	if !isHistory && !s.Stop { // if the spider went off-shelf while downloading detail pages, stop recording heartbeats
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail", false) // record the detail-page heartbeat for modal=1
	}
	countNum := MgoS.Count(coll, q) // number of records not downloaded within util.Config.DayNum days
	if isHistory && countNum == 0 { // nothing left to download for history: stop manually
		s.Stop = true
		return
	}
	//logger.Info("Thread Info: Code:", s.SCode, " count:", countNum)
	if countNum > 0 {
		threadNum := countNum / util.Config.ThreadBaseNum // thread count
		if threadNum > util.Config.ThreadUpperLimit { // cap the per-spider thread count
			threadNum = util.Config.ThreadUpperLimit
		}
		logger.Info("Thread Info: Code:", s.SCode, " count:", countNum, " thread num:", threadNum)
		list, _ := MgoS.Find(coll, q, o, f, false, 0, 200)
		if list != nil && len(*list) > 0 {
			spChan := make(chan *Spider, threadNum+1) // initialize the worker channel (+1 for the base thread)
			if threadNum > 1 { // initialize additional Spider objects
				if !isHistory {
					// take the spider info from LoopListPath so the new Spider objects always use the latest script (updated when the spider is re-published)
					if v, ok := LoopListPath.Load(s.Code); ok && v != nil {
						if info, ok := v.(map[string]string); ok {
							NewSpiderByScript(threadNum, s.Code, info["script"], spChan)
						} else {
							logger.Debug("LoopListPath Not Has Code:", s.Code)
							spChan = make(chan *Spider, 1) // cannot create other Spiders: only the main thread's Spider is used
						}
					} else {
						logger.Debug("LoopListPath Not Has Code:", s.Code)
						spChan = make(chan *Spider, 1) // cannot create other Spiders: only the main thread's Spider is used
					}
				} else {
					NewSpiderByScript(threadNum, s.Code, s.ScriptFile, spChan)
				}
			}
			spChan <- s // put the main thread's Spider into the channel
			wg := &sync.WaitGroup{}
			spLock := &sync.Mutex{}
			updateArr := [][]map[string]interface{}{}
			for _, tmp := range *list {
				spTmp := <-spChan // take a Spider from the channel
				wg.Add(1)
				atomic.AddInt64(&AllThreadNum, 1)
				go func(tmp map[string]interface{}, sp *Spider) {
					defer func() {
						spChan <- sp // put the Spider back after the work is done
						wg.Done()
						atomic.AddInt64(&AllThreadNum, -1)
					}()
					if s.Stop || sp == nil { // do not download when the spider is off-shelf or the Spider is nil
						return
					}
					_id := tmp["_id"]
					query := map[string]interface{}{"_id": _id}
					href := qu.ObjToString(tmp["href"])
					//hashHref := util.HexText(href)
					update := []map[string]interface{}{}
					if isEsRepeat { // es title dedup
						title := qu.ObjToString(tmp["title"])
						eTime := time.Now().Unix()
						sTime := eTime - int64(7*86400)
						//esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
						esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
						count := Es.Count(EsIndex, EsType, esQuery)
						if count > 0 { // es already has this title: do not collect it, just update the list record state
							util.AddBloomRedis("href", href)
							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "es", "updatetime": time.Now().Unix()}} // existing record: state=1
							update = append(update, query)
							update = append(update, set)
							spLock.Lock()
							updateArr = append(updateArr, update)
							spLock.Unlock()
							return
						}
					}
					times := qu.IntAll(tmp["times"]) // number of download attempts
					success := true                  // whether the record downloaded successfully
					delete(tmp, "_id")
					delete(tmp, "times")
					data := map[string]interface{}{}
					var err interface{}
					for k, v := range tmp {
						data[k] = v
					}
					// download, parse, store
					data, err = sp.DownloadDetailPage(tmp, data)
					if !isHistory && !sp.Stop && sp.IsMainThread { // if the spider went off-shelf while downloading detail pages, stop recording heartbeats
						UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute", false) // record the data heartbeat for modal=1
					}
					if err != nil || data == nil {
						success = false
						times++
						if err != nil {
							logger.Error(s.Code, err, tmp)
							//if len(tmp) > 0 && !isHistory { // do not save errors while downloading history
							//	SaveErrorData(s.MUserName, tmp, err) // save the error
							//}
						}
						/*else if data == nil && times >= 3 { // download problem: create an editor task
							DownloadErrorData(s.Code, tmp)
						}*/
					}
					/*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // detail-page href replacement changed the href
						util.RedisClusterSet(hashHref, "", -1)
					}*/
					if !success { // download failed: update the attempt count and state
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { // after 3 failures stop downloading today
							ss["state"] = -1
						}
						set := map[string]interface{}{"$set": ss}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					} else if data["delete"] != nil { // the detail page filtered the record out
						//util.AddBloomRedis("href", tmphref) // "delete" may mark redirecting records; adding them to the full redis could prevent the site from being collected
						// records to be deleted get state=1 in spider_highlistdata so they are not downloaded again; update redis
						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "delete", "updatetime": time.Now().Unix()}}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					}
					// analyze body and attachments; re-download abnormal records
					if r := AnalysisProjectInfo(data); r != "" { // sequential collection skips this check for now (abnormal records never reach redis and would be downloaded forever)
						times++
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { // after 3 failures stop downloading today; state set to -1
							ss["state"] = -1
							ss["detailfilerr"] = r
						}
						set := map[string]interface{}{"$set": ss}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					}
					// the record was collected successfully
					// dedup check based on publish time
					tmphref := qu.ObjToString(data["href"])
					publishtime := qu.Int64All(data["l_np_publishtime"])
					if publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
						isExist, _ := util.ExistsBloomRedis("href", tmphref)
						if isExist {
							set := map[string]interface{}{"$set": map[string]interface{}{
								"state":      1,
								"updatetime": time.Now().Unix(),
								"exist":      "bloom_href",
								"tmphref":    tmphref,
							}}
							update = append(update, query)
							update = append(update, set)
							spLock.Lock()
							updateArr = append(updateArr, update)
							spLock.Unlock()
							return
						}
					}
					delete(data, "exit")
					delete(data, "checkpublishtime")
					data["comeintime"] = time.Now().Unix()
					data["spidercode"] = s.Code
					data["dataging"] = 0
					data["iscompete"] = s.IsCompete   // spiders added after 2021-11-01 no longer show the original link (checked by the save service)
					data["infoformat"] = s.Infoformat // spider type
					Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} // download succeeded: state=1
					update = append(update, query)
					update = append(update, set)
					spLock.Lock()
					updateArr = append(updateArr, update)
					spLock.Unlock()
					// the record is fully processed at this point
				}(tmp, spTmp)
			}
			wg.Wait()
			// flush the updates
			if len(updateArr) > 0 {
				MgoS.UpdateBulk(coll, updateArr...)
				updateArr = [][]map[string]interface{}{}
			}
			close(spChan) // close the channel
			// release the Spider objects (keep the main thread's Spider, IsMainThread=true)
			for sp := range spChan {
				if sp != nil && !sp.IsMainThread {
					sp.L.Close()
				}
			}
			if !s.Stop && reload { // in high-performance mode the Spider is reloaded after the detail pages are downloaded
				// reload the main thread's Spider
				s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
			}
		}
	} else if reload { // high-performance mode with no data: sleep
		time.Sleep(30 * time.Second)
	}
}

// NewSpiderByScript initializes Spider objects and puts them into the channel.
func NewSpiderByScript(num int, code, script string, spChan chan *Spider) {
	for i := 1; i <= num; i++ {
		spTmp, errstr := CreateSpider(code, script, true, true)
		if errstr == "" && spTmp != nil { // the script loaded successfully
			spChan <- spTmp
		} else {
			spChan <- nil
		}
	}
}

// AnalysisProjectInfo marks records whose detail contains only "详情请访问原网页!" and whose attachments failed to upload as not successfully downloaded.
func AnalysisProjectInfo(data map[string]interface{}) string {
	defer qu.Catch()
	detail := qu.ObjToString(data["detail"])
	if RestrictAccessReg.MatchString(detail) { // access restricted
		return "ip"
	}
	if detail == "详情请访问原网页!" || detail == "详情请访问原网页!" { // no substring match, because some records are assembled from incomplete json and also append "详情请访问原网页"
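		// Note (added commentary): a record whose detail text is only the
		// placeholder above counts as downloaded only when at least one attachment
		// under projectinfo.attachments carries a non-empty fid (i.e. it was
		// uploaded successfully); otherwise "detail_file" is returned and the
		// caller schedules a retry. Sketch of the shape being inspected:
		//
		//	data["projectinfo"]["attachments"]["file1"]["fid"] != "" -> return ""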
		if projectinfo, ok := data["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
			if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 {
				for _, data := range attachments {
					if d, ok := data.(map[string]interface{}); ok {
						fid := qu.ObjToString(d["fid"])
						if fid != "" { // the attachment uploaded successfully
							return ""
						}
					}
				}
				return "detail_file"
			} else {
				return "detail_file"
			}
		} else {
			return "detail_file"
		}
	}
	return ""
}

// AllThreadLog logs the thread counts once a minute.
func AllThreadLog() {
	logger.Info("List Download All Thread:", ListAllThreadNum)
	logger.Info("Detail Download All Thread:", AllThreadNum)
	time.AfterFunc(1*time.Minute, AllThreadLog)
}

// GetHas1 returns a hash code for the given data.
func GetHas1(data string) string {
	t := sha1.New()
	io.WriteString(t, data)
	hf := Reg.FindString(data)
	if !strings.HasSuffix(hf, "/") {
		hf = hf + "/"
	}
	return hf + fmt.Sprintf("%x", t.Sum(nil))
}
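
// Note (added commentary): an illustrative, uncalled sketch of how GetHas1 builds a
// dedup key: Reg extracts the scheme://host/ prefix of the first URL found in the
// input and the sha1 of the whole input is appended to it. The URL and text below
// are made-up examples.
//
//	key := GetHas1("http://www.example.com/notice/1.html 某项目招标公告")
//	// key == "http://www.example.com/" + hex(sha1 of the full input string)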