/** 爬虫,脚本接口,需要扩展 */ package spider import ( "crypto/sha1" "crypto/sha256" "fmt" "io" "log" "math/big" "math/rand" mu "mfw/util" mgo "mongodb" qu "qfw/util" mgu "qfw/util/mongodbutil" "strconv" //mgu "qfw/util/mongodbutil" //"qfw/util/redis" es "qfw/util/elastic" "regexp" util "spiderutil" "strings" "sync/atomic" "time" "github.com/donnie4w/go-logger/logger" "github.com/yuin/gopher-lua" ) type Heart struct { DetailHeart int64 //爬虫三级页执行心跳 DetailExecuteHeart int64 //三级页采集到数据心跳 FindListHeart int64 //findListHtml执行心跳 ListHeart int64 //爬虫列表页执行心跳 ModifyUser string //爬虫维护人 Site string //站点 Channel string //栏目 } //爬虫() type Spider struct { Script Code string //代码 Name string //名称 Channel string //站点 DownDetail bool //是否下载详细页 Stop bool //停止标志 Pass bool //暂停标志 LastPubshTime int64 //最后发布时间 LastHeartbeat int64 //最后心跳时间 SpiderRunRate int64 //执行频率 ExecuteOkTime int64 //任务执行成功/完成时间 Collection string //写入表名 Thread int64 //线程数 LastExecTime int64 //最后执行时间 LastDowncount int32 //最后一次下载量 TodayDowncount int32 //今日下载量 YesterdayDowncount int32 //昨日下载量 TotalDowncount int32 //总下载量 RoundCount int32 //执行轮次 StoreMode int //存储模式 StoreToMsgEvent int //消息类型 CoverAttr string //按属性判重数据 SleepBase int //基本延时 SleepRand int //随机延时 TargetChannelUrl string //栏目页地址 UpperLimit, LowerLimit int //正常值上限、下限 UserName, UserEmail, UploadTime string //开发者名称,开发者邮箱,脚本上传时间 MUserName, MUserEmail string //维护人,维护人邮箱 Index int //数组索引 //历史补漏 IsHistoricalMend bool //是否是历史补漏爬虫 IsMustDownload bool //是否强制下载 IsCompete bool //区分新老爬虫 } var Es *es.Elastic var EsIndex string var EsType string var Mgo *mgo.MongodbSim var UpdataMgoCache = make(chan []map[string]interface{}, 1000) //更新要重下数据的状态 var UpdataHeartCache = make(chan []map[string]interface{}, 1000) //更新爬虫心跳信息 var SaveMgoCache = make(chan map[string]interface{}, 1000) //保存爬虫采集非本站点数据 var SP = make(chan bool, 5) var SPH = make(chan bool, 5) var SPS = make(chan bool, 5) var TimeChan = make(chan bool, 1) var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`) var DomainNameReg = regexp.MustCompile(`(?://).+?(?:)[::/]`) var RepDomainNameReg = regexp.MustCompile(`[::/]+`) var DelaySites map[string]int //延迟采集站点集合 //心跳 func UpdateHeart(site, channel, code, user, t string) { if htmp, ok := SpiderHeart.Load(code); ok { if heart, ok := htmp.(*Heart); ok { if t == "list" { heart.ListHeart = time.Now().Unix() } else if t == "findlist" { heart.FindListHeart = time.Now().Unix() } else if t == "detail" { heart.DetailHeart = time.Now().Unix() } else if t == "detailexcute" { heart.DetailExecuteHeart = time.Now().Unix() } } } else { heart := &Heart{ ModifyUser: user, Site: site, Channel: channel, } if t == "list" { heart.ListHeart = time.Now().Unix() } else if t == "findlist" { heart.FindListHeart = time.Now().Unix() } else if t == "detail" { heart.DetailHeart = time.Now().Unix() } else if t == "detailexcute" { heart.DetailExecuteHeart = time.Now().Unix() } SpiderHeart.Store(code, heart) } } //任务 func (s *Spider) StartJob() { s.Stop = false s.Pass = false s.RoundCount++ go s.ExecJob(false) } //单次执行 func (s *Spider) ExecJob(reload bool) { defer func() { size_ok, size_no := 0, 0 size_no_index := []interface{}{} LoopListPath.Range(func(k, v interface{}) bool { if v != nil { size_ok++ } else { size_no_index = append(size_no_index, k) size_no++ } return true }) logger.Debug("index_", s.Index, ",", s.Code, s.Name, "ok,本轮下载量:", s.LastDowncount, ",轮询数据长度:", size_ok, ",废弃数量:", size_no, ",废弃位置:", size_no_index) s.ExecuteOkTime = time.Now().Unix() util.TimeSleepFunc(5*time.Second, TimeSleepChan) if util.Config.Working == 1 { s.Stop = true if _, b := Allspiders.Load(s.Code); b { Allspiders.Store(s.Code, s) } s.L.Close() CC <- s.L } }() if reload && util.Config.Working == 0 { //高效模式,轮询调度时重载脚本 s.LoadScript(s.Name, s.Channel, s.MUserName, s.Code, s.ScriptFile, true) } logger.Debug(s.Code, s.Name, "频率:", s.SpiderRunRate, ",", s.Timeout) s.LastDowncount = 0 s.LastExecTime = time.Now().Unix() s.LastHeartbeat = time.Now().Unix() s.ExecuteOkTime = 0 err := s.GetLastPublishTime() //获取最新时间--作为最后更新时间 if err != nil { logger.Error(s.Code, err) } err = s.DownListPageItem() //下载列表 if err != nil { logger.Error(s.Code, err) } if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //判断爬虫是增量还是历史爬虫(目前只会在7000节点上有历史爬虫) UpdateSpiderByCodeState(s.Code, "6") //爬虫在该节点下架 SpiderCodeSendToEditor(s.Code) //发送编辑器 return } else { if util.Config.Working == 0 { //高性能模式 /* for !s.Stop && s.Pass { util.TimeSleepFunc(2*time.Second, TimeSleepChan) } if s.Stop { return } */ //if s.IsMustDownload { //历史数据下载,只跑一轮 if s.IsHistoricalMend && util.Config.IsHistoryEvent { //历史节点7000,高性能模式,历史补漏只下载一轮 UpdateSpiderByCodeState(s.Code, "6") //爬虫在该节点下架 b := mgu.Update("luaconfig", "editor", "editor", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false) logger.Info("Delete History Code:", s.Code, b) } else { if !s.Stop { //未下架定时执行 util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() { s.ExecJob(true) }, TimeChan) // util.TimeAfterFunc(30*time.Second, func() { // s.ExecJob(true) // }, TimeChan) } else { //下架后子线程退出 return } } } else { //排队模式 return } } } //获取最新时间--作为最后更新时间 func (s *Spider) GetLastPublishTime() (errs interface{}) { defer mu.Catch() var lastpublishtime string //取得最后更新时间 if err := s.L.CallByParam(lua.P{ Fn: s.L.GetGlobal("getLastPublishTime"), NRet: 1, Protect: true, }); err != nil { //panic(s.Code + "," + err.Error()) log.Println(s.Code + "," + err.Error()) errs = err.Error() atomic.AddInt32(&s.Script.ErrorNum, 1) return errs } ret := s.L.Get(-1) s.L.Pop(1) if str, ok := ret.(lua.LString); ok { lastpublishtime = string(str) } if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) { //防止发布时间超前 if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() { s.LastPubshTime = time.Now().Unix() } else { s.LastPubshTime = util.ParseDate2Int64(lastpublishtime) } } return nil } //下载列表 func (s *Spider) DownListPageItem() (errs interface{}) { defer mu.Catch() start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //起始页、最大页 tmpMax := max //临时记录最大页 repeatAllNum := 0 //本轮采集tmpMax页总的重复个数 downloadAllNum := 0 //本轮采集tmpMax页总个数 if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //7000节点,爬虫跑历史 max = s.GetIntVar("spiderHistoryMaxPage") } downtimes := 0 //记录某页重试次数(暂定3次) repeatPageNum := 0 //记录列表页所有连接重复的页码 repeatPageTimes := 0 //记录页码连续判重的次数(暂定连续判重页码数为5次时,不再翻页) isRunRepeatList := false //是否执行列表页连续判重 if util.Config.Modal == 1 && util.Config.Working == 0 && max > 1 && max < 101 { //7100 7400最大页小于101且大于1,对此部分爬虫采集列表页时进行连续5页判重 isRunRepeatList = true max = 100 //设置最大页为100 } for ; start <= max && !s.Stop; start++ { if !s.Stop { //在下载详情页时爬虫下架,此时不再存心跳信息 UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") //记录所有节点列表页心跳 } //qu.Debug("重复页:", repeatPageNum, " 配置最大页:", tmpMax, " 最终最大页:", max, " 当前页:", start, "重复次数:", repeatPageTimes) //if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { //重复次数超过5次,不再翻页 // break //} if isRunRepeatList && repeatPageTimes >= 10 { //重复次数超过10次,不再翻页 break } if err := s.L.CallByParam(lua.P{ Fn: s.L.GetGlobal("downloadAndParseListPage"), NRet: 1, Protect: true, }, lua.LNumber(start)); err != nil { //panic(s.Code + "," + err.Error()) logger.Error("列表页采集报错", start, s.Code+","+err.Error()) errs = err.Error() atomic.AddInt32(&s.Script.ErrorNum, 1) //列表页采集报错进行重试,超过重试次数视为该页已采 if downtimes < 2 { downtimes++ start-- //} else if start > tmpMax && isRunRepeatList { //超过重试次数,视为本页重复 } else if isRunRepeatList { //超过重试次数,视为本页重复 if repeatPageNum+1 == start { repeatPageTimes++ //次数加1 } else { repeatPageTimes = 0 //重复次数重置0 } repeatPageNum = start //赋值页码 downtimes = 0 } continue } lv := s.L.Get(-1) s.L.Pop(1) if tbl, ok := lv.(*lua.LTable); ok { list := []map[string]interface{}{} //qu.Debug("当前页数据量:", tbl.Len()) if tabLen := tbl.Len(); tabLen > 0 { //列表页有数据,根据列表页信息下载三级页 repeatListNum := 0 // 当前列表页连接重复个数 for i := 1; i <= tabLen; i++ { v := tbl.RawGetInt(i).(*lua.LTable) tmp := util.TableToMap(v) //s.ThisSiteData(tmp) //统计当前下载数据是否是本站点数据 if !s.IsHistoricalMend { //不是历史补漏 tmp["dataging"] = 0 //数据中打标记dataging=0 if s.DownDetail { s.DownloadDetailItem(tmp, &repeatListNum) } else { tmp["comeintime"] = time.Now().Unix() atomic.AddInt32(&s.LastDowncount, 1) atomic.AddInt32(&s.TodayDowncount, 1) atomic.AddInt32(&s.TotalDowncount, 1) href := fmt.Sprint(tmp["href"]) if len(href) > 5 { //有效数据 db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db hashHref := HexText(href) //增量(redis默认db0) util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365) //全量(判断是否已存在防止覆盖id) isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) if !isExist { util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1) } list = append(list, tmp) } } } else { //历史补漏 s.HistoricalMendDownloadDetailItem(tmp) //历史补漏下载三级页 } } //if start <= tmpMax { //数量赋值 repeatAllNum += repeatListNum downloadAllNum += tabLen //} //if start > tmpMax && isRunRepeatList { //执行连续页码判重 if isRunRepeatList { //执行连续页码判重 if repeatListNum >= tabLen { //当前start列表页全部数据都已采集 //qu.Debug("重复页:", repeatPageNum, "当前页:", start) if repeatPageNum+1 == start || repeatPageNum == 0 { repeatPageTimes++ //次数加1 } else { repeatPageTimes = 0 //重复次数重置0 } repeatPageNum = start //赋值页码 } else { //当前start页有遗漏数据 repeatPageTimes = 0 repeatPageNum = 0 } } if !s.IsHistoricalMend && !s.DownDetail { if len(list) > 0 { //保存信息入库 StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list) } } } else { //避免因网络问题当前下载的列表页无数据,重新请求下载列表页 if downtimes < 2 { downtimes++ start-- continue //} else if start > tmpMax && isRunRepeatList { //超过重试次数,视为本页重复 } else if isRunRepeatList { //超过重试次数,视为本页重复 if repeatPageNum+1 == start { repeatPageTimes++ //次数加1 } else { repeatPageTimes = 0 //重复次数重置0 } repeatPageNum = start //赋值页码 } } } else { //请求当前列表页失败 if downtimes < 2 { downtimes++ start-- continue //} else if start > tmpMax && isRunRepeatList { //超过重试次数,视为本页重复 } else if isRunRepeatList { //超过重试次数,视为本页重复 if repeatPageNum+1 == start { repeatPageTimes++ //次数加1 } else { repeatPageTimes = 0 //重复次数重置0 } repeatPageNum = start //赋值页码 } } downtimes = 0 //当前页下载无误,重置下载重试次数 util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan) } nowTime := time.Now() sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout) set := map[string]interface{}{ "site": s.Name, "channel": s.Channel, "spidercode": s.Code, "updatetime": nowTime.Unix(), "event": util.Config.Uploadevent, "modifyuser": s.MUserName, "maxpage": tmpMax, "runrate": s.SpiderRunRate, "endpage": start, "date": sDate, } inc := map[string]interface{}{ "alltimes": 1, } if downloadAllNum > 0 { rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum) rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64) if rate == 1.0 { inc["oh_percent"] = 1 } else if rate >= 0.9 { inc["nt_percent"] = 1 } else if rate >= 0.8 { inc["et_percent"] = 1 } else { inc["other_percent"] = 1 } } else { inc["zero"] = 1 } query := map[string]interface{}{ "date": sDate, "spidercode": s.Code, } logger.Info(s.Code, "本轮列表页采集详情:", downloadAllNum, repeatAllNum, start) Mgo.Update("spider_downloadrate", query, map[string]interface{}{ "$set": set, "$inc": inc, }, true, false) return errs } func (s *Spider) ThisSiteData(tmp map[string]interface{}) { defer qu.Catch() href := qu.ObjToString(tmp["href"]) url_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(s.TargetChannelUrl), "") href_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(href), "") if url_dn != href_dn { SaveMgoCache <- map[string]interface{}{ "site": s.Name, "channel": s.Channel, "spidercode": s.Code, "url": s.TargetChannelUrl, "href": href, "modifyuser": s.MUserName, "comeintime": time.Now().Unix(), } } } //遍历,开启三级页下载(历史补漏) func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) { //qu.Debug("--------------历史下载-----------------") defer mu.Catch() var err interface{} data := map[string]interface{}{} paramdata := p.(map[string]interface{}) for k, v := range paramdata { data[k] = v } href := qu.ObjToString(data["href"]) if len(href) <= 5 { //无效数据 return } db := HexToBigIntMod(href) hashHref := HexText(href) id := "" SaveListPageData(paramdata, &id, false) //存储采集记录 isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) //取全量redis //log.Println("full href:", href, " isExist:", isExist) logger.Debug("full href:", href, " isExist:", isExist) if !s.IsMustDownload && isExist { //非强制下载redis中存在,结束 //qu.Debug("非强制下载redis中存在,结束") //更新spider_listdata中数据下载成功标记 if id != "" { //Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true) Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}}) } return } //qu.Debug("----------------下载、解析、入库--------------------") //下载、解析、入库 data, err = s.DownloadDetailPage(paramdata, data) if err != nil || data == nil { //下载失败,结束 if err != nil { logger.Error(s.Code, err, paramdata) // if len(paramdata) > 0 { // SaveErrorData(paramdata) //保存错误信息 // } } //更新spider_listdata中数据下载失败标记 if id != "" { Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}}) } return } //更新spider_listdata中数据下载成功标记 if id != "" { //Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true) Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}}) } flag := true t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) //publishtime if s.IsMustDownload { //强制下载 if isExist && t1 < time.Now().AddDate(0, 0, -5).Unix() { //qu.Debug("强制下载 redis存在") data["dataging"] = 1 flag = false } else { //qu.Debug("强制下载 redis不存在") data["dataging"] = 0 //WithinThreeDays(&data) //根据发布时间打标记 } } else { //非强制下载 if !isExist { //qu.Debug("非强制下载 redis不存在") data["dataging"] = 0 //WithinThreeDays(&data) //根据发布时间打标记 } } if t1 > time.Now().Unix() { //防止发布时间超前 data["publishtime"] = time.Now().Unix() } delete(data, "state") delete(data, "exit") delete(data, "checkpublishtime") data["comeintime"] = time.Now().Unix() atomic.AddInt32(&s.LastDowncount, 1) atomic.AddInt32(&s.TodayDowncount, 1) atomic.AddInt32(&s.TotalDowncount, 1) data["spidercode"] = s.Code //qu.Debug("--------------开始保存---------------") data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断) Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag) //qu.Debug("--------------保存结束---------------") } //遍历,开启三级页下载(增量) func (s *Spider) DownloadDetailItem(p interface{}, num *int) { defer mu.Catch() var err interface{} data := map[string]interface{}{} paramdata := p.(map[string]interface{}) for k, v := range paramdata { data[k] = v } href := qu.ObjToString(data["href"]) if len(href) <= 5 { //无效数据 *num++ //视为已采集 return } /* //查询增量redis查看信息是否已经下载 isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href) if isExist { //更新redis生命周期 util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30) *num++ //已采集 return } log.Println("href had++:", isExist, href) */ id := "" //记录spider_listdata中保存的数据id,便于下载成功后更新状态 if util.Config.Modal == 1 { //除7000、7500、7700节点外所有节点只采集列表页信息 isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href) if isExist { //更新redis生命周期 util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365) *num++ //已采集 return } SaveHighListPageData(paramdata, s.SCode, href, num) return } else { if !s.Stop { UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //记录modal=0老模式采集三级页心跳 } isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href) if isExist { //更新redis生命周期 util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365) *num++ //已采集 return } isEsRepeat := false if delayDay := DelaySites[s.Name]; delayDay > 0 { //类竞品站点爬虫title做es7天内判重检验(顺序采集无法延迟,只能判重) title := qu.ObjToString(paramdata["title"]) eTime := time.Now().Unix() sTime := eTime - int64(7*86400) esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}` if Es.Count(EsIndex, EsType, esQuery) > 0 { //es中含本title数据,不再采集,更新list表数据状态 isEsRepeat = true } } SaveListPageData(paramdata, &id, isEsRepeat) //保存7000、7410、7500、7700节点列表页采集的信息 if isEsRepeat { //类竞品数据title判重数据加入redis util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365) return } } //下载、解析、入库 data, err = s.DownloadDetailPage(paramdata, data) if err != nil || data == nil { if err != nil { logger.Error(s.Code, err, paramdata) if len(paramdata) > 0 { SaveErrorData(s.MUserName, paramdata, err) //保存错误信息 } } //更新spider_listdata中数据下载失败标记 if id != "" { Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}}) } return } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同 log.Println("beforeHref:", href, "afterHref:", tmphref) //增量 util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365) //全量 db := HexToBigIntMod(href) hashHref := HexText(href) isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) if !isExist { util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1) } } //更新spider_listdata中数据下载成功标记 if id != "" { Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true) //Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}}) } t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) if t1 > time.Now().Unix() { //防止发布时间超前 data["publishtime"] = time.Now().Unix() } if !s.Stop { UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //记录modal=0老模式采集到数据心跳 } delete(data, "state") delete(data, "exit") delete(data, "checkpublishtime") data["comeintime"] = time.Now().Unix() atomic.AddInt32(&s.LastDowncount, 1) atomic.AddInt32(&s.TodayDowncount, 1) atomic.AddInt32(&s.TotalDowncount, 1) data["spidercode"] = s.Code //qu.Debug("-----增量开始保存-----") // 临时保存数据 // update := []map[string]interface{}{} // _id := data["_id"].(string) // update = append(update, map[string]interface{}{"_id": qu.StringTOBsonId(_id)}) // update = append(update, map[string]interface{}{ // "$set": map[string]interface{}{ // "jsondata": data["jsondata"], // }, // }) // UpdataMgoCache <- update data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断) Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true) //qu.Debug("-----增量保存结束-----") } //遍历下载名录 func (s *Spider) DownloadDetailByNames(p interface{}) { defer mu.Catch() var err interface{} /* if s.Stop { return } for s.Pass { util.TimeSleepFunc(2*time.Second, TimeSleepChan) } */ //TODO 下载3级页,调用LUA分析;如果配置的不用下载3级页,就到此为止了,直接存储 data := map[string]interface{}{} paramdata := p.(map[string]interface{}) for k, v := range paramdata { data[k] = v } if s.DownDetail { href := qu.ObjToString(data["href"]) if href == "" || len(href) < 5 { //无效数据 return } //下载、解析、入库 data, err = s.DownloadDetailPage(paramdata, data) if err != nil { logger.Error(s.Code, paramdata, err) return } } data["comeintime"] = time.Now().Unix() atomic.AddInt32(&s.LastDowncount, 1) atomic.AddInt32(&s.TodayDowncount, 1) atomic.AddInt32(&s.TotalDowncount, 1) Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true) } //下载解析内容页 func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) { defer mu.Catch() s.LastHeartbeat = time.Now().Unix() util.TimeSleepFunc((time.Duration(s.SleepBase+GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan) tab := s.L.NewTable() for k, v := range param { if val, ok := v.(string); ok { tab.RawSet(lua.LString(k), lua.LString(val)) } else if val, ok := v.(int64); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(int32); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(float64); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(float32); ok { tab.RawSet(lua.LString(k), lua.LNumber(val)) } else if val, ok := v.(bool); ok { tab.RawSet(lua.LString(k), lua.LBool(val)) } } var err error if err = s.L.CallByParam(lua.P{ Fn: s.L.GetGlobal("downloadDetailPage"), NRet: 1, Protect: true, }, tab); err != nil { //panic(s.Code + "," + err.Error()) log.Println(s.Code + "," + err.Error()) atomic.AddInt32(&s.Script.ErrorNum, 1) return data, err } lv := s.L.Get(-1) s.L.Pop(1) //拼map if v3, ok := lv.(*lua.LTable); ok { v3.ForEach(func(k, v lua.LValue) { if tmp, ok := k.(lua.LString); ok { key := string(tmp) if value, ok := v.(lua.LString); ok { data[key] = string(value) } else if value, ok := v.(lua.LNumber); ok { data[key] = value } else if value, ok := v.(*lua.LTable); ok { tmp := util.TableToMap(value) data[key] = tmp } } }) return data, err } else { return nil, err } } //高性能模式定时采集三级页信息 func DetailData() { defer qu.Catch() <-InitAllLuaOver //脚本加载完毕,执行 if util.Config.Working == 0 && !util.Config.IsHistoryEvent { //高性能模式且不是7000节点,只有7000节点util.Config.IsHistoryEvent为true GetListDataDownloadDetail() } } func GetListDataDownloadDetail() { defer qu.Catch() logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++") Allspiders2.Range(func(k, v interface{}) bool { sp := v.(*Spider) go sp.DownloadHighDetail() time.Sleep(2 * time.Second) return true }) } //高性能模式根据列表页数据下载三级页 func (s *Spider) DownloadHighDetail() { defer qu.Catch() for { logger.Info("Running Code:", s.Code, "Stop:", s.Stop) if !s.Stop { //爬虫是运行状态 comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //采集一周内的数据,防止有数据一直采不下来,造成积累 isEsRepeat := false //是否进行es判重 if delayDay := DelaySites[s.Name]; delayDay > 0 { isEsRepeat = true if delayDay <= util.Config.DayNum*24 { //判断该爬虫是否属于要延迟采集的站点,数据延迟delayDay小时采集(由于7410、7500、7700为顺序采集,无法延时) //comeintimeQuery["$lte"] = GetTime(-delayDay + 1) comeintimeQuery["$lte"] = time.Now().Unix() - int64(3600*delayDay) } } q := map[string]interface{}{ "spidercode": s.Code, "state": 0, //0:入库状态;-1:采集失败;1:成功 "comeintime": comeintimeQuery, } o := map[string]interface{}{"_id": -1} f := map[string]interface{}{ "state": 0, "comeintime": 0, "event": 0, } if !s.Stop { //在下载详情页时爬虫下架,此时不再存心跳信息 UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //记录modal=1采集三级页心跳 } list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100) if list != nil && len(*list) > 0 { for _, tmp := range *list { _id := tmp["_id"] query := map[string]interface{}{"_id": _id} href := qu.ObjToString(tmp["href"]) //由于目前列表页redis判重是href+code可能导致同一条href有多条不同code采集的数据存在 //为了避免重复下载,进行增量redis判重 isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href) if isExist { set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //已存在state置为1 Mgo.Update("spider_highlistdata", query, set, false, false) continue } if isEsRepeat { //es数据title判重 title := qu.ObjToString(tmp["title"]) eTime := time.Now().Unix() sTime := eTime - int64(7*86400) esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}` count := Es.Count(EsIndex, EsType, esQuery) if count > 0 { //es中含本title数据,不再采集,更新list表数据状态 set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //已存在state置为1 Mgo.Update("spider_highlistdata", query, set, false, false) util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365) continue } } times := qu.IntAll(tmp["times"]) success := true //数据是否下载成功的标志 delete(tmp, "_id") delete(tmp, "times") data := map[string]interface{}{} var err interface{} for k, v := range tmp { data[k] = v } //下载、解析、入库 data, err = s.DownloadDetailPage(tmp, data) if !s.Stop { //在下载详情页时爬虫下架,此时不再存心跳信息 UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //记录modal=1下载数据心跳 } if err != nil || data == nil { success = false times++ if err != nil { logger.Error(s.Code, err, tmp) if len(tmp) > 0 { SaveErrorData(s.MUserName, tmp, err) //保存错误信息 } } /*else if data == nil && times >= 3 { //下载问题,建editor任务 DownloadErrorData(s.Code, tmp) }*/ } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同 log.Println("beforeHref:", href, "afterHref:", tmphref) //增量 util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365) //全量 db := HexToBigIntMod(href) hashHref := HexText(href) isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) if !isExist { util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1) } } if !success { //下载失败更新次数和状态 ss := map[string]interface{}{"times": times} if times >= 3 { //3次下载失败今天不再下载,state置为1 ss["state"] = -1 } set := map[string]interface{}{"$set": ss} Mgo.Update("spider_highlistdata", query, set, false, false) continue } t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) if t1 > time.Now().Unix() { //防止发布时间超前 data["publishtime"] = time.Now().Unix() } delete(data, "exit") delete(data, "checkpublishtime") data["comeintime"] = time.Now().Unix() //计数 tmpsp1, b := Allspiders.Load(s.Code) if b { sp1, ok := tmpsp1.(*Spider) if ok { atomic.AddInt32(&sp1.LastDowncount, 1) atomic.AddInt32(&sp1.TodayDowncount, 1) atomic.AddInt32(&sp1.TotalDowncount, 1) } } data["spidercode"] = s.Code data["dataging"] = 0 data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断) Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true) set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} //下载成功state置为1 Mgo.Update("spider_highlistdata", query, set, false, false) } //重载spider s.LoadScript(s.Name, s.Channel, s.MUserName, s.Code, s.ScriptFile, true) } else { //没有数据 time.Sleep(2 * time.Minute) } //s.GetListDataDownloadDetail() //开始下一轮 } else { logger.Info("Running Code:", s.Code, "Stop:", s.Stop) break } } } //队列模式根据列表页数据下载三级页 func (s *Spider) DownloadListDetail() { defer qu.Catch() defer func() { //爬虫下载完三级页数据或无下载数据,使用后close s.Stop = true if _, b := Allspiders2.Load(s.Code); b { Allspiders2.Store(s.Code, s) } s.L.Close() CC2 <- s.L }() comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //采集一周内的数据,防止有数据一直采不下来,造成积累 isEsRepeat := false //是否进行es判重 if delayDay := DelaySites[s.Name]; delayDay > 0 { isEsRepeat = true if delayDay <= util.Config.DayNum { //判断该爬虫是否属于要延迟采集的站点,数据延迟delayDay天采集(由于7410、7500、7700为顺序采集,无法延时) //comeintimeQuery["$lte"] = GetTime(-delayDay + 1) comeintimeQuery["$lte"] = time.Now().Unix() - int64(86400*delayDay) } } q := map[string]interface{}{ "spidercode": s.Code, "state": 0, //0:入库状态;-1:采集失败;1:成功 "comeintime": comeintimeQuery, } o := map[string]interface{}{"_id": -1} f := map[string]interface{}{ "state": 0, "comeintime": 0, "event": 0, } if !s.Stop { //在下载详情页时爬虫下架,此时不再存心跳信息 UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //记录modal=1下载数据心跳 } list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100) if list != nil && len(*list) > 0 { for _, tmp := range *list { _id := tmp["_id"] query := map[string]interface{}{"_id": _id} href := qu.ObjToString(tmp["href"]) //由于目前列表页redis判重是href+code可能导致同一条href有多条不同code采集的数据存在 //为了避免重复下载,进行增量redis判重 isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href) if isExist { set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //已存在state置为1 Mgo.Update("spider_highlistdata", query, set, false, false) continue } if isEsRepeat { //es数据title判重 title := qu.ObjToString(tmp["title"]) eTime := time.Now().Unix() sTime := eTime - int64(7*86400) esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}` if Es.Count(EsIndex, EsType, esQuery) > 0 { //es中含本title数据,不再采集,更新list表数据状态 set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //已存在state置为1 Mgo.Update("spider_highlistdata", query, set, false, false) util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365) continue } } times := qu.IntAll(tmp["times"]) success := true //数据是否下载成功的标志 delete(tmp, "_id") delete(tmp, "times") data := map[string]interface{}{} var err interface{} for k, v := range tmp { data[k] = v } //下载、解析、入库 data, err = s.DownloadDetailPage(tmp, data) if !s.Stop { //在下载详情页时爬虫下架,此时不再存心跳信息 UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //记录modal=1下载数据心跳 } if err != nil || data == nil { success = false times++ if err != nil { logger.Error(s.Code, err, tmp) if len(tmp) > 0 { SaveErrorData(s.MUserName, tmp, err) //保存错误信息 } } /*else if data == nil && times >= 3 { //下载问题,建editor任务 DownloadErrorData(s.Code, tmp) }*/ } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同 log.Println("beforeHref:", href, "afterHref:", tmphref) //增量 util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365) //全量 db := HexToBigIntMod(href) hashHref := HexText(href) isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) if !isExist { util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1) } } if !success { //下载失败更新次数和状态 ss := map[string]interface{}{"times": times} if times >= 3 { //3次下载失败今天不再下载,state置为1 ss["state"] = -1 } set := map[string]interface{}{"$set": ss} Mgo.Update("spider_highlistdata", query, set, false, false) continue } t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) if t1 > time.Now().Unix() { //防止发布时间超前 data["publishtime"] = time.Now().Unix() } delete(data, "exit") delete(data, "checkpublishtime") data["comeintime"] = time.Now().Unix() //计数 tmpsp1, b := Allspiders.Load(s.Code) if b { sp1, ok := tmpsp1.(*Spider) if ok { atomic.AddInt32(&sp1.LastDowncount, 1) atomic.AddInt32(&sp1.TodayDowncount, 1) atomic.AddInt32(&sp1.TotalDowncount, 1) } } data["spidercode"] = s.Code data["dataging"] = 0 data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断) Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true) set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} //下载成功state置为1 Mgo.Update("spider_highlistdata", query, set, false, false) } } } //获取随机数 func GetRandMath(num int) int { r := rand.New(rand.NewSource(time.Now().UnixNano())) return r.Intn(num) } //获取hascode func GetHas1(data string) string { t := sha1.New() io.WriteString(t, data) hf := Reg.FindString(data) if !strings.HasSuffix(hf, "/") { hf = hf + "/" } return hf + fmt.Sprintf("%x", t.Sum(nil)) } //对href哈希取模 func HexToBigIntMod(href string) int { //取哈希值 t := sha256.New() io.WriteString(t, href) hex := fmt.Sprintf("%x", t.Sum(nil)) //取模 n := new(big.Int) n, _ = n.SetString(hex[2:], 16) return int(n.Mod(n, big.NewInt(16)).Int64()) } //求hash func HexText(href string) string { h := sha256.New() h.Write([]byte(href)) return fmt.Sprintf("%x", h.Sum(nil)) } //func RedisIsExist(href string) bool { // isExist := false // if len(href) > 75 { //取href的哈希判断是否存在 // hashHref := GetHas1(href) // isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+hashHref) // } // if !isExist { //取string href判断是否存在 // isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href) // } // return isExist //} //判断发布时间是否在三天内 //func WithinThreeDays(data *map[string]interface{}) { // withinThreeDays := false // //根据发布时间打标记 // publishtime := util.ParseDate2Int64(qu.ObjToString((*data)["publishtime"])) //没有发布时间,取当前时间 // //发布时间 // now := time.Now().Unix() // if now-publishtime > 259200 { //三天前数据 // withinThreeDays = false // } else { // withinThreeDays = true // } // if withinThreeDays { // //qu.Debug("发布时间在三天内") // (*data)["dataging"] = 0 // } else { // //qu.Debug("发布时间在三天外") // (*data)["dataging"] = 1 // } //}