12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796 |
- /*
- *
- 爬虫,脚本接口,需要扩展
- */
- package spider
- import (
- "crypto/sha1"
- "fmt"
- elc "github.com/olivere/elastic/v7"
- "io"
- "log"
- mgo "mongodb"
- qu "qfw/util"
- es "qfw/util/elastic.v7"
- "sort"
- "strconv"
- "strings"
- "sync"
- "regexp"
- util "spiderutil"
- "sync/atomic"
- "time"
- "github.com/donnie4w/go-logger/logger"
- "github.com/yuin/gopher-lua"
- )
- // Heart 心跳
- type Heart struct {
- DetailHeart int64 //爬虫三级页执行心跳
- DetailExecuteHeart int64 //三级页采集到数据心跳
- FindListHeart int64 //findListHtml执行心跳
- ListHeart int64 //爬虫列表页执行心跳
- FirstPageHeart int64 //采集第一页的心跳
- ModifyUser string //爬虫维护人
- Site string //站点
- Channel string //栏目
- }
- // SpiderFlow 流量
- type SpiderFlow struct {
- Flow int64 //流量
- ModifyUser string //爬虫维护人
- Site string //站点
- Channel string //栏目
- //Code string
- }
- // Spider 爬虫
- type Spider struct {
- Script
- Code string //代码
- Name string //名称
- Channel string //站点
- DownDetail bool //是否下载详细页
- Stop bool //停止标志
- Pass bool //暂停标志
- LastPubshTime int64 //最后发布时间
- LastHeartbeat int64 //最后心跳时间
- SpiderRunRate int64 //执行频率
- ExecuteOkTime int64 //任务执行成功/完成时间
- Collection string //写入表名
- Thread int64 //线程数
- LastExecTime int64 //最后执行时间
- LastDowncount int32 //最后一次下载量
- TodayDowncount int32 //今日下载量
- YesterdayDowncount int32 //昨日下载量
- TotalDowncount int32 //总下载量
- RoundCount int32 //执行轮次
- StoreMode int //存储模式
- StoreToMsgEvent int //消息类型
- CoverAttr string //按属性判重数据
- SleepBase int //基本延时
- SleepRand int //随机延时
- TargetChannelUrl string //栏目页地址
- UpperLimit, LowerLimit int //正常值上限、下限
- //UserName, UserEmail, UploadTime string //开发者名称,开发者邮箱,脚本上传时间
- MUserName, MUserEmail string //维护人,维护人邮箱
- //Index int //数组索引
- //历史补漏
- IsHistoricalMend bool //是否是历史补漏爬虫
- IsMustDownload bool //是否强制下载
- IsCompete bool //区分新老爬虫
- Infoformat int //区分爬虫类型 1:招标;2:拟建/审批;3:产权
- IsMainThread bool //是否为主线程(多线程采集时区分是否为主线程)
- ListParallelTaskNum int //列表页爬虫执行任务并行数量
- }
- var (
- Es *es.Elastic
- EsIndex string
- EsType string
- MgoS *mgo.MongodbSim
- MgoEB *mgo.MongodbSim
- TimeChan = make(chan bool, 1)
- Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
- RestrictAccessReg = regexp.MustCompile(`访问被拒绝`)
- AllThreadNum int64
- ListAllThreadNum int64
- DelaySiteMap map[string]*DelaySite //延迟采集站点集合
- UpdataHeartCache = make(chan []map[string]interface{}, 1000) //更新爬虫心跳信息
- SPH = make(chan bool, 5)
- DataBakSaveCache = make(chan map[string]interface{}, 1000) //保存采集信息详情页记录
- DB_CH = make(chan bool, 5)
- )
- type DelaySite struct {
- DelayTime int
- Compete bool
- }
- // 任务
- func (s *Spider) StartJob() {
- s.Stop = false
- s.Pass = false
- s.RoundCount++
- go s.ExecJob(false)
- }
- // 单次执行
- func (s *Spider) ExecJob(reload bool) {
- defer func() {
- s.ExecuteOkTime = time.Now().Unix()
- util.TimeSleepFunc(5*time.Second, TimeSleepChan)
- if util.Config.Working == 1 {
- s.Stop = true
- if _, b := Allspiders.Load(s.Code); b {
- Allspiders.Store(s.Code, s)
- }
- //资源释放(队列模式下架操作时,不操作资源释放,执行完后自动释放)
- s.L.Close()
- CC <- s.L
- }
- }()
- //高效模式,轮询调度时重载脚本
- if reload && util.Config.Working == 0 {
- s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
- }
- logger.Debug(s.Code, s.Name, "频率:", s.SpiderRunRate, ",", s.Timeout)
- s.LastDowncount = 0
- s.LastExecTime = time.Now().Unix()
- s.LastHeartbeat = time.Now().Unix()
- s.ExecuteOkTime = 0
- //err := s.GetLastPublishTime() //获取最新时间--作为最后更新时间
- //if err != nil {
- // logger.Error(s.Code, err)
- //}
- //判断是否使用高并发下载三级页
- var err interface{}
- if Supplement {
- err = s.SupplementDownListPageItem() //增量补采数据,下载列表
- } else if util.Config.PageTurnInfo.ListThreadsNum > 1 {
- err = s.DownListPageItemByThreads() //并发下载列表
- } else {
- err = s.DownListPageItem() //下载列表
- }
- //if util.Config.Working == 0 && util.Config.Modal == 1 && !util.Config.IsHistoryEvent {
- // err = s.DownListPageItemByThreads() //下载列表
- //} else {
- // err = s.DownListPageItem() //下载列表
- //}
- if err != nil {
- logger.Error(s.Code, err)
- }
- if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //判断爬虫是增量还是历史爬虫(目前只会在7000节点上有历史爬虫)
- UpdateSpiderByCodeState(s.Code, "6") //爬虫在该节点下架
- SpiderCodeSendToEditor(s.Code) //历史转增量爬虫发送编辑器,切换节点上下架
- return
- } else {
- if util.Config.Working == 0 && !Supplement { //高性能模式
- /*
- for !s.Stop && s.Pass {
- util.TimeSleepFunc(2*time.Second, TimeSleepChan)
- }
- if s.Stop {
- return
- }
- */
- //if s.IsMustDownload { //历史数据下载,只跑一轮
- if s.IsHistoricalMend && util.Config.IsHistoryEvent { //历史节点7000,高性能模式,历史补漏只下载一轮
- UpdateSpiderByCodeState(s.Code, "6") //爬虫在该节点下架
- b := MgoEB.Update("luaconfig", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
- logger.Info("Delete History Code:", s.Code, b)
- } else {
- if !s.Stop { //未下架定时执行
- util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
- s.ExecJob(true)
- }, TimeChan)
- } else { //下架后子线程退出
- return
- }
- }
- } else { //排队模式或者数据补采
- return
- }
- }
- }
- // 获取最新时间--作为最后更新时间
- func (s *Spider) GetLastPublishTime() (errs interface{}) {
- defer qu.Catch()
- var lastpublishtime string
- //取得最后更新时间
- if err := s.L.CallByParam(lua.P{
- Fn: s.L.GetGlobal("getLastPublishTime"),
- NRet: 1,
- Protect: true,
- }); err != nil {
- //panic(s.Code + "," + err.Error())
- log.Println(s.Code + "," + err.Error())
- errs = err.Error()
- atomic.AddInt32(&s.Script.ErrorNum, 1)
- return errs
- }
- ret := s.L.Get(-1)
- s.L.Pop(1)
- if str, ok := ret.(lua.LString); ok {
- lastpublishtime = string(str)
- }
- if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) {
- //防止发布时间超前
- if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() {
- s.LastPubshTime = time.Now().Unix()
- } else {
- s.LastPubshTime = util.ParseDate2Int64(lastpublishtime)
- }
- }
- return nil
- }
- // 下载列表(较DownListPageItemBack去掉了无数据的重试和重复页记录)
- func (s *Spider) DownListPageItem() (errs interface{}) {
- defer qu.Catch()
- start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //起始页、最大页
- s.MaxPage = max //
- repeatAllNum := 0 //本轮采集tmpMax页总的重复个数
- downloadAllNum := 0 //本轮采集tmpMax页总个数
- if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //7000节点,爬虫跑历史
- max = s.GetIntVar("spiderHistoryMaxPage")
- }
- downtimes := 0 //记录某页重试次数(暂定3次)
- repeatPageTimes := 0 //记录页码连续判重的次数(暂定连续判重页码数为5次时,不再翻页)
- isRunRepeatList := false //是否执行列表页连续判重
- if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { //除顺序采集模式和非历史节点外所有节点,采集列表页时进行连续10页判重
- isRunRepeatList = true
- max = util.Config.PageTurnInfo.TurnPageMaxLimit //高性能模式设置最大页为100,队列模式50页
- }
- //子任务判断
- if s.ContinueDownListChildTask {
- start = util.Config.PageTurnInfo.TurnPageMaxLimit + 1 //子任务起始页
- max = util.Config.PageTurnInfo.TurnPageMaxLimit + util.Config.PageTurnInfo.NextPageMaxLimit //子任务最大页
- }
- for ; start <= max && !s.Stop; start++ {
- if !s.Stop && !s.ContinueDownListChildTask { //在下载详情页时爬虫下架,此时不再存心跳信息
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) //记录所有节点列表页心跳
- }
- //qu.Debug("爬虫:", s.Code, " 配置最大页:", s.MaxPage, " 最终最大页:", max, " 当前页:", start, "重复次数:", repeatPageTimes)
- if isRunRepeatList && repeatPageTimes >= util.Config.PageTurnInfo.RepeatPageTimesLimit { //超过连续判重上限,不再翻页
- break
- }
- if err := s.L.CallByParam(lua.P{
- Fn: s.L.GetGlobal("downloadAndParseListPage"),
- NRet: 1,
- Protect: true,
- }, lua.LNumber(start)); err != nil {
- //panic(s.Code + "," + err.Error())
- logger.Error("列表页采集报错", start, s.Code+","+err.Error())
- errs = err.Error()
- atomic.AddInt32(&s.Script.ErrorNum, 1)
- //列表页采集报错进行重试,超过重试次数视为该页已采(脚本报错直接退出不进行内部重试)
- if downtimes < 3 {
- downtimes++
- start--
- } else if isRunRepeatList { //超过重试次数,视为本页重复
- repeatPageTimes++ //次数加1
- downtimes = 0
- }
- continue
- }
- lv := s.L.Get(-1)
- s.L.Pop(1)
- if tbl, ok := lv.(*lua.LTable); ok {
- //list := []map[string]interface{}{}
- //qu.Debug("当前页数据量:", tbl.Len())
- if tabLen := tbl.Len(); tabLen > 0 { //列表页有数据,根据列表页信息下载三级页
- repeatListNum := 0 // 当前列表页连接重复个数
- for i := 1; i <= tabLen; i++ {
- v := tbl.RawGetInt(i).(*lua.LTable)
- tmp := util.TableToMap(v)
- if !s.IsHistoricalMend { //不是历史补漏
- tmp["dataging"] = 0 //数据中打标记dataging=0
- if s.DownDetail {
- s.DownloadDetailItem(tmp, &repeatListNum)
- }
- } else { //历史补漏
- s.HistoricalMendDownloadDetailItem(tmp) //历史补漏下载三级页
- }
- }
- repeatAllNum += repeatListNum
- downloadAllNum += tabLen
- if isRunRepeatList { //执行连续页码判重
- if repeatListNum >= tabLen { //当前start列表页全部数据都已采集
- repeatPageTimes++ //次数加1
- } else { //当前start页有新数据,重复次数重置
- repeatPageTimes = 0
- }
- }
- } else if isRunRepeatList {
- repeatPageTimes++ //次数加1
- }
- } else if isRunRepeatList {
- repeatPageTimes++ //次数加1
- }
- downtimes = 0 //当前页下载无误,重置下载重试次数
- util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
- }
- logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, start, s.Stop)
- if !util.Config.IsHistoryEvent && !s.Stop { //非历史节点统计下载率
- nowTime := time.Now()
- sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
- set := map[string]interface{}{
- "site": s.Name,
- "channel": s.Channel,
- "spidercode": s.Code,
- "updatetime": nowTime.Unix(),
- "event": util.Config.Uploadevent,
- "modifyuser": s.MUserName,
- "maxpage": s.MaxPage,
- "runrate": s.SpiderRunRate,
- "endpage": start,
- "date": sDate,
- }
- inc := map[string]interface{}{
- "alltimes": 1,
- }
- //记录翻页是否成功
- if s.MaxPage > 1 {
- if s.PageOneTextHash != "" {
- if s.PageTwoTextHash != "" {
- if s.PageOneTextHash != s.PageTwoTextHash {
- inc["page_success"] = 1
- } else {
- inc["page_fail"] = 1
- }
- } else {
- inc["page_fail"] = 1
- }
- } else if s.PageTwoTextHash != "" {
- inc["page_onefail"] = 1
- }
- }
- if downloadAllNum > 0 {
- rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
- rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
- if rate == 1.0 {
- if downloadAllNum == 1 { //列表页数据过滤的只剩一条新数据
- inc["oh_percent_onenum"] = 1
- } else {
- inc["oh_percent"] = 1
- }
- //} else if rate >= 0.9 {
- // inc["nt_percent"] = 1
- //} else if rate >= 0.8 {
- // inc["et_percent"] = 1
- //} else {
- // inc["other_percent"] = 1
- }
- if isRunRepeatList && start > max { //连续翻页超过了上限
- if !s.ContinueDownListChildTask {
- go ContinueDownListPageItem(s) //开启子任务继续采集
- } else {
- inc["uplimit"] = 1
- }
- }
- } else {
- inc["zero"] = 1
- }
- query := map[string]interface{}{
- "date": sDate,
- "spidercode": s.Code,
- }
- MgoS.Update("spider_downloadrate", query, map[string]interface{}{
- "$set": set,
- "$inc": inc,
- }, true, false)
- }
- //信息重置
- s.PageOneTextHash = ""
- s.PageTwoTextHash = ""
- return errs
- }
- func (s *Spider) DownListPageItemBack() (errs interface{}) {
- defer qu.Catch()
- start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //起始页、最大页
- s.MaxPage = max //
- //tmpMax := max //临时记录最大页
- repeatAllNum := 0 //本轮采集tmpMax页总的重复个数
- downloadAllNum := 0 //本轮采集tmpMax页总个数
- if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //7000节点,爬虫跑历史
- max = s.GetIntVar("spiderHistoryMaxPage")
- }
- downtimes := 0 //记录某页重试次数(暂定3次)
- repeatPageNum := 0 //记录列表页所有连接重复的页码
- repeatPageTimes := 0 //记录页码连续判重的次数(暂定连续判重页码数为5次时,不再翻页)
- isRunRepeatList := false //是否执行列表页连续判重
- if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { //除顺序采集模式和非历史节点外所有节点,采集列表页时进行连续10页判重
- isRunRepeatList = true
- max = util.Config.PageTurnInfo.TurnPageMaxLimit //高性能模式设置最大页为100,队列模式50页
- }
- //子任务判断
- if s.ContinueDownListChildTask {
- start = util.Config.PageTurnInfo.TurnPageMaxLimit + 1
- max = util.Config.PageTurnInfo.TurnPageMaxLimit + util.Config.PageTurnInfo.NextPageMaxLimit
- }
- for ; start <= max && !s.Stop; start++ {
- if !s.Stop && !s.ContinueDownListChildTask { //在下载详情页时爬虫下架,此时不再存心跳信息
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) //记录所有节点列表页心跳
- }
- //qu.Debug("爬虫:", s.Code, "重复页:", repeatPageNum, " 配置最大页:", s.MaxPage, " 最终最大页:", max, " 当前页:", start, "重复次数:", repeatPageTimes)
- //if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { //重复次数超过5次,不再翻页
- // break
- //}
- if isRunRepeatList && repeatPageTimes >= util.Config.PageTurnInfo.RepeatPageTimesLimit { //超过连续判重上限,不再翻页
- break
- }
- if err := s.L.CallByParam(lua.P{
- Fn: s.L.GetGlobal("downloadAndParseListPage"),
- NRet: 1,
- Protect: true,
- }, lua.LNumber(start)); err != nil {
- //panic(s.Code + "," + err.Error())
- logger.Error("列表页采集报错", start, s.Code+","+err.Error())
- errs = err.Error()
- atomic.AddInt32(&s.Script.ErrorNum, 1)
- //列表页采集报错进行重试,超过重试次数视为该页已采
- if downtimes < 2 {
- downtimes++
- start--
- } else if isRunRepeatList { //超过重试次数,视为本页重复
- if repeatPageNum+1 == start {
- repeatPageTimes++ //次数加1
- } else {
- repeatPageTimes = 0 //重复次数重置0
- }
- repeatPageNum = start //赋值页码
- downtimes = 0
- }
- continue
- }
- lv := s.L.Get(-1)
- s.L.Pop(1)
- if tbl, ok := lv.(*lua.LTable); ok {
- //list := []map[string]interface{}{}
- //qu.Debug("当前页数据量:", tbl.Len())
- if tabLen := tbl.Len(); tabLen > 0 { //列表页有数据,根据列表页信息下载三级页
- repeatListNum := 0 // 当前列表页连接重复个数
- for i := 1; i <= tabLen; i++ {
- v := tbl.RawGetInt(i).(*lua.LTable)
- tmp := util.TableToMap(v)
- if !s.IsHistoricalMend { //不是历史补漏
- tmp["dataging"] = 0 //数据中打标记dataging=0
- if s.DownDetail {
- s.DownloadDetailItem(tmp, &repeatListNum)
- } /*else {//暂无此类爬虫
- tmp["comeintime"] = time.Now().Unix()
- //atomic.AddInt32(&s.LastDowncount, 1)
- //atomic.AddInt32(&s.TodayDowncount, 1)
- //atomic.AddInt32(&s.TotalDowncount, 1)
- href := fmt.Sprint(tmp["href"])
- if len(href) > 5 { //有效数据
- hashHref := util.HexText(href)
- util.RedisClusterSet(hashHref, "", -1) //全量redis
- list = append(list, tmp)
- }
- }*/
- } else { //历史补漏
- s.HistoricalMendDownloadDetailItem(tmp) //历史补漏下载三级页
- }
- }
- repeatAllNum += repeatListNum
- downloadAllNum += tabLen
- if isRunRepeatList { //执行连续页码判重
- if repeatListNum >= tabLen { //当前start列表页全部数据都已采集
- //qu.Debug("重复页:", repeatPageNum, "当前页:", start)
- if repeatPageNum+1 == start || repeatPageNum == 0 {
- repeatPageTimes++ //次数加1
- } else {
- repeatPageTimes = 0 //重复次数重置0
- }
- repeatPageNum = start //赋值页码
- } else { //当前start页有遗漏数据
- repeatPageTimes = 0
- repeatPageNum = 0
- }
- }
- //if !s.IsHistoricalMend && !s.DownDetail {
- // if len(list) > 0 { //保存信息入库
- // StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
- // }
- //}
- } else { //避免因网络问题当前下载的列表页无数据,重新请求下载列表页(存在列表页数据被全部过滤的情况)
- if downtimes < 2 {
- downtimes++
- start--
- continue
- } else if isRunRepeatList { //超过重试次数,视为本页重复
- if repeatPageNum+1 == start {
- repeatPageTimes++ //次数加1
- } else {
- repeatPageTimes = 0 //重复次数重置0
- }
- repeatPageNum = start //赋值页码
- }
- }
- } else { //请求当前列表页失败
- if downtimes < 2 {
- downtimes++
- start--
- continue
- } else if isRunRepeatList { //超过重试次数,视为本页重复
- if repeatPageNum+1 == start {
- repeatPageTimes++ //次数加1
- } else {
- repeatPageTimes = 0 //重复次数重置0
- }
- repeatPageNum = start //赋值页码
- }
- }
- downtimes = 0 //当前页下载无误,重置下载重试次数
- util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
- }
- logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, start, s.Stop)
- if !util.Config.IsHistoryEvent && !s.Stop { //非历史节点统计下载率
- nowTime := time.Now()
- sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
- set := map[string]interface{}{
- "site": s.Name,
- "channel": s.Channel,
- "spidercode": s.Code,
- "updatetime": nowTime.Unix(),
- "event": util.Config.Uploadevent,
- "modifyuser": s.MUserName,
- "maxpage": s.MaxPage,
- "runrate": s.SpiderRunRate,
- "endpage": start,
- "date": sDate,
- }
- inc := map[string]interface{}{
- "alltimes": 1,
- }
- //记录翻页是否成功
- if s.MaxPage > 1 {
- if s.PageOneTextHash != "" {
- if s.PageTwoTextHash != "" {
- if s.PageOneTextHash != s.PageTwoTextHash {
- inc["page_success"] = 1
- } else {
- inc["page_fail"] = 1
- }
- } else {
- inc["page_fail"] = 1
- }
- } else if s.PageTwoTextHash != "" {
- inc["page_onefail"] = 1
- }
- }
- if downloadAllNum > 0 {
- rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
- rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
- if rate == 1.0 {
- if downloadAllNum == 1 { //列表页数据过滤的只剩一条新数据
- inc["oh_percent_onenum"] = 1
- } else {
- inc["oh_percent"] = 1
- }
- //} else if rate >= 0.9 {
- // inc["nt_percent"] = 1
- //} else if rate >= 0.8 {
- // inc["et_percent"] = 1
- //} else {
- // inc["other_percent"] = 1
- }
- if isRunRepeatList && start > max { //连续翻页超过了上限
- if !s.ContinueDownListChildTask {
- go ContinueDownListPageItem(s) //开启子任务继续采集
- } else {
- inc["uplimit"] = 1
- }
- }
- } else {
- inc["zero"] = 1
- }
- query := map[string]interface{}{
- "date": sDate,
- "spidercode": s.Code,
- }
- MgoS.Update("spider_downloadrate", query, map[string]interface{}{
- "$set": set,
- "$inc": inc,
- }, true, false)
- }
- //信息重置
- s.PageOneTextHash = ""
- s.PageTwoTextHash = ""
- return errs
- }
- // 并发下载列表
- func (s *Spider) DownListPageItemByThreads() (errs interface{}) {
- defer qu.Catch()
- start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //起始页、最大页
- s.MaxPage = max //记录爬虫配置的最大页
- repeatAllNum := int64(0) //本轮采集总的重复个数
- downloadAllNum := int64(0) //本轮采集总个数
- if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //7000节点根据爬虫类型,取采集的最大页
- max = s.GetIntVar("spiderHistoryMaxPage") //采集历史的爬虫,取历史最大页配置spiderHistoryMaxPage
- }
- isRunRepeatList := false //是否执行列表页连续判重逻辑
- //是否进行连续翻页判断,修改最大页
- if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { //除顺序采集模式和非历史节点外所有节点,采集列表页时进行连续10页判重
- isRunRepeatList = true
- max = util.Config.PageTurnInfo.TurnPageMaxLimit //高性能模式设置最大页为100,队列模式50页
- }
- //子任务判断
- //if s.ContinueDownListChildTask {
- // start = util.Config.PageTurnInfo.TurnPageMaxLimit + 1
- // max = util.Config.PageTurnInfo.TurnPageMaxLimit + util.Config.PageTurnInfo.NextPageMaxLimit
- //}
- //创建并发Spider对象
- spChan := make(chan *Spider, 1)
- if isRunRepeatList && util.Config.PageTurnInfo.ListThreadsNum > 1 { //无限翻页模式设置spChan并发大小
- spChan = make(chan *Spider, util.Config.PageTurnInfo.ListThreadsNum)
- spChan <- s //加入通道
- NewSpiderByScript(util.Config.PageTurnInfo.ListThreadsNum-1, s.Code, s.ScriptFile, spChan) //创建多个Spider对象
- } else {
- spChan <- s //加入通道
- }
- endPage := 0 //结束页
- repeatTimes := 0 //连续判重次数
- for ; start <= max && !s.Stop; start += util.Config.PageTurnInfo.ListThreadsNum {
- if !s.Stop && !s.ContinueDownListChildTask { //在下载详情页时爬虫下架,此时不再存心跳信息
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) //记录所有节点列表页心跳
- }
- listWg := &sync.WaitGroup{}
- pageMap := map[int]bool{} //记录某页数据是否都已采集
- pageArr := []int{}
- lock := &sync.Mutex{}
- //并发下载列表页
- for listThreadNum := 0; listThreadNum < util.Config.PageTurnInfo.ListThreadsNum; listThreadNum++ {
- pagenum := start + listThreadNum //当前实际采集页码
- if pagenum > max { //并发采集时,每次开启repeatPageTimesLimit个并发,并发有可能超过max上限
- break
- }
- spTmp := <-spChan //通道中取出sp对象
- listWg.Add(1)
- atomic.AddInt64(&ListAllThreadNum, 1)
- endPage = pagenum + 1
- go func(sp *Spider, pagenum int) {
- defer func() {
- spChan <- sp //处理完数据sp对象放回通道中
- listWg.Done()
- atomic.AddInt64(&ListAllThreadNum, -1)
- }()
- //下载某一页数据
- downnum, repeatnum := sp.DownListOnePage(pagenum)
- //logger.Info(sp.Code, "pagenum", pagenum, "repeat", downnum == repeatnum, downnum, repeatnum, &sp)
- //汇总下载量
- atomic.AddInt64(&downloadAllNum, int64(downnum))
- atomic.AddInt64(&repeatAllNum, int64(repeatnum))
- lock.Lock()
- pageMap[pagenum] = downnum == repeatnum //当前pagenum页数据是否已采集
- pageArr = append(pageArr, pagenum)
- lock.Unlock()
- if downnum > 0 {
- //使用并发采集列表页时,spider对象不是同一个,只能采集后统计
- if pagenum == 1 { //将第一页sp采集信息的hash值赋值给s
- s.PageOneTextHash = sp.PageOneTextHash
- } else if pagenum == 2 { //将第二页sp采集信息的hash值赋值给s
- s.PageTwoTextHash = sp.PageTwoTextHash
- }
- }
- }(spTmp, pagenum)
- }
- listWg.Wait()
- if isRunRepeatList {
- sort.Ints(pageArr) //页码从小到大排序
- for _, page := range pageArr {
- if pageMap[page] { //
- repeatTimes++
- } else {
- repeatTimes = 0
- }
- }
- if repeatTimes >= util.Config.PageTurnInfo.RepeatPageTimesLimit { //超过连续判重上限,不再翻页
- break
- }
- }
- }
- //close(spChan) //关闭通道,释放资源
- logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, endPage, s.Stop)
- if !util.Config.IsHistoryEvent && !s.Stop { //非历史节点统计下载率
- nowTime := time.Now()
- sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
- set := map[string]interface{}{
- "site": s.Name,
- "channel": s.Channel,
- "spidercode": s.Code,
- "updatetime": nowTime.Unix(),
- "event": util.Config.Uploadevent,
- "modifyuser": s.MUserName,
- "maxpage": s.MaxPage,
- "runrate": s.SpiderRunRate,
- "endpage": endPage,
- "date": sDate,
- }
- inc := map[string]interface{}{
- "alltimes": 1,
- }
- //记录翻页是否成功
- if s.MaxPage > 1 { //最大页为1的,用列表页是否异常体现爬虫运行情况
- if s.PageOneTextHash != "" {
- if s.PageTwoTextHash != "" {
- if s.PageOneTextHash != s.PageTwoTextHash {
- inc["page_success"] = 1
- } else {
- inc["page_fail"] = 1
- }
- } else {
- inc["page_fail"] = 1
- }
- } else if s.PageTwoTextHash != "" {
- inc["page_onefail"] = 1
- }
- }
- if downloadAllNum > 0 {
- rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
- rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
- if rate == 1.0 {
- if downloadAllNum == 1 { //列表页数据过滤的只剩一条新数据
- inc["oh_percent_onenum"] = 1
- } else {
- inc["oh_percent"] = 1
- }
- //} else if rate >= 0.9 {
- // inc["nt_percent"] = 1
- //} else if rate >= 0.8 {
- // inc["et_percent"] = 1
- //} else {
- // inc["other_percent"] = 1
- }
- if isRunRepeatList && endPage > max { //连续翻页超过了上限
- if !s.ContinueDownListChildTask {
- go ContinueDownListPageItem(s) //开启子任务继续采集
- } else {
- inc["uplimit"] = 1
- }
- }
- } else {
- inc["zero"] = 1
- }
- query := map[string]interface{}{
- "date": sDate,
- "spidercode": s.Code,
- }
- MgoS.Update("spider_downloadrate", query, map[string]interface{}{
- "$set": set,
- "$inc": inc,
- }, true, false)
- }
- //信息重置
- s.PageOneTextHash = ""
- s.PageTwoTextHash = ""
- return errs
- }
- // 并发下载列表
- func (s *Spider) DownListPageItemByThreadsBack() (errs interface{}) {
- defer qu.Catch()
- start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //起始页、最大页
- s.MaxPage = max //记录爬虫配置的最大页
- repeatAllNum := int64(0) //本轮采集总的重复个数
- downloadAllNum := int64(0) //本轮采集总个数
- if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //7000节点根据爬虫类型,取采集的最大页
- max = s.GetIntVar("spiderHistoryMaxPage") //采集历史的爬虫,取历史最大页配置spiderHistoryMaxPage
- }
- repeatPageTimesLimit := 1 //记录页码连续判重的次数上限(默认1:for循环翻页时等效,至少+1)
- isRunRepeatList := false //是否执行列表页连续判重逻辑
- //是否进行连续翻页判断,修改最大页
- if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { //除顺序采集模式和非历史节点外所有节点,采集列表页时进行连续10页判重
- isRunRepeatList = true
- repeatPageTimesLimit = util.Config.PageTurnInfo.RepeatPageTimesLimit
- max = util.Config.PageTurnInfo.TurnPageMaxLimit //高性能模式设置最大页为100,队列模式50页
- }
- //子任务判断
- if s.ContinueDownListChildTask {
- start = util.Config.PageTurnInfo.TurnPageMaxLimit + 1
- max = util.Config.PageTurnInfo.TurnPageMaxLimit + util.Config.PageTurnInfo.NextPageMaxLimit
- }
- //创建并发Spider对象
- spChan := make(chan *Spider, 1)
- if isRunRepeatList { //无限翻页模式设置spChan并发大小
- spChan = make(chan *Spider, repeatPageTimesLimit) //并发数量由连续翻页判重数量决定
- spChan <- s //加入通道
- NewSpiderByScript(repeatPageTimesLimit-1, s.Code, s.ScriptFile, spChan) //创建多个Spider对象
- } else {
- spChan <- s //加入通道
- }
- endPage := 0 //结束页
- for ; start <= max && !s.Stop; start += repeatPageTimesLimit {
- if !s.Stop && !s.ContinueDownListChildTask { //在下载详情页时爬虫下架,此时不再存心跳信息
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) //记录所有节点列表页心跳
- }
- listWg := &sync.WaitGroup{}
- isContinue := false //是否继续采集
- //并发下载列表页
- for listThreadNum := 0; listThreadNum < repeatPageTimesLimit; listThreadNum++ {
- pagenum := start + listThreadNum //当前实际采集页码
- if pagenum > max { //并发采集时,每次开启repeatPageTimesLimit个并发,并发有可能超过max上限
- break
- }
- spTmp := <-spChan //通道中取出sp对象
- listWg.Add(1)
- atomic.AddInt64(&ListAllThreadNum, 1)
- endPage = pagenum + 1
- go func(sp *Spider, pagenum int) {
- defer func() {
- spChan <- sp //处理完数据sp对象放回通道中
- listWg.Done()
- atomic.AddInt64(&ListAllThreadNum, -1)
- }()
- //下载某一页数据
- downnum, repeatnum := sp.DownListOnePage(pagenum)
- //汇总下载量
- atomic.AddInt64(&downloadAllNum, int64(downnum))
- atomic.AddInt64(&repeatAllNum, int64(repeatnum))
- if downnum > 0 {
- if downnum-repeatnum > 0 { //本页有新数据
- isContinue = true
- }
- //使用并发采集列表页时,spider对象不是同一个,只能采集后统计
- if pagenum == 1 { //将第一页sp采集信息的hash值赋值给s
- s.PageOneTextHash = sp.PageOneTextHash
- } else if pagenum == 2 { //将第二页sp采集信息的hash值赋值给s
- s.PageTwoTextHash = sp.PageTwoTextHash
- }
- }
- //qu.Debug("第", pagenum, "页采集信息:", downnum, repeatnum)
- }(spTmp, pagenum)
- }
- listWg.Wait()
- if !isContinue { //并发采集结果中,如果某页有新数据,继续采集,直到上限页
- break
- }
- }
- close(spChan) //关闭通道,释放资源
- logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, endPage, s.Stop)
- if !util.Config.IsHistoryEvent && !s.Stop { //非历史节点统计下载率
- nowTime := time.Now()
- sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
- set := map[string]interface{}{
- "site": s.Name,
- "channel": s.Channel,
- "spidercode": s.Code,
- "updatetime": nowTime.Unix(),
- "event": util.Config.Uploadevent,
- "modifyuser": s.MUserName,
- "maxpage": s.MaxPage,
- "runrate": s.SpiderRunRate,
- "endpage": endPage,
- "date": sDate,
- }
- inc := map[string]interface{}{
- "alltimes": 1,
- }
- //记录翻页是否成功
- if s.MaxPage > 1 { //最大页为1的,用列表页是否异常体现爬虫运行情况
- if s.PageOneTextHash != "" {
- if s.PageTwoTextHash != "" {
- if s.PageOneTextHash != s.PageTwoTextHash {
- inc["page_success"] = 1
- } else {
- inc["page_fail"] = 1
- }
- } else {
- inc["page_fail"] = 1
- }
- } else if s.PageTwoTextHash != "" {
- inc["page_onefail"] = 1
- }
- }
- if downloadAllNum > 0 {
- rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
- rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
- if rate == 1.0 {
- if downloadAllNum == 1 { //列表页数据过滤的只剩一条新数据
- inc["oh_percent_onenum"] = 1
- } else {
- inc["oh_percent"] = 1
- }
- //} else if rate >= 0.9 {
- // inc["nt_percent"] = 1
- //} else if rate >= 0.8 {
- // inc["et_percent"] = 1
- //} else {
- // inc["other_percent"] = 1
- }
- if isRunRepeatList && endPage > max { //连续翻页超过了上限
- if !s.ContinueDownListChildTask {
- go ContinueDownListPageItem(s) //开启子任务继续采集
- } else {
- inc["uplimit"] = 1
- }
- }
- } else {
- inc["zero"] = 1
- }
- query := map[string]interface{}{
- "date": sDate,
- "spidercode": s.Code,
- }
- MgoS.Update("spider_downloadrate", query, map[string]interface{}{
- "$set": set,
- "$inc": inc,
- }, true, false)
- }
- //信息重置
- s.PageOneTextHash = ""
- s.PageTwoTextHash = ""
- return errs
- }
- // 补采下载列表
- func (s *Spider) SupplementDownListPageItem() (errs interface{}) {
- defer qu.Catch()
- var (
- errtimes int //采集异常次数(暂定10次)
- errPageNum int //当前采集异常页码
- downtimes int //记录某页重试次数(暂定3次)
- downloadAllNum int //记录本次采集,信息采集总量
- saveAllNum int //记录本次采集,信息补采总量
- repeatAllNum int //记录本次采集,信息重复总量
- pageTitleHash string //记录当前页所有title文本
- finishText = "正常退出" //
- start = 1 //起始页
- )
- for {
- if errtimes >= Supplement_MaxErrorTimes { //连续异常次数超过10次,爬虫不再翻页
- finishText = "异常退出"
- logger.Info(s.Code + "连续10页采集异常")
- break
- }
- if err := s.L.CallByParam(lua.P{
- Fn: s.L.GetGlobal("downloadAndParseListPage"),
- NRet: 1,
- Protect: true,
- }, lua.LNumber(start)); err != nil {
- //panic(s.Code + "," + err.Error())
- logger.Error("列表页采集报错", start, s.Code+","+err.Error())
- errs = err.Error()
- if downtimes < 3 {
- downtimes++
- } else if errtimes == 0 || start == errPageNum+1 {
- errtimes++
- errPageNum = start
- start++
- downtimes = 0
- }
- continue
- }
- lv := s.L.Get(-1)
- s.L.Pop(1)
- if tbl, ok := lv.(*lua.LTable); ok {
- if tabLen := tbl.Len(); tabLen > 0 { //列表页有数据,根据列表页信息下载三级页
- var (
- publishtimeErrTimes int
- text string
- repeatListNum int // 当前列表页连接重复个数
- num = 1
- isBreak = false
- )
- for ; num <= tabLen; num++ {
- v := tbl.RawGetInt(num).(*lua.LTable)
- tmp := util.TableToMap(v)
- tmp["dataging"] = 0 //数据中打标记dataging=0
- s.DownloadDetailItem(tmp, &repeatListNum)
- pTmp := qu.ObjToString(tmp["publishtime"])
- title := qu.ObjToString(tmp["title"])
- text += title
- pTime, _ := time.ParseInLocation(qu.Date_Full_Layout, pTmp, time.Local)
- publishtime := pTime.Unix()
- if publishtime > 1000000000 && publishtime < Supplement_Publishtime { //正常退出
- isBreak = true
- //break
- } else if publishtime <= 1000000000 { //异常发布时间
- publishtimeErrTimes++
- }
- }
- logger.Info(s.Code, start, tabLen, repeatListNum)
- downloadAllNum += tabLen //采集总量累计
- repeatAllNum += repeatListNum //重复总量累计
- saveAllNum += num - 1 - repeatListNum //保存总量累计
- tmpPageTitleHash := pageTitleHash //
- pageTitleHash = util.HexText(text) //
- if tabLen == publishtimeErrTimes || tmpPageTitleHash == pageTitleHash { //当前页数据发布时间均异常;当前页与上页采集内容一致
- //if errtimes == 0 || start == errPageNum+1 {
- errtimes++
- errPageNum = start
- start++
- //}
- continue
- } else if isBreak { //中断不再采集
- start++
- break
- }
- } else {
- if downtimes < 3 {
- downtimes++
- } else if errtimes == 0 || start == errPageNum+1 {
- errtimes++
- errPageNum = start
- start++
- downtimes = 0
- }
- continue
- }
- } else {
- if downtimes < 3 {
- downtimes++
- } else if errtimes == 0 || start == errPageNum+1 {
- errtimes++
- errPageNum = start
- start++
- downtimes = 0
- }
- continue
- }
- start++
- downtimes = 0
- errtimes = 0
- errPageNum = 0
- util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
- }
- logger.Info(s.Code, "本轮列表页采集详情:", downloadAllNum, repeatAllNum, saveAllNum, finishText)
- save := map[string]interface{}{
- "site": s.Name,
- "channel": s.Channel,
- "spidercode": s.Code,
- "comeintime": time.Now().Unix(),
- "modifyuser": s.MUserName,
- "endpage": start,
- "finish": finishText,
- "savenum": saveAllNum,
- "count": downloadAllNum,
- "repeat": repeatAllNum,
- }
- MgoS.Save("spider_supplement", save)
- return errs
- }
- // 下载某一页数据
- func (s *Spider) DownListOnePage(pagenum int) (downnum, repeatnum int) {
- defer qu.Catch()
- downtimes := 0
- for downtimes < 3 { //错误重试3次
- if err := s.L.CallByParam(lua.P{
- Fn: s.L.GetGlobal("downloadAndParseListPage"),
- NRet: 1,
- Protect: true,
- }, lua.LNumber(pagenum)); err != nil {
- //panic(s.Code + "," + err.Error())
- logger.Error("列表页采集报错", pagenum, s.Code+","+err.Error())
- atomic.AddInt32(&s.Script.ErrorNum, 1)
- //列表页采集报错进行重试
- downtimes++
- continue
- }
- lv := s.L.Get(-1)
- s.L.Pop(1)
- if tbl, ok := lv.(*lua.LTable); ok {
- //list := []map[string]interface{}{}
- //qu.Debug("当前页数据量:", tbl.Len())
- if tabLen := tbl.Len(); tabLen > 0 { //列表页有数据,根据列表页信息下载三级页
- repeatListNum := 0 // 当前列表页连接重复个数
- for i := 1; i <= tabLen; i++ {
- v := tbl.RawGetInt(i).(*lua.LTable)
- tmp := util.TableToMap(v)
- if !s.IsHistoricalMend { //不是历史补漏
- tmp["dataging"] = 0 //数据中打标记dataging=0
- if s.DownDetail {
- s.DownloadDetailItem(tmp, &repeatListNum)
- }
- } else { //历史补漏
- s.HistoricalMendDownloadDetailItem(tmp) //历史补漏下载三级页
- }
- }
- repeatnum = repeatListNum
- downnum = tabLen
- return
- //if !s.IsHistoricalMend && !s.DownDetail {
- // if len(list) > 0 { //保存信息入库
- // StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
- // }
- //}
- } else { //避免因网络问题当前下载的列表页无数据,重新请求下载列表页(存在列表页数据被全部过滤的情况)
- downtimes++
- continue
- }
- } else { //请求当前列表页失败
- downtimes++
- continue
- }
- }
- return
- }
- // 开启单独线程继续采集列表页
- func ContinueDownListPageItem(s *Spider) {
- defer qu.Catch()
- spTmp, errstr := CreateSpider(s.SCode, s.ScriptFile, true, true) //生成新爬虫
- logger.Info(s.SCode, "补充连续翻页开始...")
- if errstr == "" && spTmp != nil && spTmp.Code != "nil" { //脚本加载成功
- spTmp.ContinueDownListChildTask = true
- defer spTmp.L.Close()
- err := spTmp.DownListPageItem() //下载列表
- logger.Info(s.SCode, "补充连续翻页结束...")
- if err != nil {
- logger.Error(spTmp.Code, err)
- }
- }
- }
- // 遍历,开启三级页下载(历史补漏)
- func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
- //qu.Debug("--------------历史下载-----------------")
- defer qu.Catch()
- var err interface{}
- data := map[string]interface{}{}
- paramdata := p.(map[string]interface{})
- for k, v := range paramdata {
- data[k] = v
- }
- href := qu.ObjToString(data["href"])
- if len(href) <= 5 { //无效数据
- return
- }
- hashHref := util.HexText(href)
- isExist := util.RedisExist("list", "list_"+hashHref)
- //logger.Debug("full href:", href, " isExist:", isExist)
- if !s.IsMustDownload { //非强制下载
- if isExist { //数据存在,直接return
- return
- } else if util.Config.IsHistoryEvent { //1、7000(历史节点)的历史补漏,数据存入spider_historydata
- num := 0
- SaveHighListPageData(paramdata, hashHref, &num)
- return
- }
- } else { //当前不支持强制下载
- return
- }
- //2、非7000(历史节点)的历史补漏,采完列表直接采详情,采完爬虫下架(当前无此爬虫)
- id := ""
- isEsRepeat := false
- if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
- title := qu.ObjToString(paramdata["title"])
- eTime := time.Now().Unix()
- sTime := eTime - int64(7*86400)
- //esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
- esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
- if Es.Count(EsIndex, EsType, esQuery) > 0 { //es中含本title数据,不再采集,更新list表数据状态
- isEsRepeat = true
- }
- }
- SaveListPageData(paramdata, &id, isEsRepeat) //存储采集记录
- if isEsRepeat { //类竞品数据title判重数据加入redis
- util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
- util.AddBloomRedis("href", href)
- return
- }
- //qu.Debug("----------------下载、解析、入库--------------------")
- //下载详情页
- data, err = s.DownloadDetailPage(paramdata, data)
- if err != nil || data == nil { //下载失败,结束
- if err != nil {
- logger.Error(s.Code, err, paramdata)
- }
- //更新spider_listdata中数据下载失败标记
- MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
- return
- }
- util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) //采集成功,加入列表页redis
- //根据发布时间进行数据判重校验
- tmphref := qu.ObjToString(data["href"]) //取tmphref,三级页href替换导致前后href不同
- publishtime := qu.Int64All(data["l_np_publishtime"])
- if publishtime < time.Now().AddDate(-1, 0, 0).Unix() { //一年前数据进行全量bloom redis href判重
- isExist, _ = util.ExistsBloomRedis("href", tmphref)
- if isExist {
- MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
- return
- }
- }
- //详情页过滤数据
- set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
- if data["delete"] != nil {
- //util.AddBloomRedis("href", tmphref)//delete可能存在删除跳转网站的数据,加入全量redis后可能导致该网站采不到
- set["exist"] = "delete"
- //MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
- MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
- return
- }
- //更新spider_listdata中数据下载成功标记(根据链接更新数据state;可能由后续下载成功时更新)
- MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
- //三级页href替换导致前后href不同,采集成功后将原始href加入全量redis
- //if tmphref := qu.ObjToString(data["href"]); tmphref != href {
- // util.AddBloomRedis("href", href)
- //}
- flag := true
- //publishtime := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) //publishtime
- if s.IsMustDownload { //强制下载
- if isExist && publishtime < time.Now().AddDate(0, 0, -5).Unix() {
- //qu.Debug("强制下载 redis存在")
- data["dataging"] = 1 //此处dataging=1对应保存服务中取redis中href对应的id值,进行更新(现redis中已无id值,所以无效)
- flag = false
- } else {
- //qu.Debug("强制下载 redis不存在")
- data["dataging"] = 0
- }
- } else { //非强制下载
- if !isExist {
- //qu.Debug("非强制下载 redis不存在")
- data["dataging"] = 0
- }
- }
- //if publishtime > time.Now().Unix() { //防止发布时间超前
- // data["publishtime"] = time.Now().Unix()
- //}
- delete(data, "state")
- delete(data, "exit")
- delete(data, "checkpublishtime")
- data["comeintime"] = time.Now().Unix()
- data["spidercode"] = s.Code
- //qu.Debug("--------------开始保存---------------")
- data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag)
- //qu.Debug("--------------保存结束---------------")
- }
- // 遍历,开启三级页下载(增量)
- func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
- defer qu.Catch()
- var err interface{}
- data := map[string]interface{}{}
- paramdata := p.(map[string]interface{})
- for k, v := range paramdata {
- data[k] = v
- }
- href := qu.ObjToString(data["href"])
- if len(href) <= 5 { //无效数据
- *num++ //视为已采集
- return
- }
- hashHref := util.HexText(href)
- //列表页redis判重
- isExist := util.RedisExist("list", "list_"+hashHref)
- if Supplement && !isExist { //补采,再进行全量redis判重
- isExist, _ = util.ExistsBloomRedis("href", href)
- }
- if isExist {
- *num++ //已采集
- return
- }
- id := "" //记录spider_listdata中保存的数据id,便于下载成功后更新状态
- //if util.Config.Modal == 1 || (util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history") { //列表页、详情页分开采集模式节点和7000节点新爬虫采集的数据数据
- if util.Config.Modal == 1 || util.Config.IsHistoryEvent || Supplement { //分开采集模式和历史节点(7000)
- SaveHighListPageData(paramdata, hashHref, num) //存表
- return
- } else {
- if !s.Stop {
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail", false) //记录modal=0老模式采集三级页心跳
- }
- isEsRepeat := false
- if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
- title := qu.ObjToString(paramdata["title"])
- eTime := time.Now().Unix()
- sTime := eTime - int64(7*86400)
- //esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
- esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
- if Es.Count(EsIndex, EsType, esQuery) > 0 { //es中含本title数据,不再采集,更新list表数据状态
- isEsRepeat = true
- }
- }
- SaveListPageData(paramdata, &id, isEsRepeat) //保存7000、7410、7500、7510、7520、7700节点列表页采集的信息
- if isEsRepeat { //类竞品数据title判重数据加入redis
- util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
- util.AddBloomRedis("href", href)
- return
- }
- }
- //下载详情页
- data, err = s.DownloadDetailPage(paramdata, data)
- if err != nil || data == nil {
- *num++ //顺序采集模式,在记录重复数据个数时,采集失败记为重复(避免下载失败数据每轮次采集都不会被判重,造成全采次数+1)
- if err != nil {
- logger.Error(s.Code, err, paramdata)
- //if len(paramdata) > 0 {
- // SaveErrorData(s.MUserName, paramdata, err) //保存错误信息
- //}
- }
- //更新spider_listdata中数据下载失败标记
- MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}})
- return
- } /*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
- util.RedisClusterSet(hashHref, "", -1) //全量redis中存值列表页href
- }*/
- util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) //加入列表页redis
- //根据发布时间进行数据判重校验
- tmphref := qu.ObjToString(data["href"])
- publishtime := qu.Int64All(data["l_np_publishtime"])
- //7410节点(变链接节点)或者一年前数据进行全量bloomredis href判重
- if util.Config.Uploadevent == 7410 || publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
- isExist, _ = util.ExistsBloomRedis("href", tmphref)
- if isExist {
- //MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
- MgoS.Update("spider_listdata", map[string]interface{}{"href": tmphref}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "byid": id, "tmphref": tmphref, "updatetime": time.Now().Unix()}}, false, true)
- return
- }
- }
- //详情页下载数据成功心跳
- if !s.Stop {
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute", false) //记录modal=0老模式采集到数据心跳
- }
- set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
- //详情页过滤数据
- if data["delete"] != nil {
- //util.AddBloomRedis("href", tmphref)//delete可能存在删除跳转网站的数据,加入全量redis后可能导致该网站采不到
- set["exist"] = "delete"
- //MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
- MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
- return
- }
- set["byid"] = id
- //更新spider_listdata中数据下载成功标记(根据链接更新数据state;可能由后续下载成功时更新)
- MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
- //三级页href替换导致前后href不同,采集成功后将原始href加入全量redis
- //if tmphref := qu.ObjToString(data["href"]); tmphref != href {
- // util.AddBloomRedis("href", href)
- //}
- delete(data, "state")
- delete(data, "exit")
- delete(data, "checkpublishtime")
- data["comeintime"] = time.Now().Unix()
- data["spidercode"] = s.Code
- data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
- data["infoformat"] = s.Infoformat //爬虫类型
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
- }
- // 遍历下载名录
- func (s *Spider) DownloadDetailByNames(p interface{}) {
- defer qu.Catch()
- var err interface{}
- /*
- if s.Stop {
- return
- }
- for s.Pass {
- util.TimeSleepFunc(2*time.Second, TimeSleepChan)
- }
- */
- data := map[string]interface{}{}
- paramdata := p.(map[string]interface{})
- for k, v := range paramdata {
- data[k] = v
- }
- if s.DownDetail {
- href := qu.ObjToString(data["href"])
- if href == "" || len(href) < 5 { //无效数据
- return
- }
- //下载、解析、入库
- data, err = s.DownloadDetailPage(paramdata, data)
- if err != nil {
- logger.Error(s.Code, paramdata, err)
- return
- }
- }
- data["comeintime"] = time.Now().Unix()
- //atomic.AddInt32(&s.LastDowncount, 1)
- //atomic.AddInt32(&s.TodayDowncount, 1)
- //atomic.AddInt32(&s.TotalDowncount, 1)
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
- }
- // 下载解析详情页
- func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
- defer qu.Catch()
- s.LastHeartbeat = time.Now().Unix()
- util.TimeSleepFunc((time.Duration(s.SleepBase+util.GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
- tab := s.L.NewTable()
- for k, v := range param {
- if val, ok := v.(string); ok {
- tab.RawSet(lua.LString(k), lua.LString(val))
- } else if val, ok := v.(int64); ok {
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- } else if val, ok := v.(int32); ok {
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- } else if val, ok := v.(float64); ok {
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- } else if val, ok := v.(float32); ok {
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- } else if val, ok := v.(bool); ok {
- tab.RawSet(lua.LString(k), lua.LBool(val))
- }
- }
- var err error
- if err = s.L.CallByParam(lua.P{
- Fn: s.L.GetGlobal("downloadDetailPage"),
- NRet: 1,
- Protect: true,
- }, tab); err != nil {
- //panic(s.Code + "," + err.Error())
- log.Println(s.Code + "," + err.Error())
- atomic.AddInt32(&s.Script.ErrorNum, 1)
- return data, err
- }
- lv := s.L.Get(-1)
- s.L.Pop(1)
- //拼map
- if v3, ok := lv.(*lua.LTable); ok {
- v3.ForEach(func(k, v lua.LValue) {
- if tmp, ok := k.(lua.LString); ok {
- key := string(tmp)
- if value, ok := v.(lua.LString); ok {
- data[key] = string(value)
- } else if value, ok := v.(lua.LNumber); ok {
- data[key] = int64(value)
- } else if value, ok := v.(*lua.LTable); ok {
- tmp := util.TableToMap(value)
- data[key] = tmp
- }
- }
- })
- return data, err
- } else {
- return nil, err
- }
- }
- // 高性能模式定时采集三级页信息
- func DetailData() {
- defer qu.Catch()
- <-InitAllLuaOver //脚本加载完毕,执行
- if util.Config.Working == 0 && !util.Config.IsHistoryEvent { //高性能模式且不是7000节点,只有7000节点util.Config.IsHistoryEvent为true
- GetListDataDownloadDetail()
- }
- }
- func GetListDataDownloadDetail() {
- defer qu.Catch()
- logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
- Allspiders2.Range(func(k, v interface{}) bool {
- sp := v.(*Spider)
- go sp.DownloadHighDetail(true)
- time.Sleep(200 * time.Millisecond)
- return true
- })
- }
- // 高性能模式根据列表页数据下载三级页
- func (s *Spider) DownloadHighDetail(reload bool) {
- defer qu.Catch()
- for {
- logger.Info("Detail Running Code:", s.Code, " Stop:", s.Stop)
- if !s.Stop { //爬虫是运行状态
- s.DownloadDetail(reload, false)
- } else {
- break
- }
- }
- }
- // 队列模式根据列表页数据下载三级页
- func (s *Spider) DownloadListDetail(reload bool) {
- defer qu.Catch()
- s.DownloadDetail(reload, false)
- //队列模式爬虫下载完三级页数据或无下载数据,使用后close
- s.Stop = true
- if _, b := Allspiders2.Load(s.Code); b {
- Allspiders2.Store(s.Code, s)
- }
- s.L.Close()
- CC2 <- s.L
- }
- // 下载详情页
- func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
- defer qu.Catch()
- coll := "spider_highlistdata"
- isEsRepeat := false //是否进行es判重
- q := map[string]interface{}{
- "spidercode": s.Code,
- "state": 0, //0:入库状态;-1:采集失败;1:成功
- }
- o := map[string]interface{}{"_id": -1}
- if !isHistory { //非历史数据下载,补充comeintime时间检索条件
- comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //采集一周内的数据,防止有数据一直采不下来,造成积累
- if delaySite := DelaySiteMap[s.Name]; delaySite != nil {
- isEsRepeat = delaySite.Compete
- if delaySite.DelayTime <= util.Config.DayNum*24 { //判断该爬虫是否属于要延迟采集的站点,数据延迟delayDay小时采集(由于7410、7500、7700为顺序采集,无法延时)
- //comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
- comeintimeQuery["$lte"] = time.Now().Unix() - int64(3600*delaySite.DelayTime)
- }
- }
- q["comeintime"] = comeintimeQuery
- } else {
- coll = "spider_historydata"
- o["_id"] = 1 //历史数据正序
- }
- f := map[string]interface{}{
- "state": 0,
- "comeintime": 0,
- "event": 0,
- }
- if !isHistory && !s.Stop { //在下载详情页时爬虫下架,此时不再存心跳信息
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail", false) //记录modal=1采集三级页心跳
- }
- countNum := MgoS.Count(coll, q) //统计util.Config.DayNum天内未下载爬虫个数
- if isHistory && countNum == 0 { //下载历史数据量为0,手动stop
- s.Stop = true
- return
- }
- //logger.Info("Thread Info: Code:", s.SCode, " count:", countNum)
- if countNum > 0 {
- threadNum := countNum / util.Config.ThreadBaseNum //线程数
- if threadNum > util.Config.ThreadUpperLimit { //设置单个爬虫线程上限
- threadNum = util.Config.ThreadUpperLimit
- }
- logger.Info("Thread Info: Code:", s.SCode, " count:", countNum, " thread num:", threadNum)
- list, _ := MgoS.Find(coll, q, o, f, false, 0, 200)
- if list != nil && len(*list) > 0 {
- spChan := make(chan *Spider, threadNum+1) //初始化线程通道(+1表示基本的线程数)
- if threadNum > 1 { //初始化多个sp
- if !isHistory {
- //从LoopListPath取爬虫信息,是为了保证创建的spider对象始终使用的是最新的爬虫信息(爬虫上架后会更新LoopListPath中的爬虫信息)
- if v, ok := LoopListPath.Load(s.Code); ok && v != nil {
- if info, ok := v.(map[string]string); ok {
- NewSpiderByScript(threadNum, s.Code, info["script"], spChan)
- } else {
- logger.Debug("LoopListPath Not Has Code:", s.Code)
- spChan = make(chan *Spider, 1) //不能创建其它sp只能用主线程的sp
- }
- } else {
- logger.Debug("LoopListPath Not Has Code:", s.Code)
- spChan = make(chan *Spider, 1) //不能创建其它sp只能用主线程的sp
- }
- } else {
- NewSpiderByScript(threadNum, s.Code, s.ScriptFile, spChan)
- }
- }
- spChan <- s //主线程sp放入通道
- wg := &sync.WaitGroup{}
- spLock := &sync.Mutex{}
- updateArr := [][]map[string]interface{}{}
- for _, tmp := range *list {
- spTmp := <-spChan //通道中取出sp对象
- wg.Add(1)
- atomic.AddInt64(&AllThreadNum, 1)
- go func(tmp map[string]interface{}, sp *Spider) {
- defer func() {
- spChan <- sp //处理完数据sp对象放回通道中
- wg.Done()
- atomic.AddInt64(&AllThreadNum, -1)
- }()
- if s.Stop || sp == nil { //爬虫下架或者初始化sp为nil时不再下载数据
- return
- }
- _id := tmp["_id"]
- query := map[string]interface{}{"_id": _id}
- href := qu.ObjToString(tmp["href"])
- //hashHref := util.HexText(href)
- update := []map[string]interface{}{}
- if isEsRepeat { //es数据title判重
- title := qu.ObjToString(tmp["title"])
- eTime := time.Now().Unix()
- sTime := eTime - int64(7*86400)
- //esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
- esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
- count := Es.Count(EsIndex, EsType, esQuery)
- if count > 0 { //es中含本title数据,不再采集,更新list表数据状态
- util.AddBloomRedis("href", href)
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "es", "updatetime": time.Now().Unix()}} //已存在state置为1
- update = append(update, query)
- update = append(update, set)
- spLock.Lock()
- updateArr = append(updateArr, update)
- spLock.Unlock()
- return
- }
- }
- times := qu.IntAll(tmp["times"]) //获取下载次数
- success := true //数据是否下载成功的标志
- delete(tmp, "_id")
- delete(tmp, "times")
- data := map[string]interface{}{}
- var err interface{}
- for k, v := range tmp {
- data[k] = v
- }
- //下载、解析、入库
- data, err = sp.DownloadDetailPage(tmp, data)
- if !isHistory && !sp.Stop && sp.IsMainThread { //在下载详情页时爬虫下架,此时不再存心跳信息
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute", false) //记录modal=1下载数据心跳
- }
- if err != nil || data == nil {
- success = false
- times++
- if err != nil {
- logger.Error(s.Code, err, tmp)
- //if len(tmp) > 0 && !isHistory { //下载历史数据时不保存错误信息
- // SaveErrorData(s.MUserName, tmp, err) //保存错误信息
- //}
- } /*else if data == nil && times >= 3 { //下载问题,建editor任务
- DownloadErrorData(s.Code, tmp)
- }*/
- } /*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
- util.RedisClusterSet(hashHref, "", -1)
- }*/
- if !success { //下载失败更新次数和状态
- ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
- if times >= 3 { //3次下载失败今天不再下载,state置为1
- ss["state"] = -1
- }
- set := map[string]interface{}{"$set": ss}
- update = append(update, query)
- update = append(update, set)
- spLock.Lock()
- updateArr = append(updateArr, update)
- spLock.Unlock()
- return
- } else if data["delete"] != nil { //三级页过滤
- //util.AddBloomRedis("href", tmphref)//delete可能存在删除跳转网站的数据,加入全量redis后可能导致该网站采不到
- //更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "delete", "updatetime": time.Now().Unix()}}
- update = append(update, query)
- update = append(update, set)
- spLock.Lock()
- updateArr = append(updateArr, update)
- spLock.Unlock()
- return
- }
- //正文、附件分析,下载异常数据重新下载
- if r := AnalysisProjectInfo(data); r != "" { //顺序采集暂不加此块判断(异常数据不会加redis,导致一直下载)
- times++
- ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
- if times >= 3 { //3次下载失败今天不再下载,state置为-1
- ss["state"] = -1
- ss["detailfilerr"] = r
- }
- set := map[string]interface{}{"$set": ss}
- update = append(update, query)
- update = append(update, set)
- spLock.Lock()
- updateArr = append(updateArr, update)
- spLock.Unlock()
- return
- }
- //数据采集成功
- //根据发布时间进行数据判重校验
- tmphref := qu.ObjToString(data["href"])
- publishtime := qu.Int64All(data["l_np_publishtime"])
- if publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
- isExist, _ := util.ExistsBloomRedis("href", tmphref)
- if isExist {
- set := map[string]interface{}{"$set": map[string]interface{}{
- "state": 1,
- "updatetime": time.Now().Unix(),
- "exist": "bloom_href",
- "tmphref": tmphref,
- }}
- update = append(update, query)
- update = append(update, set)
- spLock.Lock()
- updateArr = append(updateArr, update)
- spLock.Unlock()
- return
- }
- }
- delete(data, "exit")
- delete(data, "checkpublishtime")
- data["comeintime"] = time.Now().Unix()
- data["spidercode"] = s.Code
- data["dataging"] = 0
- data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
- data["infoformat"] = s.Infoformat //爬虫类型
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} //下载成功state置为1
- update = append(update, query)
- update = append(update, set)
- spLock.Lock()
- updateArr = append(updateArr, update)
- spLock.Unlock()
- //到此数据下载完成
- }(tmp, spTmp)
- }
- wg.Wait()
- //更新数据
- if len(updateArr) > 0 {
- MgoS.UpdateBulk(coll, updateArr...)
- updateArr = [][]map[string]interface{}{}
- }
- close(spChan) //关闭通道
- //释放sp对象(保留主线程sp,IsMainThread=true)
- for sp := range spChan {
- if sp != nil && !sp.IsMainThread {
- sp.L.Close()
- }
- }
- if !s.Stop && reload { //高性能模式下载完三级页数据,sp对象需要重载
- //重载主线程sp
- s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
- }
- }
- } else if reload { //高性能模式无数据sleep
- time.Sleep(30 * time.Second)
- }
- }
- // 初始化sp对象
- func NewSpiderByScript(num int, code, script string, spChan chan *Spider) {
- for i := 1; i <= num; i++ {
- spTmp, errstr := CreateSpider(code, script, true, true)
- if errstr == "" && spTmp != nil { //脚本加载成功
- spChan <- spTmp
- } else {
- spChan <- nil
- }
- }
- }
- // detail含“详情请访问原网页!”且附件未下成功的,不计入下载成功
- func AnalysisProjectInfo(data map[string]interface{}) string {
- defer qu.Catch()
- detail := qu.ObjToString(data["detail"])
- if RestrictAccessReg.MatchString(detail) { //限制访问
- return "ip"
- }
- if detail == "详情请访问原网页!" || detail == "<br/>详情请访问原网页!" { //不判断包含关系因为有些数据为json拼接,字段不全,会加“详情请访问原网页”
- if projectinfo, ok := data["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
- if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 {
- for _, data := range attachments {
- if d, ok := data.(map[string]interface{}); ok {
- fid := qu.ObjToString(d["fid"])
- if fid != "" { //附件上传成功
- return ""
- }
- }
- }
- return "detail_file"
- } else {
- return "detail_file"
- }
- } else {
- return "detail_file"
- }
- }
- return ""
- }
- // 打印线程数
- func AllThreadLog() {
- logger.Info("List Download All Thread:", ListAllThreadNum)
- logger.Info("Detail Download All Thread:", AllThreadNum)
- time.AfterFunc(1*time.Minute, AllThreadLog)
- }
- // 获取hascode
- func GetHas1(data string) string {
- t := sha1.New()
- io.WriteString(t, data)
- hf := Reg.FindString(data)
- if !strings.HasSuffix(hf, "/") {
- hf = hf + "/"
- }
- return hf + fmt.Sprintf("%x", t.Sum(nil))
- }
|