/** Spider: script interface; needs further extension */
package spider

import (
	"crypto/sha1"
	"fmt"
	"io"
	"log"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/donnie4w/go-logger/logger"
	"github.com/yuin/gopher-lua"
	elc "gopkg.in/olivere/elastic/v7"
	mgo "mongodb"
	qu "qfw/util"
	es "qfw/util/elastic.v7"
	util "spiderutil"
)

// Heart holds heartbeat timestamps for one spider.
type Heart struct {
	DetailHeart        int64  // heartbeat: detail-page (third-level) execution
	DetailExecuteHeart int64  // heartbeat: detail-page data actually collected
	FindListHeart      int64  // heartbeat: findListHtml execution
	ListHeart          int64  // heartbeat: list-page execution
	ModifyUser         string // spider maintainer
	Site               string // site
	Channel            string // channel/column
}

// SpiderFlow tracks traffic.
type SpiderFlow struct {
	Flow       int64  // traffic
	ModifyUser string // spider maintainer
	Site       string // site
	Channel    string // channel/column
	//Code string
}

// Spider is one crawler instance driven by a Lua script.
type Spider struct {
	Script
	Code                            string // spider code
	Name                            string // name (site name)
	Channel                         string // channel/column
	DownDetail                      bool   // whether to download detail pages
	Stop                            bool   // stop flag
	Pass                            bool   // pause flag
	LastPubshTime                   int64  // last publish time
	LastHeartbeat                   int64  // last heartbeat time
	SpiderRunRate                   int64  // run frequency
	ExecuteOkTime                   int64  // time the task finished successfully
	Collection                      string // target collection name
	Thread                          int64  // thread count
	LastExecTime                    int64  // last execution time
	LastDowncount                   int32  // download count of the last round
	TodayDowncount                  int32  // download count today
	YesterdayDowncount              int32  // download count yesterday
	TotalDowncount                  int32  // total download count
	RoundCount                      int32  // round counter
	StoreMode                       int    // storage mode
	StoreToMsgEvent                 int    // message type
	CoverAttr                       string // attribute used for de-duplication
	SleepBase                       int    // base delay
	SleepRand                       int    // random delay
	TargetChannelUrl                string // channel page URL
	UpperLimit, LowerLimit          int    // upper/lower bound of the normal range
	UserName, UserEmail, UploadTime string // developer name, developer email, script upload time
	MUserName, MUserEmail           string // maintainer, maintainer email
	//Index int // array index
	// Historical backfill
	IsHistoricalMend bool // whether this is a historical-backfill spider
	IsMustDownload   bool // whether download is forced
	IsCompete        bool // distinguishes new vs. old spiders
	Infoformat       int  // spider type: 1 bidding; 2 proposed/approval; 3 property rights
	IsMainThread     bool // whether this is the main thread (distinguishes the main thread in multi-threaded collection)
}

var (
	Es                *es.Elastic
	EsIndex           string
	EsType            string
	MgoS              *mgo.MongodbSim
	MgoEB             *mgo.MongodbSim
	TimeChan          = make(chan bool, 1)
	Reg               = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
	RestrictAccessReg = regexp.MustCompile(`访问被拒绝`)
	//DomainNameReg    = regexp.MustCompile(`(?://).+?(?:)[::/]`)
	//RepDomainNameReg = regexp.MustCompile(`[::/]+`)
	//Today string
	//SpiderFlowMap = sync.Map{} //code:{"2022-05-16":SpiderFlow}
	AllThreadNum int64
	DelaySiteMap map[string]*DelaySite // sites whose collection is delayed
	//UpdataMgoCache = make(chan []map[string]interface{}, 1000) // update the state of data to be re-downloaded
	//SP = make(chan bool, 5)
	//SaveMgoCache = make(chan map[string]interface{}, 1000) // save data a spider collected from another site
	//SPS = make(chan bool, 5)
	UpdataHeartCache = make(chan []map[string]interface{}, 1000) // update spider heartbeat info
	SPH              = make(chan bool, 5)
	DataBakSaveCache = make(chan map[string]interface{}, 1000) // save detail-page collection records
	DB_CH            = make(chan bool, 5)
)

// DelaySite describes a site whose detail collection is delayed.
type DelaySite struct {
	DelayTime int
	Compete   bool
}

// UpdateHeart records a heartbeat of type t ("list", "findlist", "detail",
// "detailexcute") for the spider identified by code.
func UpdateHeart(site, channel, code, user, t string) {
	//sp, spiderOk := LoopListPath.Load(code)
	//if spiderOk && sp != nil {
	setBeat := func(heart *Heart) {
		now := time.Now().Unix()
		switch t {
		case "list":
			heart.ListHeart = now
		case "findlist":
			heart.FindListHeart = now
		case "detail":
			heart.DetailHeart = now
		case "detailexcute":
			heart.DetailExecuteHeart = now
		}
	}
	if htmp, ok := SpiderHeart.Load(code); ok {
		if heart, ok := htmp.(*Heart); ok {
			setBeat(heart)
		}
	} else {
		heart := &Heart{
			ModifyUser: user,
			Site:       site,
			Channel:    channel,
		}
		setBeat(heart)
		SpiderHeart.Store(code, heart)
	}
	//}
}

// StartJob starts one crawl round.
func (s *Spider) StartJob() {
	s.Stop = false
	s.Pass = false
	s.RoundCount++
	go s.ExecJob(false)
}
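// Illustrative sketch (an assumption for clarity, not part of the original
// flow): a caller that has loaded a Lua script into a *Spider starts rounds
// via StartJob; ExecJob then reschedules itself in high-performance mode.
// The script name below is hypothetical:
//
//	sp, errstr := CreateSpider("a_example_code", "example.lua", true, false)
//	if errstr == "" && sp != nil {
//		sp.StartJob() // clears Stop/Pass, bumps RoundCount, runs ExecJob(false) in a goroutine
//	}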
// ExecJob runs a single round: list download, then (depending on mode) rescheduling.
func (s *Spider) ExecJob(reload bool) {
	defer func() {
		size_ok, size_no := 0, 0
		size_no_index := []interface{}{}
		LoopListPath.Range(func(k, v interface{}) bool {
			if v != nil {
				size_ok++
			} else {
				size_no_index = append(size_no_index, k)
				size_no++
			}
			return true
		})
		logger.Debug(s.Code, s.Name, "ok, downloads this round:", s.LastDowncount, ", polling-list length:", size_ok, ", offline count:", size_no, ", offline spiders:", size_no_index)
		s.ExecuteOkTime = time.Now().Unix()
		util.TimeSleepFunc(5*time.Second, TimeSleepChan)
		if util.Config.Working == 1 {
			s.Stop = true
			if _, b := Allspiders.Load(s.Code); b {
				Allspiders.Store(s.Code, s)
			}
			s.L.Close()
			CC <- s.L
		}
	}()
	if reload && util.Config.Working == 0 { // high-performance mode: reload the script on each scheduling round
		s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
	}
	logger.Debug(s.Code, s.Name, "rate:", s.SpiderRunRate, ",", s.Timeout)
	s.LastDowncount = 0
	s.LastExecTime = time.Now().Unix()
	s.LastHeartbeat = time.Now().Unix()
	s.ExecuteOkTime = 0
	err := s.GetLastPublishTime() // fetch the latest publish time, used as the last-update time
	if err != nil {
		logger.Error(s.Code, err)
	}
	err = s.DownListPageItem() // download list pages
	if err != nil {
		logger.Error(s.Code, err)
	}
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // incremental vs. historical spider (historical spiders currently exist only on node 7000)
		UpdateSpiderByCodeState(s.Code, "6") // take the spider offline on this node
		SpiderCodeSendToEditor(s.Code)       // send the history-turned-incremental spider to the editor; switch it on/off across nodes
		return
	} else {
		if util.Config.Working == 0 { // high-performance mode
			/*
				for !s.Stop && s.Pass {
					util.TimeSleepFunc(2*time.Second, TimeSleepChan)
				}
				if s.Stop {
					return
				}
			*/
			//if s.IsMustDownload { // historical download runs only one round
			if s.IsHistoricalMend && util.Config.IsHistoryEvent { // history node 7000, high-performance mode: historical backfill downloads one round only
				UpdateSpiderByCodeState(s.Code, "6") // take the spider offline on this node
				b := MgoEB.Update("luaconfig", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
				logger.Info("Delete History Code:", s.Code, b)
			} else {
				if !s.Stop { // still online: schedule the next round
					util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
						s.ExecJob(true)
					}, TimeChan)
					// util.TimeAfterFunc(30*time.Second, func() {
					// 	s.ExecJob(true)
					// }, TimeChan)
				} else { // taken offline: the child thread exits
					return
				}
			}
		} else { // queue mode
			return
		}
	}
}

// GetLastPublishTime fetches the latest publish time and keeps it as the last-update time.
func (s *Spider) GetLastPublishTime() (errs interface{}) {
	defer qu.Catch()
	var lastpublishtime string
	// call the Lua hook for the last publish time
	if err := s.L.CallByParam(lua.P{
		Fn:      s.L.GetGlobal("getLastPublishTime"),
		NRet:    1,
		Protect: true,
	}); err != nil {
		//panic(s.Code + "," + err.Error())
		log.Println(s.Code + "," + err.Error())
		errs = err.Error()
		atomic.AddInt32(&s.Script.ErrorNum, 1)
		return errs
	}
	ret := s.L.Get(-1)
	s.L.Pop(1)
	if str, ok := ret.(lua.LString); ok {
		lastpublishtime = string(str)
	}
	if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) {
		if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() { // guard against publish times in the future
			s.LastPubshTime = time.Now().Unix()
		} else {
			s.LastPubshTime = util.ParseDate2Int64(lastpublishtime)
		}
	}
	return nil
}

// DownListPageItem downloads list pages.
func (s *Spider) DownListPageItem() (errs interface{}) {
	defer qu.Catch()
	s.AlreadyGetPageHeart = map[int]bool{}                                     // reset the per-page record
	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") // start page, max page
	s.MaxPage = max
	//tmpMax := max // remember the configured max page
	repeatAllNum := 0   // total repeated links collected this round
	downloadAllNum := 0 // total links collected this round
	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { // node 7000: spider crawling history
		max = s.GetIntVar("spiderHistoryMaxPage")
	}
	downtimes := 0             // retry counter for a single page (currently capped at 3 attempts)
	repeatPageNum := 0         // page number whose links were all repeats
	repeatPageTimes := 0       // count of consecutively repeated pages (paging stops once the limit below is hit)
	repeatPageTimesLimit := 10 // upper limit of consecutively repeated pages (10 in high-performance mode, 3 in queue mode)
	isRunRepeatList := false   // whether consecutive list-page de-dup is active
	if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { // every node except sequential-collection mode and the history node de-dups consecutive list pages
		isRunRepeatList = true
		max = 100 // high-performance mode: max page 100
		if util.Config.Working == 1 { // queue mode
			repeatPageTimesLimit = 3 // 3 consecutively repeated pages
			max = 50                 // queue mode: max page 50
		}
	}
	for ; start <= max && !s.Stop; start++ {
		if !s.Stop { // the spider may be taken offline while downloading details; stop recording heartbeats then
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") // record the list-page heartbeat on all nodes
		}
		//logger.Info("spider:", s.Code, "repeated page:", repeatPageNum, " configured max:", tmpMax, " final max:", max, " current:", start, "repeat count:", repeatPageTimes)
		//if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { // more than 5 repeats: stop paging
		//	break
		//}
		if isRunRepeatList && repeatPageTimes >= repeatPageTimesLimit { // repeat limit reached: stop paging
			break
		}
		if err := s.L.CallByParam(lua.P{
			Fn:      s.L.GetGlobal("downloadAndParseListPage"),
			NRet:    1,
			Protect: true,
		}, lua.LNumber(start)); err != nil {
			//panic(s.Code + "," + err.Error())
			logger.Error("list-page collection error", start, s.Code+","+err.Error())
			errs = err.Error()
			atomic.AddInt32(&s.Script.ErrorNum, 1)
			// retry the page on error; past the retry limit the page counts as collected
			if downtimes < 2 {
				downtimes++
				start--
				//} else if start > tmpMax && isRunRepeatList { // past the retry limit: treat the page as repeated
			} else if isRunRepeatList { // past the retry limit: treat the page as repeated
				if repeatPageNum+1 == start {
					repeatPageTimes++ // one more consecutive repeat
				} else {
					repeatPageTimes = 0 // reset the repeat counter
				}
				repeatPageNum = start // remember the page number
				downtimes = 0
			}
			continue
		}
		lv := s.L.Get(-1)
		s.L.Pop(1)
		if tbl, ok := lv.(*lua.LTable); ok {
			list := []map[string]interface{}{}
			//qu.Debug("items on this page:", tbl.Len())
			if tabLen := tbl.Len(); tabLen > 0 { // the list page has data; download detail pages from it
				repeatListNum := 0 // repeated links on the current list page
				for i := 1; i <= tabLen; i++ {
					v := tbl.RawGetInt(i).(*lua.LTable)
					tmp := util.TableToMap(v)
					//s.ThisSiteData(tmp) // track whether the data belongs to this site
					if !s.IsHistoricalMend { // not historical backfill
						tmp["dataging"] = 0 // tag the data with dataging=0
						if s.DownDetail {
							s.DownloadDetailItem(tmp, &repeatListNum)
						}
						/*else { // no spiders of this kind at present
							tmp["comeintime"] = time.Now().Unix()
							//atomic.AddInt32(&s.LastDowncount, 1)
							//atomic.AddInt32(&s.TodayDowncount, 1)
							//atomic.AddInt32(&s.TotalDowncount, 1)
							href := fmt.Sprint(tmp["href"])
							if len(href) > 5 { // valid data
								hashHref := util.HexText(href)
								util.RedisClusterSet(hashHref, "", -1) // full-volume redis
								list = append(list, tmp)
							}
						}*/
					} else { // historical backfill
						s.HistoricalMendDownloadDetailItem(tmp) // download detail pages for backfill
					}
				}
				//if start <= tmpMax { // accumulate the counters
				repeatAllNum += repeatListNum
				downloadAllNum += tabLen
				//}
				//if start > tmpMax && isRunRepeatList { // consecutive-page de-dup
				if isRunRepeatList { // consecutive-page de-dup
					if repeatListNum >= tabLen { // every link on page `start` was already collected
						//qu.Debug("repeated page:", repeatPageNum, "current page:", start)
						if repeatPageNum+1 == start || repeatPageNum == 0 {
							repeatPageTimes++ // one more consecutive repeat
						} else {
							repeatPageTimes = 0 // reset the repeat counter
						}
						repeatPageNum = start // remember the page number
					} else { // page `start` still has unseen data
						repeatPageTimes = 0
						repeatPageNum = 0
					}
				}
				if !s.IsHistoricalMend && !s.DownDetail {
					if len(list) > 0 { // store the collected items
						StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
					}
				}
			} else { // the page may be empty due to network problems; retry the download
				if downtimes < 2 {
					downtimes++
					start--
					continue
					//} else if start > tmpMax && isRunRepeatList { // past the retry limit: treat the page as repeated
				} else if isRunRepeatList { // past the retry limit: treat the page as repeated
					if repeatPageNum+1 == start {
						repeatPageTimes++
					} else {
						repeatPageTimes = 0
					}
					repeatPageNum = start
				}
			}
		} else { // the list-page request failed
			if downtimes < 2 {
				downtimes++
				start--
				continue
				//} else if start > tmpMax && isRunRepeatList { // past the retry limit: treat the page as repeated
			} else if isRunRepeatList { // past the retry limit: treat the page as repeated
				if repeatPageNum+1 == start {
					repeatPageTimes++
				} else {
					repeatPageTimes = 0
				}
				repeatPageNum = start
			}
		}
		downtimes = 0
		// the page downloaded cleanly; reset the retry counter
		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
	}
	logger.Info(s.Code, "list-page collection this round:", downloadAllNum, repeatAllNum, start, s.Stop)
	if !util.Config.IsHistoryEvent && !s.Stop { // non-history nodes record the download rate
		nowTime := time.Now()
		sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
		set := map[string]interface{}{
			"site":       s.Name,
			"channel":    s.Channel,
			"spidercode": s.Code,
			"updatetime": nowTime.Unix(),
			"event":      util.Config.Uploadevent,
			"modifyuser": s.MUserName,
			"maxpage":    s.MaxPage,
			"runrate":    s.SpiderRunRate,
			"endpage":    start,
			"date":       sDate,
		}
		inc := map[string]interface{}{
			"alltimes": 1,
		}
		// record whether paging succeeded
		if s.PageOneTextHash != "" {
			if s.PageTwoTextHash != "" {
				if s.PageOneTextHash != s.PageTwoTextHash {
					inc["page_success"] = 1
				} else {
					inc["page_fail"] = 1
				}
			} else {
				inc["page_fail"] = 1
			}
		} else if s.PageTwoTextHash != "" {
			inc["page_onefail"] = 1
		}
		if downloadAllNum > 0 {
			rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
			rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
			if rate == 1.0 {
				inc["oh_percent"] = 1
			} else if rate >= 0.9 {
				inc["nt_percent"] = 1
			} else if rate >= 0.8 {
				inc["et_percent"] = 1
			} else {
				inc["other_percent"] = 1
			}
			if isRunRepeatList && start > max { // consecutive paging hit the upper limit
				inc["uplimit"] = 1
			}
		} else {
			inc["zero"] = 1
		}
		query := map[string]interface{}{
			"date":       sDate,
			"spidercode": s.Code,
		}
		MgoS.Update("spider_downloadrate", query, map[string]interface{}{
			"$set": set,
			"$inc": inc,
		}, true, false)
	}
	// reset per-round state
	s.RecordedHeartInfo = false
	s.PageOneTextHash = ""
	s.PageTwoTextHash = ""
	return errs
}

// site statistics
//func (s *Spider) ThisSiteData(tmp map[string]interface{}) {
//	defer qu.Catch()
//	href := qu.ObjToString(tmp["href"])
//	url_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(s.TargetChannelUrl), "")
//	href_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(href), "")
//	if url_dn != href_dn {
//		SaveMgoCache <- map[string]interface{}{
//			"site":       s.Name,
//			"channel":    s.Channel,
//			"spidercode": s.Code,
//			"url":        s.TargetChannelUrl,
//			"href":       href,
//			"modifyuser": s.MUserName,
//			"comeintime": time.Now().Unix(),
//		}
//	}
//}

// HistoricalMendDownloadDetailItem iterates list items and downloads detail pages (historical backfill).
func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
	//qu.Debug("--------------historical download-----------------")
	defer qu.Catch()
	var err interface{}
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { // invalid data
		return
	}
	hashHref := util.HexText(href)
	isExist := util.RedisExist("list", "list_"+hashHref)
	//logger.Debug("full href:", href, " isExist:", isExist)
	if !s.IsMustDownload { // not forced
		if isExist { // already collected: return
			return
		} else if util.Config.IsHistoryEvent { // 1. backfill on the history node (7000): store into spider_historydata
			num := 0
			SaveHighListPageData(paramdata, hashHref, &num)
			return
		}
	} else { // forced download is currently unsupported
		return
	}
	// 2. backfill off the history node: collect details right after the list, then take the spider offline (no such spider at present)
	id := ""
	isEsRepeat := false
	if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
		title := qu.ObjToString(paramdata["title"])
		eTime := time.Now().Unix()
		sTime := eTime - int64(7*86400)
		//esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
		esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
		if Es.Count(EsIndex, EsType, esQuery) > 0 { // es already has this title: skip it and update the list record
			isEsRepeat = true
		}
	}
	SaveListPageData(paramdata, &id, isEsRepeat) // store the collection record
	if isEsRepeat { // competitor-like data de-duped by title goes into redis
		util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
		util.AddBloomRedis("href", href)
		return
	}
	//qu.Debug("----------------download, parse, store--------------------")
	// download the detail page
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil { // download failed: stop here
		if err != nil {
			logger.Error(s.Code, err, paramdata)
		}
		// mark the spider_listdata record as failed
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
		return
	}
	util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) // collected: add to the list-page redis
	// de-dup against the publish time
	tmphref := qu.ObjToString(data["href"]) // use tmphref: the detail page may have replaced href
	publishtime := qu.Int64All(data["l_np_publishtime"])
	if publishtime < time.Now().AddDate(-1, 0, 0).Unix() { // data older than one year: de-dup via the full-volume bloom redis on href
		isExist, _ = util.ExistsBloomRedis("href", tmphref)
		if isExist {
			MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
			return
		}
	}
	// the detail page filtered the data
	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
	if data["delete"] != nil {
		//util.AddBloomRedis("href", tmphref) // "delete" may mark redirected data; adding it to the full-volume redis could make the site uncollectable
		set["exist"] = "delete"
		//MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
		return
	}
	// mark the spider_listdata record as downloaded (state updated by href; a later download may update it again)
	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
	// when the detail page replaces href, the original href could be added to the full-volume redis after success
	//if tmphref := qu.ObjToString(data["href"]); tmphref != href {
	//	util.AddBloomRedis("href", href)
	//}
	flag := true
	//publishtime := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
	if s.IsMustDownload { // forced download
		if isExist && publishtime < time.Now().AddDate(0, 0, -5).Unix() {
			//qu.Debug("forced download, present in redis")
			data["dataging"] = 1 // dataging=1 makes the save service look up the id stored in redis for this href (ids are no longer kept in redis, so this is a no-op)
			flag = false
		} else {
			//qu.Debug("forced download, absent from redis")
			data["dataging"] = 0
		}
	} else { // not forced
		if !isExist {
			//qu.Debug("not forced, absent from redis")
			data["dataging"] = 0
		}
	}
	//if publishtime > time.Now().Unix() { // guard against future publish times
	//	data["publishtime"] = time.Now().Unix()
	//}
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	data["spidercode"] = s.Code
	//qu.Debug("--------------saving---------------")
	data["iscompete"] = s.IsCompete // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag)
	//qu.Debug("--------------saved---------------")
}

// DownloadDetailItem iterates list items and downloads detail pages (incremental).
func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
	defer qu.Catch()
	var err interface{}
	data := map[string]interface{}{}
	paramdata := p.(map[string]interface{})
	for k, v := range paramdata {
		data[k] = v
	}
	href := qu.ObjToString(data["href"])
	if len(href) <= 5 { // invalid data
		*num++ // count it as collected
		return
	}
	hashHref := util.HexText(href)
	// de-dup against the list-page redis
	isExist := util.RedisExist("list", "list_"+hashHref)
	if isExist {
		*num++ // already collected
		return
	}
	id := "" // id of the spider_listdata record, used to update its state after a successful download
	//if util.Config.Modal == 1 || (util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history") { // nodes with split list/detail collection, plus data collected by new spiders on node 7000
	if util.Config.Modal == 1 || util.Config.IsHistoryEvent { // split collection mode and the history node (7000)
		SaveHighListPageData(paramdata, hashHref, num) // store it
		return
	} else {
		if !s.Stop {
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // heartbeat for modal=0 legacy detail collection
		}
		isEsRepeat := false
		if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
			title := qu.ObjToString(paramdata["title"])
			eTime := time.Now().Unix()
			sTime := eTime - int64(7*86400)
			//esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
			esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
			if Es.Count(EsIndex, EsType, esQuery) > 0 { // es already has this title: skip it and update the list record
				isEsRepeat = true
			}
		}
		SaveListPageData(paramdata, &id, isEsRepeat) // save list-page data collected on nodes 7000, 7410, 7500, 7510, 7520, 7700
		if isEsRepeat { // competitor-like data de-duped by title goes into redis
			util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
			util.AddBloomRedis("href", href)
			return
		}
	}
	// download the detail page
	data, err = s.DownloadDetailPage(paramdata, data)
	if err != nil || data == nil {
		*num++ // in sequential mode a failed download counts as repeated (otherwise failing data would bump the full-collection counter every round)
		if err != nil {
			logger.Error(s.Code, err, paramdata)
			//if len(paramdata) > 0 {
			//	SaveErrorData(s.MUserName, paramdata, err) // save the error
			//}
		}
		// mark the spider_listdata record as failed
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}})
		return
	}
	/*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // detail page replaced href
		util.RedisClusterSet(hashHref, "", -1) // store the list-page href in the full-volume redis
	}*/
	util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) // add to the list-page redis
	// de-dup against the publish time
	tmphref := qu.ObjToString(data["href"])
	publishtime := qu.Int64All(data["l_np_publishtime"])
	// node 7410 (changing-link node) or data older than one year: de-dup via the full-volume bloom redis on href
	if util.Config.Uploadevent == 7410 || publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
		isExist, _ = util.ExistsBloomRedis("href", tmphref)
		if isExist {
			MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
			return
		}
	}
	// heartbeat: detail-page data downloaded successfully
	if !s.Stop {
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // heartbeat for modal=0 legacy data collection
	}
	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
	// the detail page filtered the data
	if data["delete"] != nil {
		//util.AddBloomRedis("href", tmphref) // "delete" may mark redirected data; adding it to the full-volume redis could make the site uncollectable
		set["exist"] = "delete"
		//MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
		return
	}
	set["byid"] = id
	// mark the spider_listdata record as downloaded (state updated by href; a later download may update it again)
	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
	// when the detail page replaces href, the original href could be added to the full-volume redis after success
	//if tmphref := qu.ObjToString(data["href"]); tmphref != href {
	//	util.AddBloomRedis("href", href)
	//}
	delete(data, "state")
	delete(data, "exit")
	delete(data, "checkpublishtime")
	data["comeintime"] = time.Now().Unix()
	data["spidercode"] = s.Code
data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断) data["infoformat"] = s.Infoformat //爬虫类型 Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true) } //遍历下载名录 func (s *Spider) DownloadDetailByNames(p interface{}) { defer qu.Catch() var err interface{} /* if s.Stop { return } for s.Pass { util.TimeSleepFunc(2*time.Second, TimeSleepChan) } */ data := map[string]interface{}{} paramdata := p.(map[string]interface{}) for k, v := range paramdata { data[k] = v } if s.DownDetail { href := qu.ObjToString(data["href"]) if href == "" || len(href) < 5 { //无效数据 return } //下载、解析、入库 data, err = s.DownloadDetailPage(paramdata, data) if err != nil { logger.Error(s.Code, paramdata, err) return } } data["comeintime"] = time.Now().Unix() //atomic.AddInt32(&s.LastDowncount, 1) //atomic.AddInt32(&s.TodayDowncount, 1) //atomic.AddInt32(&s.TotalDowncount, 1) Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true) } //下载解析详情页 func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) { defer qu.Catch() s.LastHeartbeat = time.Now().Unix() util.TimeSleepFunc((time.Duration(s.SleepBase+util.GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan) tab := s.L.NewTable() for k, v := range param { if val, ok := v.(string); ok { tab.RawSet(lua.LString(k), lua.LString(val)) } else if val, ok := v.(int64); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(int32); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(float64); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(float32); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(bool); ok { tab.RawSet(lua.LString(k), lua.LBool(val)) } } var err error if err = s.L.CallByParam(lua.P{ Fn: s.L.GetGlobal("downloadDetailPage"), NRet: 1, Protect: true, }, tab); err != nil { //panic(s.Code + "," + err.Error()) log.Println(s.Code + "," + err.Error()) atomic.AddInt32(&s.Script.ErrorNum, 1) return data, err } lv := s.L.Get(-1) s.L.Pop(1) //拼map if v3, ok := lv.(*lua.LTable); ok { v3.ForEach(func(k, v lua.LValue) { if tmp, ok := k.(lua.LString); ok { key := string(tmp) if value, ok := v.(lua.LString); ok { data[key] = string(value) } else if value, ok := v.(lua.LNumber); ok { data[key] = int64(value) } else if value, ok := v.(*lua.LTable); ok { tmp := util.TableToMap(value) data[key] = tmp } } }) return data, err } else { return nil, err } } //高性能模式定时采集三级页信息 func DetailData() { defer qu.Catch() <-InitAllLuaOver //脚本加载完毕,执行 if util.Config.Working == 0 && !util.Config.IsHistoryEvent { //高性能模式且不是7000节点,只有7000节点util.Config.IsHistoryEvent为true GetListDataDownloadDetail() } } func GetListDataDownloadDetail() { defer qu.Catch() logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++") Allspiders2.Range(func(k, v interface{}) bool { sp := v.(*Spider) go sp.DownloadHighDetail(true) time.Sleep(1 * time.Second) return true }) } //高性能模式根据列表页数据下载三级页 func (s *Spider) DownloadHighDetail(reload bool) { defer qu.Catch() for { logger.Info("Running Code:", s.Code, "Stop:", s.Stop) if !s.Stop { //爬虫是运行状态 s.DownloadDetail(reload, false) } else { logger.Info("Running Code:", s.Code, "Stop:", s.Stop) break } } } //队列模式根据列表页数据下载三级页 func (s *Spider) DownloadListDetail(reload bool) { defer qu.Catch() s.DownloadDetail(reload, false) //队列模式爬虫下载完三级页数据或无下载数据,使用后close s.Stop = true if _, b := Allspiders2.Load(s.Code); b { Allspiders2.Store(s.Code, s) } s.L.Close() CC2 <- s.L } 
// DownloadDetail pulls stored list rows and downloads their detail pages.
func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
	defer qu.Catch()
	coll := "spider_highlistdata"
	isEsRepeat := false // whether to de-dup against es
	q := map[string]interface{}{
		"spidercode": s.Code,
		"state":      0, // 0: stored; -1: download failed; 1: success
	}
	o := map[string]interface{}{"_id": -1}
	if !isHistory { // not a historical download: add a comeintime condition
		comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} // only collect data from the last util.Config.DayNum days, so rows that never download don't pile up
		if delaySite := DelaySiteMap[s.Name]; delaySite != nil {
			isEsRepeat = delaySite.Compete
			if delaySite.DelayTime <= util.Config.DayNum*24 { // delayed site: collect its data DelayTime hours late (7410, 7500, 7700 collect sequentially and cannot be delayed)
				//comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
				comeintimeQuery["$lte"] = time.Now().Unix() - int64(3600*delaySite.DelayTime)
			}
		}
		q["comeintime"] = comeintimeQuery
	} else {
		coll = "spider_historydata"
		o["_id"] = 1 // historical data in ascending order
	}
	f := map[string]interface{}{
		"state":      0,
		"comeintime": 0,
		"event":      0,
	}
	if !isHistory && !s.Stop { // the spider may be taken offline while downloading details; stop recording heartbeats then
		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") // heartbeat for modal=1 detail collection
	}
	countNum := MgoS.Count(coll, q) // count rows not yet downloaded within util.Config.DayNum days
	if isHistory && countNum == 0 { // nothing historical left: stop manually
		s.Stop = true
		return
	}
	//logger.Info("Thread Info: Code:", s.SCode, " count:", countNum)
	if countNum > 0 {
		threadNum := countNum / util.Config.ThreadBaseNum // thread count
		if threadNum > util.Config.ThreadUpperLimit {     // cap the threads per spider
			threadNum = util.Config.ThreadUpperLimit
		}
		logger.Info("Thread Info: Code:", s.SCode, " count:", countNum, " thread num:", threadNum)
		list, _ := MgoS.Find(coll, q, o, f, false, 0, 200)
		if list != nil && len(*list) > 0 {
			spChan := make(chan *Spider, threadNum+1) // worker channel (+1 for the base thread)
			if threadNum > 1 {                        // spawn additional sp workers
				if !isHistory {
					if v, ok := LoopListPath.Load(s.Code); ok && v != nil {
						if info, ok := v.(map[string]string); ok {
							NewSpiderByScript(threadNum, s.Code, info, spChan)
						} else {
							logger.Debug("LoopListPath Not Has Code:", s.Code)
							spChan = make(chan *Spider, 1) // cannot create other sp workers; use the main thread's sp only
						}
					} else {
						logger.Debug("LoopListPath Not Has Code:", s.Code)
						spChan = make(chan *Spider, 1) // cannot create other sp workers; use the main thread's sp only
					}
				} else {
					info := map[string]string{
						"script":          s.ScriptFile,
						"createuser":      s.UserName,
						"createuseremail": s.UserEmail,
						"modifyuser":      s.MUserName,
						"modifyemail":     s.MUserEmail,
					}
					NewSpiderByScript(threadNum, s.Code, info, spChan)
				}
			}
			spChan <- s // put the main thread's sp into the channel
			wg := &sync.WaitGroup{}
			spLock := &sync.Mutex{}
			updateArr := [][]map[string]interface{}{}
			for _, tmp := range *list {
				spTmp := <-spChan // take an sp worker from the channel
				wg.Add(1)
				atomic.AddInt64(&AllThreadNum, 1)
				go func(tmp map[string]interface{}, sp *Spider) {
					defer func() {
						spChan <- sp // put the sp worker back when done
						wg.Done()
						atomic.AddInt64(&AllThreadNum, -1)
					}()
					if s.Stop || sp == nil { // spider offline, or the sp worker failed to initialize: skip
						return
					}
					_id := tmp["_id"]
					query := map[string]interface{}{"_id": _id}
					href := qu.ObjToString(tmp["href"])
					//hashHref := util.HexText(href)
					update := []map[string]interface{}{}
					if isEsRepeat { // de-dup by title against es
						title := qu.ObjToString(tmp["title"])
						eTime := time.Now().Unix()
						sTime := eTime - int64(7*86400)
						//esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
						esQuery := elc.NewBoolQuery().Must(elc.NewRangeQuery("comeintime").Gte(sTime).Lte(eTime)).Must(elc.NewTermQuery("title.mtitle", title))
						count := Es.Count(EsIndex, EsType, esQuery)
						if count > 0 {
							// es already has this title: skip it and update the list record
							util.AddBloomRedis("href", href)
							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "es", "updatetime": time.Now().Unix()}} // already present: state=1
							update = append(update, query)
							update = append(update, set)
							spLock.Lock()
							updateArr = append(updateArr, update)
							spLock.Unlock()
							return
						}
					}
					times := qu.IntAll(tmp["times"]) // download attempts so far
					success := true                  // whether the download succeeded
					delete(tmp, "_id")
					delete(tmp, "times")
					data := map[string]interface{}{}
					var err interface{}
					for k, v := range tmp {
						data[k] = v
					}
					// download, parse, store
					data, err = sp.DownloadDetailPage(tmp, data)
					if !isHistory && !sp.Stop && sp.IsMainThread { // the spider may be taken offline while downloading; stop recording heartbeats then
						UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") // heartbeat for modal=1 data collection
					}
					if err != nil || data == nil {
						success = false
						times++
						if err != nil {
							logger.Error(s.Code, err, tmp)
							//if len(tmp) > 0 && !isHistory { // do not save errors during historical downloads
							//	SaveErrorData(s.MUserName, tmp, err) // save the error
							//}
						}
						/*else if data == nil && times >= 3 { // download problem: open an editor task
							DownloadErrorData(s.Code, tmp)
						}*/
					}
					/*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { // detail page replaced href
						util.RedisClusterSet(hashHref, "", -1)
					}*/
					if !success { // download failed: update the attempt count and state
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { // 3 failures: skip for the rest of the day, state=-1
							ss["state"] = -1
						}
						set := map[string]interface{}{"$set": ss}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					} else if data["delete"] != nil { // the detail page filtered the data
						//util.AddBloomRedis("href", tmphref) // "delete" may mark redirected data; adding it to the full-volume redis could make the site uncollectable
						// mark the row state=1 in spider_highlistdata so it is not downloaded again; update redis
						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "delete", "updatetime": time.Now().Unix()}}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					}
					// body/attachment analysis: abnormal downloads are retried
					if r := AnalysisProjectInfo(data); r != "" { // sequential collection skips this check for now (abnormal data never reaches redis and would download forever)
						times++
						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
						if times >= 3 { // 3 failures: skip for the rest of the day, state=-1
							ss["state"] = -1
							ss["detailfilerr"] = r
						}
						set := map[string]interface{}{"$set": ss}
						update = append(update, query)
						update = append(update, set)
						spLock.Lock()
						updateArr = append(updateArr, update)
						spLock.Unlock()
						return
					}
					// the download succeeded
					// de-dup against the publish time
					tmphref := qu.ObjToString(data["href"])
					publishtime := qu.Int64All(data["l_np_publishtime"])
					if publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
						isExist, _ := util.ExistsBloomRedis("href", tmphref)
						if isExist {
							set := map[string]interface{}{"$set": map[string]interface{}{
								"state":      1,
								"updatetime": time.Now().Unix(),
								"exist":      "bloom_href",
								"tmphref":    tmphref,
							}}
							update = append(update, query)
							update = append(update, set)
							spLock.Lock()
							updateArr = append(updateArr, update)
							spLock.Unlock()
							return
						}
					}
					delete(data, "exit")
					delete(data, "checkpublishtime")
					data["comeintime"] = time.Now().Unix()
					data["spidercode"] = s.Code
					data["dataging"] = 0
					data["iscompete"] = s.IsCompete   // spiders added after 2021-11-01 no longer show the source link (checked by the save service)
					data["infoformat"] = s.Infoformat // spider type
					Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} // downloaded: state=1
					update = append(update, query)
					update = append(update, set)
					spLock.Lock()
					updateArr = append(updateArr, update)
					spLock.Unlock()
					// the row is fully processed
				}(tmp, spTmp)
			}
			wg.Wait()
			// flush the accumulated updates
			if len(updateArr) > 0 {
				MgoS.UpdateBulk(coll, updateArr...)
				updateArr = [][]map[string]interface{}{}
			}
			close(spChan) // close the channel
			// release the sp workers (keep the main thread's sp, IsMainThread=true)
			for sp := range spChan {
				if sp != nil && !sp.IsMainThread {
					sp.L.Close()
				}
			}
			if !s.Stop && reload { // high-performance mode: reload the main sp after the details are downloaded
				// reload the main thread's sp
				s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
			}
		}
	} else if reload { // high-performance mode with no data: sleep 2 minutes
		time.Sleep(2 * time.Minute)
	}
}

// NewSpiderByScript initializes num sp workers from the given script info.
func NewSpiderByScript(num int, code string, info map[string]string, spChan chan *Spider) {
	for i := 1; i <= num; i++ {
		spTmp, errstr := CreateSpider(code, info["script"], true, true)
		if errstr == "" && spTmp != nil { // the script loaded successfully
			spTmp.UserName = info["createuser"]
			spTmp.UserEmail = info["createuseremail"]
			spTmp.MUserName = info["modifyuser"]
			spTmp.MUserEmail = info["modifyemail"]
			spChan <- spTmp
		} else {
			spChan <- nil
		}
	}
}

// AnalysisProjectInfo flags data whose detail only says 详情请访问原网页
// ("see the original page") while no attachment uploaded successfully;
// such data does not count as downloaded.
func AnalysisProjectInfo(data map[string]interface{}) string {
	defer qu.Catch()
	detail := qu.ObjToString(data["detail"])
	if RestrictAccessReg.MatchString(detail) { // access restricted
		return "ip"
	}
	if detail == "详情请访问原网页!" || detail == "详情请访问原网页!" { // exact match only: some JSON-assembled data with missing fields also appends this text
		if projectinfo, ok := data["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
			if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 {
				fileOk := false
				for _, att := range attachments {
					if d, ok := att.(map[string]interface{}); ok {
						fid := qu.ObjToString(d["fid"])
						if fid != "" { // the attachment uploaded successfully
							return ""
						}
					}
				}
				if !fileOk {
					return "detail_file"
				}
			} else {
				return "detail_file"
			}
		} else {
			return "detail_file"
		}
	}
	return ""
}

// AllThreadLog logs the detail-download thread count once a minute.
func AllThreadLog() {
	logger.Info("Detail Download All Thread:", AllThreadNum)
	time.AfterFunc(1*time.Minute, AllThreadLog)
}

// GetHas1 builds a hash key: the site root of data plus the SHA-1 of data.
func GetHas1(data string) string {
	t := sha1.New()
	io.WriteString(t, data)
	hf := Reg.FindString(data)
	if !strings.HasSuffix(hf, "/") {
		hf = hf + "/"
	}
	return hf + fmt.Sprintf("%x", t.Sum(nil))
}