- /**
- Crawler / script interface; still needs further extension.
- */
- package spider
- import (
- "crypto/sha1"
- "crypto/sha256"
- "fmt"
- "io"
- "log"
- "math/big"
- "math/rand"
- mu "mfw/util"
- mgo "mongodb"
- qu "qfw/util"
- mgu "qfw/util/mongodbutil"
- "strconv"
- //mgu "qfw/util/mongodbutil"
- //"qfw/util/redis"
- es "qfw/util/elastic"
- "regexp"
- util "spiderutil"
- "strings"
- "sync/atomic"
- "time"
- "github.com/donnie4w/go-logger/logger"
- "github.com/yuin/gopher-lua"
- )
- type Heart struct {
- DetailHeart int64 //heartbeat: detail-page (third-level page) script executed
- DetailExecuteHeart int64 //heartbeat: a detail page actually yielded data
- FindListHeart int64 //heartbeat: findListHtml executed
- ListHeart int64 //heartbeat: list-page script executed
- ModifyUser string //spider maintainer
- Site string //site
- Channel string //channel
- }
- //Spider
- type Spider struct {
- Script
- Code string //spider code
- Name string //site name
- Channel string //channel
- DownDetail bool //whether to download detail pages
- Stop bool //stop flag
- Pass bool //pause flag
- LastPubshTime int64 //last publish time
- LastHeartbeat int64 //last heartbeat time
- SpiderRunRate int64 //run frequency (minutes)
- ExecuteOkTime int64 //time the job completed successfully
- Collection string //target collection (table) name
- Thread int64 //thread count
- LastExecTime int64 //last execution time
- LastDowncount int32 //downloads in the last round
- TodayDowncount int32 //downloads today
- YesterdayDowncount int32 //downloads yesterday
- TotalDowncount int32 //total downloads
- RoundCount int32 //number of rounds executed
- StoreMode int //storage mode
- StoreToMsgEvent int //message event type
- CoverAttr string //attribute used to dedup/overwrite data
- SleepBase int //base delay (ms)
- SleepRand int //random extra delay (ms)
- TargetChannelUrl string //channel page URL
- UpperLimit, LowerLimit int //upper and lower bound of the normal download volume
- UserName, UserEmail, UploadTime string //developer name, developer email, script upload time
- MUserName, MUserEmail string //maintainer, maintainer email
- Index int //array index
- //historical gap-filling
- IsHistoricalMend bool //whether this spider does historical gap-filling
- IsMustDownload bool //whether download is forced
- IsCompete bool //distinguishes new spiders from old ones
- }
- var Es *es.Elastic
- var EsIndex string
- var EsType string
- var Mgo *mgo.MongodbSim
- var UpdataMgoCache = make(chan []map[string]interface{}, 1000) //queue of status updates for data that must be re-downloaded
- var UpdataHeartCache = make(chan []map[string]interface{}, 1000) //queue of spider heartbeat updates
- var SaveMgoCache = make(chan map[string]interface{}, 1000) //queue of off-site data collected by spiders
- var SP = make(chan bool, 5)
- var SPH = make(chan bool, 5)
- var SPS = make(chan bool, 5)
- var TimeChan = make(chan bool, 1)
- var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
- var DomainNameReg = regexp.MustCompile(`(?://).+?(?:)[::/]`)
- var RepDomainNameReg = regexp.MustCompile(`[::/]+`)
- var DelaySites map[string]int //sites whose collection is delayed
- //Heartbeat
- func UpdateHeart(site, channel, code, user, t string) {
- if htmp, ok := SpiderHeart.Load(code); ok {
- if heart, ok := htmp.(*Heart); ok {
- if t == "list" {
- heart.ListHeart = time.Now().Unix()
- } else if t == "findlist" {
- heart.FindListHeart = time.Now().Unix()
- } else if t == "detail" {
- heart.DetailHeart = time.Now().Unix()
- } else if t == "detailexcute" {
- heart.DetailExecuteHeart = time.Now().Unix()
- }
- }
- } else {
- heart := &Heart{
- ModifyUser: user,
- Site: site,
- Channel: channel,
- }
- if t == "list" {
- heart.ListHeart = time.Now().Unix()
- } else if t == "findlist" {
- heart.FindListHeart = time.Now().Unix()
- } else if t == "detail" {
- heart.DetailHeart = time.Now().Unix()
- } else if t == "detailexcute" {
- heart.DetailExecuteHeart = time.Now().Unix()
- }
- SpiderHeart.Store(code, heart)
- }
- }
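- //Usage sketch (illustrative, not part of the original flow): each crawl stage
- //reports a named heartbeat so monitoring can tell where a spider stalled, e.g.
- //UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") from the list loop
- //and "detailexcute" once a detail page actually yields data. SpiderHeart appears
- //to be a sync.Map declared elsewhere in the package (Load/Store usage above).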
- //Start a job
- func (s *Spider) StartJob() {
- s.Stop = false
- s.Pass = false
- s.RoundCount++
- go s.ExecJob(false)
- }
- //Run once
- func (s *Spider) ExecJob(reload bool) {
- defer func() {
- size_ok, size_no := 0, 0
- size_no_index := []interface{}{}
- LoopListPath.Range(func(k, v interface{}) bool {
- if v != nil {
- size_ok++
- } else {
- size_no_index = append(size_no_index, k)
- size_no++
- }
- return true
- })
- logger.Debug("index_", s.Index, ",", s.Code, s.Name, "ok,本轮下载量:", s.LastDowncount, ",轮询数据长度:", size_ok, ",废弃数量:", size_no, ",废弃位置:", size_no_index)
- s.ExecuteOkTime = time.Now().Unix()
- util.TimeSleepFunc(5*time.Second, TimeSleepChan)
- if util.Config.Working == 1 {
- s.Stop = true
- if _, b := Allspiders.Load(s.Code); b {
- Allspiders.Store(s.Code, s)
- }
- s.L.Close()
- CC <- s.L
- }
- }()
- if reload && util.Config.Working == 0 { //high-performance mode: reload the script on each polling round
- s.LoadScript(s.Name, s.Channel, s.MUserName, s.Code, s.ScriptFile, true)
- }
- logger.Debug(s.Code, s.Name, "rate:", s.SpiderRunRate, ",", s.Timeout)
- s.LastDowncount = 0
- s.LastExecTime = time.Now().Unix()
- s.LastHeartbeat = time.Now().Unix()
- s.ExecuteOkTime = 0
- err := s.GetLastPublishTime() //fetch the latest publish time, used as the last-update watermark
- if err != nil {
- logger.Error(s.Code, err)
- }
- err = s.DownListPageItem() //下载列表
- if err != nil {
- logger.Error(s.Code, err)
- }
- if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //is this an incremental or a historical spider (currently only node 7000 runs historical spiders)
- UpdateSpiderByCodeState(s.Code, "6") //take the spider offline on this node
- SpiderCodeSendToEditor(s.Code) //send it back to the editor
- return
- } else {
- if util.Config.Working == 0 { //high-performance mode
- /*
- for !s.Stop && s.Pass {
- util.TimeSleepFunc(2*time.Second, TimeSleepChan)
- }
- if s.Stop {
- return
- }
- */
- //if s.IsMustDownload { //historical download: run only one round
- if s.IsHistoricalMend && util.Config.IsHistoryEvent { //history node 7000, high-performance mode: historical gap-filling downloads only one round
- UpdateSpiderByCodeState(s.Code, "6") //take the spider offline on this node
- b := mgu.Update("luaconfig", "editor", "editor", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
- logger.Info("Delete History Code:", s.Code, b)
- } else {
- if !s.Stop { //still online: schedule the next run
- util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
- s.ExecJob(true)
- }, TimeChan)
- // util.TimeAfterFunc(30*time.Second, func() {
- // s.ExecJob(true)
- // }, TimeChan)
- } else { //taken offline: let the worker goroutine exit
- return
- }
- }
- } else { //queue mode
- return
- }
- }
- }
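- //Scheduling sketch (reading of the branches above, not a spec): with
- //Config.Working == 0 (high-performance mode) ExecJob re-arms itself every
- //SpiderRunRate minutes via util.TimeAfterFunc, so each spider is a
- //self-rescheduling goroutine; with Working == 1 (queue mode) it returns after
- //one pass and the deferred block recycles s.L into the CC channel.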
- //Fetch the latest publish time, used as the last-update watermark
- func (s *Spider) GetLastPublishTime() (errs interface{}) {
- defer mu.Catch()
- var lastpublishtime string
- //get the last update time from the Lua script
- if err := s.L.CallByParam(lua.P{
- Fn: s.L.GetGlobal("getLastPublishTime"),
- NRet: 1,
- Protect: true,
- }); err != nil {
- //panic(s.Code + "," + err.Error())
- log.Println(s.Code + "," + err.Error())
- errs = err.Error()
- atomic.AddInt32(&s.Script.ErrorNum, 1)
- return errs
- }
- ret := s.L.Get(-1)
- s.L.Pop(1)
- if str, ok := ret.(lua.LString); ok {
- lastpublishtime = string(str)
- }
- if s.LastPubshTime < util.ParseDate2Int64(lastpublishtime) {
- //guard against publish times in the future
- if util.ParseDate2Int64(lastpublishtime) > time.Now().Unix() {
- s.LastPubshTime = time.Now().Unix()
- } else {
- s.LastPubshTime = util.ParseDate2Int64(lastpublishtime)
- }
- }
- return nil
- }
- //Download list pages
- func (s *Spider) DownListPageItem() (errs interface{}) {
- defer mu.Catch()
- start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //start page and maximum page
- tmpMax := max //remember the configured maximum page
- repeatAllNum := 0 //total repeats across the tmpMax pages of this round
- downloadAllNum := 0 //total items across the tmpMax pages of this round
- if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" { //node 7000: the spider crawls history
- max = s.GetIntVar("spiderHistoryMaxPage")
- }
- downtimes := 0 //retry counter for the current page (tentatively 3 attempts)
- repeatPageNum := 0 //page number whose list links were all repeats
- repeatPageTimes := 0 //consecutive all-repeat pages (paging stops once this reaches the threshold, currently 10)
- isRunRepeatList := false //whether to run consecutive-page repeat checking
- if util.Config.Modal == 1 && util.Config.Working == 0 && max > 1 && max < 101 { //7100/7400: when the configured maximum page is between 2 and 100, check consecutive list pages for repeats
- isRunRepeatList = true
- max = 100 //raise the maximum page to 100
- }
- for ; start <= max && !s.Stop; start++ {
- if !s.Stop { //if the spider was taken offline while downloading, stop recording heartbeats
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list") //record the list-page heartbeat on every node
- }
- //qu.Debug("repeat page:", repeatPageNum, " configured max:", tmpMax, " effective max:", max, " current page:", start, "repeat count:", repeatPageTimes)
- //if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { //more than 5 consecutive repeats: stop paging
- // break
- //}
- if isRunRepeatList && repeatPageTimes >= 10 { //10 or more consecutive repeats: stop paging
- break
- }
- if err := s.L.CallByParam(lua.P{
- Fn: s.L.GetGlobal("downloadAndParseListPage"),
- NRet: 1,
- Protect: true,
- }, lua.LNumber(start)); err != nil {
- //panic(s.Code + "," + err.Error())
- logger.Error("list-page collection error", start, s.Code+","+err.Error())
- errs = err.Error()
- atomic.AddInt32(&s.Script.ErrorNum, 1)
- //retry the failed list page; once retries are exhausted the page counts as collected
- if downtimes < 2 {
- downtimes++
- start--
- //} else if start > tmpMax && isRunRepeatList { //retries exhausted: treat this page as a repeat
- } else if isRunRepeatList { //retries exhausted: treat this page as a repeat
- if repeatPageNum+1 == start {
- repeatPageTimes++ //one more consecutive repeat
- } else {
- repeatPageTimes = 0 //reset the repeat counter
- }
- repeatPageNum = start //remember this page number
- downtimes = 0
- }
- continue
- }
- lv := s.L.Get(-1)
- s.L.Pop(1)
- if tbl, ok := lv.(*lua.LTable); ok {
- list := []map[string]interface{}{}
- //qu.Debug("当前页数据量:", tbl.Len())
- if tabLen := tbl.Len(); tabLen > 0 { //列表页有数据,根据列表页信息下载三级页
- repeatListNum := 0 // 当前列表页连接重复个数
- for i := 1; i <= tabLen; i++ {
- v := tbl.RawGetInt(i).(*lua.LTable)
- tmp := util.TableToMap(v)
- //s.ThisSiteData(tmp) //统计当前下载数据是否是本站点数据
- if !s.IsHistoricalMend { //不是历史补漏
- tmp["dataging"] = 0 //数据中打标记dataging=0
- if s.DownDetail {
- s.DownloadDetailItem(tmp, &repeatListNum)
- } else {
- tmp["comeintime"] = time.Now().Unix()
- atomic.AddInt32(&s.LastDowncount, 1)
- atomic.AddInt32(&s.TodayDowncount, 1)
- atomic.AddInt32(&s.TotalDowncount, 1)
- href := fmt.Sprint(tmp["href"])
- if len(href) > 5 { //有效数据
- db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
- hashHref := HexText(href)
- //增量(redis默认db0)
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
- //全量(判断是否已存在防止覆盖id)
- isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
- if !isExist {
- util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
- }
- list = append(list, tmp)
- }
- }
- } else { //历史补漏
- s.HistoricalMendDownloadDetailItem(tmp) //历史补漏下载三级页
- }
- }
- //if start <= tmpMax { //数量赋值
- repeatAllNum += repeatListNum
- downloadAllNum += tabLen
- //}
- //if start > tmpMax && isRunRepeatList { //run consecutive-page repeat checking
- if isRunRepeatList { //run consecutive-page repeat checking
- if repeatListNum >= tabLen { //every item on page start was already collected
- //qu.Debug("repeat page:", repeatPageNum, "current page:", start)
- if repeatPageNum+1 == start || repeatPageNum == 0 {
- repeatPageTimes++ //one more consecutive repeat
- } else {
- repeatPageTimes = 0 //reset the repeat counter
- }
- repeatPageNum = start //remember this page number
- } else { //page start still has uncollected data
- repeatPageTimes = 0
- repeatPageNum = 0
- }
- }
- if !s.IsHistoricalMend && !s.DownDetail {
- if len(list) > 0 { //save the collected items
- StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
- }
- }
- } else { //the page came back empty, possibly a network glitch; request the list page again
- if downtimes < 2 {
- downtimes++
- start--
- continue
- //} else if start > tmpMax && isRunRepeatList { //retries exhausted: treat this page as a repeat
- } else if isRunRepeatList { //retries exhausted: treat this page as a repeat
- if repeatPageNum+1 == start {
- repeatPageTimes++ //one more consecutive repeat
- } else {
- repeatPageTimes = 0 //reset the repeat counter
- }
- repeatPageNum = start //remember this page number
- }
- }
- } else { //the list-page request itself failed
- if downtimes < 2 {
- downtimes++
- start--
- continue
- //} else if start > tmpMax && isRunRepeatList { //retries exhausted: treat this page as a repeat
- } else if isRunRepeatList { //retries exhausted: treat this page as a repeat
- if repeatPageNum+1 == start {
- repeatPageTimes++ //one more consecutive repeat
- } else {
- repeatPageTimes = 0 //reset the repeat counter
- }
- repeatPageNum = start //remember this page number
- }
- }
- downtimes = 0 //page downloaded fine; reset the retry counter
- util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
- }
- nowTime := time.Now()
- sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
- set := map[string]interface{}{
- "site": s.Name,
- "channel": s.Channel,
- "spidercode": s.Code,
- "updatetime": nowTime.Unix(),
- "event": util.Config.Uploadevent,
- "modifyuser": s.MUserName,
- "maxpage": tmpMax,
- "runrate": s.SpiderRunRate,
- "endpage": start,
- "date": sDate,
- }
- inc := map[string]interface{}{
- "alltimes": 1,
- }
- if downloadAllNum > 0 {
- rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
- rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
- if rate == 1.0 {
- inc["oh_percent"] = 1
- } else if rate >= 0.9 {
- inc["nt_percent"] = 1
- } else if rate >= 0.8 {
- inc["et_percent"] = 1
- } else {
- inc["other_percent"] = 1
- }
- } else {
- inc["zero"] = 1
- }
- query := map[string]interface{}{
- "date": sDate,
- "spidercode": s.Code,
- }
- logger.Info(s.Code, "list-page collection summary for this round:", downloadAllNum, repeatAllNum, start)
- Mgo.Update("spider_downloadrate", query, map[string]interface{}{
- "$set": set,
- "$inc": inc,
- }, true, false)
- return errs
- }
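- //rateBucket restates the $inc bucketing above as a plain function; an
- //illustrative helper (not called by the original code, name is mine).
- //100% fresh data increments oh_percent, >=90% nt_percent, >=80% et_percent,
- //anything lower other_percent; a round with no downloads at all counts as zero.
- func rateBucket(downloadAllNum, repeatAllNum int) string {
- if downloadAllNum == 0 {
- return "zero"
- }
- rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
- switch {
- case rate >= 1.0:
- return "oh_percent"
- case rate >= 0.9:
- return "nt_percent"
- case rate >= 0.8:
- return "et_percent"
- default:
- return "other_percent"
- }
- }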
- func (s *Spider) ThisSiteData(tmp map[string]interface{}) {
- defer qu.Catch()
- href := qu.ObjToString(tmp["href"])
- url_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(s.TargetChannelUrl), "")
- href_dn := RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(href), "")
- if url_dn != href_dn {
- SaveMgoCache <- map[string]interface{}{
- "site": s.Name,
- "channel": s.Channel,
- "spidercode": s.Code,
- "url": s.TargetChannelUrl,
- "href": href,
- "modifyuser": s.MUserName,
- "comeintime": time.Now().Unix(),
- }
- }
- }
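- //domainOf factors out the host comparison used in ThisSiteData above; an
- //illustrative helper, unused by the original code. Example: for
- //"https://www.example.com:8080/list" DomainNameReg matches "//www.example.com:"
- //and RepDomainNameReg strips the separators, leaving "www.example.com".
- func domainOf(u string) string {
- return RepDomainNameReg.ReplaceAllString(DomainNameReg.FindString(u), "")
- }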
- //Iterate and download detail pages (historical gap-filling)
- func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
- //qu.Debug("--------------历史下载-----------------")
- defer mu.Catch()
- var err interface{}
- data := map[string]interface{}{}
- paramdata := p.(map[string]interface{})
- for k, v := range paramdata {
- data[k] = v
- }
- href := qu.ObjToString(data["href"])
- if len(href) <= 5 { //无效数据
- return
- }
- db := HexToBigIntMod(href)
- hashHref := HexText(href)
- id := ""
- SaveListPageData(paramdata, &id, false) //存储采集记录
- isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) //取全量redis
- //log.Println("full href:", href, " isExist:", isExist)
- logger.Debug("full href:", href, " isExist:", isExist)
- if !s.IsMustDownload && isExist { //非强制下载redis中存在,结束
- //qu.Debug("非强制下载redis中存在,结束")
- //更新spider_listdata中数据下载成功标记
- if id != "" {
- //Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true)
- Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
- }
- return
- }
- //qu.Debug("----------------下载、解析、入库--------------------")
- //下载、解析、入库
- data, err = s.DownloadDetailPage(paramdata, data)
- if err != nil || data == nil { //download failed: stop here
- if err != nil {
- logger.Error(s.Code, err, paramdata)
- // if len(paramdata) > 0 {
- // SaveErrorData(paramdata) //save the error record
- // }
- }
- //mark the record in spider_listdata as failed
- if id != "" {
- Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
- }
- return
- }
- //mark the record in spider_listdata as downloaded
- if id != "" {
- //Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true)
- Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
- }
- flag := true
- t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) //publishtime
- if s.IsMustDownload { //forced download
- if isExist && t1 < time.Now().AddDate(0, 0, -5).Unix() {
- //qu.Debug("forced download, already in redis")
- data["dataging"] = 1
- flag = false
- } else {
- //qu.Debug("forced download, not in redis")
- data["dataging"] = 0
- //WithinThreeDays(&data) //tag the data by publish time
- }
- } else { //not a forced download
- if !isExist {
- //qu.Debug("not a forced download, not in redis")
- data["dataging"] = 0
- //WithinThreeDays(&data) //tag the data by publish time
- }
- }
- if t1 > time.Now().Unix() { //guard against publish times in the future
- data["publishtime"] = time.Now().Unix()
- }
- delete(data, "state")
- delete(data, "exit")
- delete(data, "checkpublishtime")
- data["comeintime"] = time.Now().Unix()
- atomic.AddInt32(&s.LastDowncount, 1)
- atomic.AddInt32(&s.TodayDowncount, 1)
- atomic.AddInt32(&s.TotalDowncount, 1)
- data["spidercode"] = s.Code
- //qu.Debug("--------------saving---------------")
- data["iscompete"] = s.IsCompete //spiders added after 2021-11-01 no longer show the original link (decided by the save service)
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, flag)
- //qu.Debug("--------------saved---------------")
- }
- //Iterate and download detail pages (incremental)
- func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
- defer mu.Catch()
- var err interface{}
- data := map[string]interface{}{}
- paramdata := p.(map[string]interface{})
- for k, v := range paramdata {
- data[k] = v
- }
- href := qu.ObjToString(data["href"])
- if len(href) <= 5 { //invalid data
- *num++ //count it as collected
- return
- }
- /*
- //check the incremental redis to see whether this link was already downloaded
- isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
- if isExist { //refresh the redis TTL
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
- *num++ //already collected
- return
- }
- log.Println("href had++:", isExist, href)
- */
- id := "" //id of the record saved in spider_listdata, used to update its state after a successful download
- if util.Config.Modal == 1 { //all nodes except 7000, 7500 and 7700 collect list-page data only
- isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
- if isExist { //refresh the redis TTL
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
- *num++ //already collected
- return
- }
- SaveHighListPageData(paramdata, s.SCode, href, num)
- return
- } else {
- if !s.Stop {
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //record the detail-page heartbeat for the legacy modal=0 mode
- }
- isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
- if isExist { //refresh the redis TTL
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
- *num++ //already collected
- return
- }
- isEsRepeat := false
- if delayDay := DelaySites[s.Name]; delayDay > 0 { //competitor-like sites: dedup the title against ES within 7 days (sequential collection cannot be delayed, so dedup is the only safeguard)
- title := qu.ObjToString(paramdata["title"])
- eTime := time.Now().Unix()
- sTime := eTime - int64(7*86400)
- esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
- if Es.Count(EsIndex, EsType, esQuery) > 0 { //ES already holds this title: skip collection and update the list record state
- isEsRepeat = true
- }
- }
- SaveListPageData(paramdata, &id, isEsRepeat) //save list-page data collected on nodes 7000, 7410, 7500 and 7700
- if isEsRepeat { //title-duplicated competitor-like data is added to redis
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
- return
- }
- }
- //download, parse, store
- data, err = s.DownloadDetailPage(paramdata, data)
- if err != nil || data == nil {
- if err != nil {
- logger.Error(s.Code, err, paramdata)
- if len(paramdata) > 0 {
- SaveErrorData(s.MUserName, paramdata, err) //save the error record
- }
- }
- //mark the record in spider_listdata as failed
- if id != "" {
- Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
- }
- return
- } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //the detail page replaced the href, so it differs from the list href
- log.Println("beforeHref:", href, "afterHref:", tmphref)
- //incremental dedup
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
- //full dedup
- db := HexToBigIntMod(href)
- hashHref := HexText(href)
- isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
- if !isExist {
- util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
- }
- }
- //mark the record in spider_listdata as downloaded
- if id != "" {
- Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true)
- //Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
- }
- t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
- if t1 > time.Now().Unix() { //guard against publish times in the future
- data["publishtime"] = time.Now().Unix()
- }
- if !s.Stop {
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //record the data heartbeat for the legacy modal=0 mode
- }
- delete(data, "state")
- delete(data, "exit")
- delete(data, "checkpublishtime")
- data["comeintime"] = time.Now().Unix()
- atomic.AddInt32(&s.LastDowncount, 1)
- atomic.AddInt32(&s.TodayDowncount, 1)
- atomic.AddInt32(&s.TotalDowncount, 1)
- data["spidercode"] = s.Code
- //qu.Debug("-----incremental save starting-----")
- // temporary data save
- // update := []map[string]interface{}{}
- // _id := data["_id"].(string)
- // update = append(update, map[string]interface{}{"_id": qu.StringTOBsonId(_id)})
- // update = append(update, map[string]interface{}{
- // "$set": map[string]interface{}{
- // "jsondata": data["jsondata"],
- // },
- // })
- // UpdataMgoCache <- update
- data["iscompete"] = s.IsCompete //spiders added after 2021-11-01 no longer show the original link (decided by the save service)
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
- //qu.Debug("-----incremental save finished-----")
- }
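- //buildTitleDedupQuery shows a safer way to build the ES title-dedup query used
- //in this file: the title is escaped via json.Marshal instead of raw string
- //concatenation, which breaks on titles containing quotes. Illustrative only
- //(name and helper are mine); the structure and field names copy the inline
- //esQuery strings above.
- func buildTitleDedupQuery(title string, sTime, eTime int64) string {
- t, _ := json.Marshal(title) //yields a quoted, JSON-escaped string
- return `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": ` + string(t) + `,"type": "phrase","fields": ["title"]}}]}}}}}`
- }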
- //Iterate the directory list and download
- func (s *Spider) DownloadDetailByNames(p interface{}) {
- defer mu.Catch()
- var err interface{}
- /*
- if s.Stop {
- return
- }
- for s.Pass {
- util.TimeSleepFunc(2*time.Second, TimeSleepChan)
- }
- */
- //TODO download the detail page and let the Lua script parse it; if the configuration says not to download detail pages, stop here and store directly
- data := map[string]interface{}{}
- paramdata := p.(map[string]interface{})
- for k, v := range paramdata {
- data[k] = v
- }
- if s.DownDetail {
- href := qu.ObjToString(data["href"])
- if href == "" || len(href) < 5 { //无效数据
- return
- }
- //download, parse, store
- data, err = s.DownloadDetailPage(paramdata, data)
- if err != nil {
- logger.Error(s.Code, paramdata, err)
- return
- }
- }
- data["comeintime"] = time.Now().Unix()
- atomic.AddInt32(&s.LastDowncount, 1)
- atomic.AddInt32(&s.TodayDowncount, 1)
- atomic.AddInt32(&s.TotalDowncount, 1)
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
- }
- //Download and parse the detail (content) page
- func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
- defer mu.Catch()
- s.LastHeartbeat = time.Now().Unix()
- util.TimeSleepFunc((time.Duration(s.SleepBase+GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
- tab := s.L.NewTable()
- for k, v := range param {
- if val, ok := v.(string); ok {
- tab.RawSet(lua.LString(k), lua.LString(val))
- } else if val, ok := v.(int64); ok {
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- } else if val, ok := v.(int32); ok {
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- } else if val, ok := v.(float64); ok {
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- } else if val, ok := v.(float32); ok {
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- } else if val, ok := v.(bool); ok {
- tab.RawSet(lua.LString(k), lua.LBool(val))
- }
- }
- var err error
- if err = s.L.CallByParam(lua.P{
- Fn: s.L.GetGlobal("downloadDetailPage"),
- NRet: 1,
- Protect: true,
- }, tab); err != nil {
- //panic(s.Code + "," + err.Error())
- log.Println(s.Code + "," + err.Error())
- atomic.AddInt32(&s.Script.ErrorNum, 1)
- return data, err
- }
- lv := s.L.Get(-1)
- s.L.Pop(1)
- //assemble the result map
- if v3, ok := lv.(*lua.LTable); ok {
- v3.ForEach(func(k, v lua.LValue) {
- if tmp, ok := k.(lua.LString); ok {
- key := string(tmp)
- if value, ok := v.(lua.LString); ok {
- data[key] = string(value)
- } else if value, ok := v.(lua.LNumber); ok {
- data[key] = value
- } else if value, ok := v.(*lua.LTable); ok {
- tmp := util.TableToMap(value)
- data[key] = tmp
- }
- }
- })
- return data, err
- } else {
- return nil, err
- }
- }
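- //toLuaTable restates the param-marshalling loop above as a reusable helper
- //(illustrative; the original code inlines it). Only the scalar types the
- //crawler actually passes are mapped; any other type is silently dropped,
- //matching the behaviour of the inline loop.
- func toLuaTable(L *lua.LState, m map[string]interface{}) *lua.LTable {
- tab := L.NewTable()
- for k, v := range m {
- switch val := v.(type) {
- case string:
- tab.RawSet(lua.LString(k), lua.LString(val))
- case int64:
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- case int32:
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- case float64:
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- case float32:
- tab.RawSet(lua.LString(k), lua.LNumber(val))
- case bool:
- tab.RawSet(lua.LString(k), lua.LBool(val))
- }
- }
- return tab
- }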
- //High-performance mode: scheduled collection of detail pages
- func DetailData() {
- defer qu.Catch()
- <-InitAllLuaOver //run once all scripts are loaded
- if util.Config.Working == 0 && !util.Config.IsHistoryEvent { //high-performance mode and not node 7000 (only node 7000 has util.Config.IsHistoryEvent set to true)
- GetListDataDownloadDetail()
- }
- }
- func GetListDataDownloadDetail() {
- defer qu.Catch()
- logger.Info("+++++++++++++++++++Download Detail+++++++++++++++++++")
- Allspiders2.Range(func(k, v interface{}) bool {
- sp := v.(*Spider)
- go sp.DownloadHighDetail()
- time.Sleep(2 * time.Second)
- return true
- })
- }
- //High-performance mode: download detail pages from the stored list data
- func (s *Spider) DownloadHighDetail() {
- defer qu.Catch()
- for {
- logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
- if !s.Stop { //the spider is running
- comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //only fetch data from the last Config.DayNum days, so rows that can never be downloaded do not pile up
- isEsRepeat := false //whether to dedup against ES
- if delayDay := DelaySites[s.Name]; delayDay > 0 {
- isEsRepeat = true
- if delayDay <= util.Config.DayNum*24 { //the spider belongs to a delayed site: collect its data delayDay hours late (7410, 7500 and 7700 collect sequentially and cannot delay)
- //comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
- comeintimeQuery["$lte"] = time.Now().Unix() - int64(3600*delayDay)
- }
- }
- q := map[string]interface{}{
- "spidercode": s.Code,
- "state": 0, //0: newly stored; -1: download failed; 1: succeeded
- "comeintime": comeintimeQuery,
- }
- o := map[string]interface{}{"_id": -1}
- f := map[string]interface{}{
- "state": 0,
- "comeintime": 0,
- "event": 0,
- }
- if !s.Stop { //if the spider was taken offline while downloading, stop recording heartbeats
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //record the modal=1 detail-page heartbeat
- }
- list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
- if list != nil && len(*list) > 0 {
- for _, tmp := range *list {
- _id := tmp["_id"]
- query := map[string]interface{}{"_id": _id}
- href := qu.ObjToString(tmp["href"])
- //list-page redis dedup currently keys on href+code, so the same href may exist under several codes
- //to avoid downloading it twice, check the incremental redis again here
- isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
- if isExist {
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //already collected: set state to 1
- Mgo.Update("spider_highlistdata", query, set, false, false)
- continue
- }
- if isEsRepeat { //ES title dedup
- title := qu.ObjToString(tmp["title"])
- eTime := time.Now().Unix()
- sTime := eTime - int64(7*86400)
- esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
- count := Es.Count(EsIndex, EsType, esQuery)
- if count > 0 { //ES already holds this title: skip collection and update the list record state
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //already collected: set state to 1
- Mgo.Update("spider_highlistdata", query, set, false, false)
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
- continue
- }
- }
- times := qu.IntAll(tmp["times"])
- success := true //whether the download succeeded
- delete(tmp, "_id")
- delete(tmp, "times")
- data := map[string]interface{}{}
- var err interface{}
- for k, v := range tmp {
- data[k] = v
- }
- //download, parse, store
- data, err = s.DownloadDetailPage(tmp, data)
- if !s.Stop { //if the spider was taken offline while downloading, stop recording heartbeats
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //record the modal=1 data heartbeat
- }
- if err != nil || data == nil {
- success = false
- times++
- if err != nil {
- logger.Error(s.Code, err, tmp)
- if len(tmp) > 0 {
- SaveErrorData(s.MUserName, tmp, err) //save the error record
- }
- } /*else if data == nil && times >= 3 { //download problem: create an editor task
- DownloadErrorData(s.Code, tmp)
- }*/
- } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //the detail page replaced the href, so it differs from the list href
- log.Println("beforeHref:", href, "afterHref:", tmphref)
- //incremental dedup
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
- //full dedup
- db := HexToBigIntMod(href)
- hashHref := HexText(href)
- isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
- if !isExist {
- util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
- }
- }
- if !success { //download failed: update the retry count and state
- ss := map[string]interface{}{"times": times}
- if times >= 3 { //after 3 failed downloads, stop for today; state is set to -1
- ss["state"] = -1
- }
- set := map[string]interface{}{"$set": ss}
- Mgo.Update("spider_highlistdata", query, set, false, false)
- continue
- }
- t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
- if t1 > time.Now().Unix() { //guard against publish times in the future
- data["publishtime"] = time.Now().Unix()
- }
- delete(data, "exit")
- delete(data, "checkpublishtime")
- data["comeintime"] = time.Now().Unix()
- //update the counters
- tmpsp1, b := Allspiders.Load(s.Code)
- if b {
- sp1, ok := tmpsp1.(*Spider)
- if ok {
- atomic.AddInt32(&sp1.LastDowncount, 1)
- atomic.AddInt32(&sp1.TodayDowncount, 1)
- atomic.AddInt32(&sp1.TotalDowncount, 1)
- }
- }
- data["spidercode"] = s.Code
- data["dataging"] = 0
- data["iscompete"] = s.IsCompete //spiders added after 2021-11-01 no longer show the original link (decided by the save service)
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} //download succeeded: set state to 1
- Mgo.Update("spider_highlistdata", query, set, false, false)
- }
- //reload the spider script
- s.LoadScript(s.Name, s.Channel, s.MUserName, s.Code, s.ScriptFile, true)
- } else { //no data to download
- time.Sleep(2 * time.Minute)
- }
- //s.GetListDataDownloadDetail() //start the next round
- } else {
- logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
- break
- }
- }
- }
- //Queue mode: download detail pages from the stored list data
- func (s *Spider) DownloadListDetail() {
- defer qu.Catch()
- defer func() { //once the spider has finished (or has no) detail downloads, close and recycle the Lua state
- s.Stop = true
- if _, b := Allspiders2.Load(s.Code); b {
- Allspiders2.Store(s.Code, s)
- }
- s.L.Close()
- CC2 <- s.L
- }()
- comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //only fetch data from the last Config.DayNum days, so rows that can never be downloaded do not pile up
- isEsRepeat := false //whether to dedup against ES
- if delayDay := DelaySites[s.Name]; delayDay > 0 {
- isEsRepeat = true
- if delayDay <= util.Config.DayNum { //the spider belongs to a delayed site: collect its data delayDay days late (7410, 7500 and 7700 collect sequentially and cannot delay)
- //comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
- comeintimeQuery["$lte"] = time.Now().Unix() - int64(86400*delayDay)
- }
- }
- q := map[string]interface{}{
- "spidercode": s.Code,
- "state": 0, //0: newly stored; -1: download failed; 1: succeeded
- "comeintime": comeintimeQuery,
- }
- o := map[string]interface{}{"_id": -1}
- f := map[string]interface{}{
- "state": 0,
- "comeintime": 0,
- "event": 0,
- }
- if !s.Stop { //if the spider was taken offline while downloading, stop recording heartbeats
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //record the queue-mode detail-page heartbeat
- }
- }
- list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
- if list != nil && len(*list) > 0 {
- for _, tmp := range *list {
- _id := tmp["_id"]
- query := map[string]interface{}{"_id": _id}
- href := qu.ObjToString(tmp["href"])
- //list-page redis dedup currently keys on href+code, so the same href may exist under several codes
- //to avoid downloading it twice, check the incremental redis again here
- isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
- if isExist {
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //already collected: set state to 1
- Mgo.Update("spider_highlistdata", query, set, false, false)
- continue
- }
- if isEsRepeat { //ES title dedup
- title := qu.ObjToString(tmp["title"])
- eTime := time.Now().Unix()
- sTime := eTime - int64(7*86400)
- esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
- if Es.Count(EsIndex, EsType, esQuery) > 0 { //ES already holds this title: skip collection and update the list record state
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //already collected: set state to 1
- Mgo.Update("spider_highlistdata", query, set, false, false)
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
- continue
- }
- }
- }
- times := qu.IntAll(tmp["times"])
- success := true //whether the download succeeded
- delete(tmp, "_id")
- delete(tmp, "times")
- data := map[string]interface{}{}
- var err interface{}
- for k, v := range tmp {
- data[k] = v
- }
- //download, parse, store
- data, err = s.DownloadDetailPage(tmp, data)
- if !s.Stop { //if the spider was taken offline while downloading, stop recording heartbeats
- UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //record the queue-mode data heartbeat
- }
- if err != nil || data == nil {
- success = false
- times++
- if err != nil {
- logger.Error(s.Code, err, tmp)
- if len(tmp) > 0 {
- SaveErrorData(s.MUserName, tmp, err) //save the error record
- }
- } /*else if data == nil && times >= 3 { //download problem: create an editor task
- DownloadErrorData(s.Code, tmp)
- }*/
- } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //the detail page replaced the href, so it differs from the list href
- log.Println("beforeHref:", href, "afterHref:", tmphref)
- //incremental dedup
- util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
- //full dedup
- db := HexToBigIntMod(href)
- hashHref := HexText(href)
- isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
- if !isExist {
- util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
- }
- }
- if !success { //download failed: update the retry count and state
- ss := map[string]interface{}{"times": times}
- if times >= 3 { //after 3 failed downloads, stop for today; state is set to -1
- ss["state"] = -1
- }
- set := map[string]interface{}{"$set": ss}
- Mgo.Update("spider_highlistdata", query, set, false, false)
- continue
- }
- t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
- if t1 > time.Now().Unix() { //guard against publish times in the future
- data["publishtime"] = time.Now().Unix()
- }
- delete(data, "exit")
- delete(data, "checkpublishtime")
- data["comeintime"] = time.Now().Unix()
- //update the counters
- tmpsp1, b := Allspiders.Load(s.Code)
- if b {
- sp1, ok := tmpsp1.(*Spider)
- if ok {
- atomic.AddInt32(&sp1.LastDowncount, 1)
- atomic.AddInt32(&sp1.TodayDowncount, 1)
- atomic.AddInt32(&sp1.TotalDowncount, 1)
- }
- }
- data["spidercode"] = s.Code
- data["dataging"] = 0
- data["iscompete"] = s.IsCompete //spiders added after 2021-11-01 no longer show the original link (decided by the save service)
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} //download succeeded: set state to 1
- Mgo.Update("spider_highlistdata", query, set, false, false)
- }
- }
- }
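- //markDownloadFailed captures the failure bookkeeping shared by
- //DownloadHighDetail and DownloadListDetail; an illustrative helper (name is
- //mine), not wired into the loops above. Each failed attempt bumps "times";
- //after the third failure the row is parked with state=-1 so it is skipped
- //until the next day's sweep.
- func markDownloadFailed(query map[string]interface{}, times int) {
- ss := map[string]interface{}{"times": times}
- if times >= 3 {
- ss["state"] = -1
- }
- Mgo.Update("spider_highlistdata", query, map[string]interface{}{"$set": ss}, false, false)
- }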
- //Get a random number in [0, num)
- func GetRandMath(num int) int {
- r := rand.New(rand.NewSource(time.Now().UnixNano()))
- return r.Intn(num)
- }
- //Get a hash code (sha1 hex prefixed with the URL origin)
- func GetHas1(data string) string {
- t := sha1.New()
- io.WriteString(t, data)
- hf := Reg.FindString(data)
- if !strings.HasSuffix(hf, "/") {
- hf = hf + "/"
- }
- return hf + fmt.Sprintf("%x", t.Sum(nil))
- }
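- //Example (illustrative): GetHas1("https://example.com/notice/123") returns
- //"https://example.com/" followed by the 40-char sha1 hex of the full URL,
- //so hashed keys stay grouped by site origin.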
- //Hash the href and take the result modulo 16 (picks a Redis db shard)
- func HexToBigIntMod(href string) int {
- //take the hash
- t := sha256.New()
- io.WriteString(t, href)
- hex := fmt.Sprintf("%x", t.Sum(nil))
- //take the modulus
- n := new(big.Int)
- n, _ = n.SetString(hex[2:], 16)
- return int(n.Mod(n, big.NewInt(16)).Int64())
- }
- //Sha256 hex digest of the href
- func HexText(href string) string {
- h := sha256.New()
- h.Write([]byte(href))
- return fmt.Sprintf("%x", h.Sum(nil))
- }
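- //Sharding sketch: HexToBigIntMod and HexText are used together for the
- //full-dedup store, e.g. (illustrative, href assumed non-empty):
- //
- // db := HexToBigIntMod(href) //0..15, picks the logical Redis db
- // key := HexText(href) //sha256 hex of the href is the key
- // util.PutRedis("title_repeat_fulljudgement", db, key, "", -1)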
- //func RedisIsExist(href string) bool {
- // isExist := false
- // if len(href) > 75 { //long href: hash it and check existence by the hash
- // hashHref := GetHas1(href)
- // isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+hashHref)
- // }
- // if !isExist { //then check existence with the raw href string
- // isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
- // }
- // return isExist
- //}
- //Check whether the publish time falls within the last three days
- //func WithinThreeDays(data *map[string]interface{}) {
- // withinThreeDays := false
- // //tag the data by publish time
- // publishtime := util.ParseDate2Int64(qu.ObjToString((*data)["publishtime"])) //no publish time: the current time is used
- // //publish time
- // now := time.Now().Unix()
- // if now-publishtime > 259200 { //older than three days
- // withinThreeDays = false
- // } else {
- // withinThreeDays = true
- // }
- // if withinThreeDays {
- // //qu.Debug("publish time within three days")
- // (*data)["dataging"] = 0
- // } else {
- // //qu.Debug("publish time older than three days")
- // (*data)["dataging"] = 1
- // }
- //}