@@ -78,11 +78,12 @@ type Spider struct {
 	MUserName, MUserEmail string //maintainer name, maintainer email
 	//Index int //array index
 	//historical backfill
-	IsHistoricalMend bool //whether this is a historical-backfill spider
-	IsMustDownload   bool //whether downloading is forced
-	IsCompete        bool //distinguishes new spiders from old ones
-	Infoformat       int  //spider type: 1: tender; 2: proposed/approval; 3: property rights
-	IsMainThread     bool //whether this is the main thread (distinguishes the main thread in multi-threaded collection)
+	IsHistoricalMend    bool //whether this is a historical-backfill spider
+	IsMustDownload      bool //whether downloading is forced
+	IsCompete           bool //distinguishes new spiders from old ones
+	Infoformat          int  //spider type: 1: tender; 2: proposed/approval; 3: property rights
+	IsMainThread        bool //whether this is the main thread (distinguishes the main thread in multi-threaded collection)
+	ListParallelTaskNum int  //number of list-page collection tasks to run in parallel
 }

 var (
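Reviewer note: this hunk only declares ListParallelTaskNum; the patch never shows the field being consumed. Below is a minimal, self-contained sketch of the pattern such a knob usually drives, a buffered-channel semaphore capping in-flight list tasks. The names runListTasks and fetch are hypothetical and do not exist in this codebase.

	package main

	import (
		"fmt"
		"sync"
	)

	// runListTasks caps concurrency at `parallel`, the role a field like
	// ListParallelTaskNum would play. Hypothetical illustration only.
	func runListTasks(pages []int, parallel int, fetch func(int)) {
		if parallel < 1 {
			parallel = 1 // treat an unset limit as serial execution
		}
		sem := make(chan struct{}, parallel) // buffered channel used as a semaphore
		var wg sync.WaitGroup
		for _, p := range pages {
			wg.Add(1)
			sem <- struct{}{} // blocks while `parallel` fetches are in flight
			go func(page int) {
				defer wg.Done()
				defer func() { <-sem }() // release the slot
				fetch(page)
			}(p)
		}
		wg.Wait()
	}

	func main() {
		runListTasks([]int{1, 2, 3, 4, 5}, 2, func(page int) { fmt.Println("list page", page) })
	}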
@@ -118,45 +119,6 @@ type DelaySite struct {
 	Compete bool
 }

-//heartbeat
-func UpdateHeart(site, channel, code, user, t string, firstpage bool) {
-	//sp, spiderOk := LoopListPath.Load(code)
-	//if spiderOk && sp != nil {
-	if htmp, ok := SpiderHeart.Load(code); ok {
-		if heart, ok := htmp.(*Heart); ok {
-			if t == "list" {
-				heart.ListHeart = time.Now().Unix()
-				if firstpage {
-					heart.FirstPageHeart = time.Now().Unix()
-				}
-			} else if t == "findlist" {
-				heart.FindListHeart = time.Now().Unix()
-			} else if t == "detail" {
-				heart.DetailHeart = time.Now().Unix()
-			} else if t == "detailexcute" {
-				heart.DetailExecuteHeart = time.Now().Unix()
-			}
-		}
-	} else {
-		heart := &Heart{
-			ModifyUser: user,
-			Site:       site,
-			Channel:    channel,
-		}
-		if t == "list" {
-			heart.ListHeart = time.Now().Unix()
-		} else if t == "findlist" {
-			heart.FindListHeart = time.Now().Unix()
-		} else if t == "detail" {
-			heart.DetailHeart = time.Now().Unix()
-		} else if t == "detailexcute" {
-			heart.DetailExecuteHeart = time.Now().Unix()
-		}
-		SpiderHeart.Store(code, heart)
-	}
-	//}
-}
-
 //task
 func (s *Spider) StartJob() {
 	s.Stop = false
@@ -168,18 +130,6 @@ func (s *Spider) StartJob() {
 //single execution run
 func (s *Spider) ExecJob(reload bool) {
 	defer func() {
-		size_ok, size_no := 0, 0
-		size_no_index := []interface{}{}
-		LoopListPath.Range(func(k, v interface{}) bool {
-			if v != nil {
-				size_ok++
-			} else {
-				size_no_index = append(size_no_index, k)
-				size_no++
-			}
-			return true
-		})
-		logger.Debug(s.Code, s.Name, "ok,本轮下载量:", s.LastDowncount, ",轮询数据长度:", size_ok, ",下线数量:", size_no, ",下线爬虫:", size_no_index)
 		s.ExecuteOkTime = time.Now().Unix()
 		util.TimeSleepFunc(5*time.Second, TimeSleepChan)
 		if util.Config.Working == 1 {
@@ -187,11 +137,13 @@ func (s *Spider) ExecJob(reload bool) {
 			if _, b := Allspiders.Load(s.Code); b {
 				Allspiders.Store(s.Code, s)
 			}
+			//resource release (a take-down in queue mode does not release resources here; they are released automatically once execution finishes)
 			s.L.Close()
 			CC <- s.L
 		}
 	}()
-	if reload && util.Config.Working == 0 { //high-performance mode: reload the script on each polling cycle
+	//high-performance mode: reload the script on each polling cycle
+	if reload && util.Config.Working == 0 {
 		s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
 	}
 	logger.Debug(s.Code, s.Name, "频率:", s.SpiderRunRate, ",", s.Timeout)
@@ -199,11 +151,11 @@ func (s *Spider) ExecJob(reload bool) {
 	s.LastExecTime = time.Now().Unix()
 	s.LastHeartbeat = time.Now().Unix()
 	s.ExecuteOkTime = 0
-	err := s.GetLastPublishTime() //fetch the latest publish time, used as the last-update time
-	if err != nil {
-		logger.Error(s.Code, err)
-	}
-	err = s.DownListPageItem() //download the list pages
+	//err := s.GetLastPublishTime() //fetch the latest publish time, used as the last-update time
+	//if err != nil {
+	//	logger.Error(s.Code, err)
+	//}
+	err := s.DownListPageItem() //download the list pages
 	if err != nil {
 		logger.Error(s.Code, err)
 	}
@@ -231,9 +183,6 @@ func (s *Spider) ExecJob(reload bool) {
 		util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
 			s.ExecJob(true)
 		}, TimeChan)
-		// util.TimeAfterFunc(30*time.Second, func() {
-		// 	s.ExecJob(true)
-		// }, TimeChan)
 	} else { //the child thread exits after the spider is taken down
 		return
 	}
@@ -291,21 +240,30 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 	downtimes := 0       //retry count for a page (tentatively capped at 3)
 	repeatPageNum := 0   //page number on which every list-page link was a duplicate
 	repeatPageTimes := 0 //count of consecutive all-duplicate pages (tentatively stop paging once 5 consecutive pages are duplicates)
-	repeatPageTimesLimit := 10 //upper limit on consecutive duplicate pages (10 pages in high-performance mode, 3 in queue mode)
+	repeatPageTimesLimit := util.Config.PageTurnInfo.RepeatPageTimesLimitW0 //upper limit on consecutive duplicate pages (10 pages in high-performance mode, 3 in queue mode)
 	isRunRepeatList := false //whether to run consecutive duplicate checking on list pages
 	if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { //every node except sequential-collection mode and history nodes checks consecutive list pages for duplicates
 		isRunRepeatList = true
-		max = 100 //high-performance mode caps the max page at 100
+		max = util.Config.PageTurnInfo.TurnPageMaxLimitW0 //high-performance mode caps the max page at 100
+		if util.Config.Working == 1 { //queue mode
+			repeatPageTimesLimit = util.Config.PageTurnInfo.RepeatPageTimesLimitW1 //consecutive duplicate-page limit: 3
+			max = util.Config.PageTurnInfo.TurnPageMaxLimitW1 //queue-mode max page: 50
+		}
+	}
+	//child-task check
+	if s.ContinueDownListChildTask {
+		start = util.Config.PageTurnInfo.TurnPageMaxLimitW0 + 1
+		max = util.Config.PageTurnInfo.TurnPageMaxLimitW0 + util.Config.PageTurnInfo.NextPageMaxLimitW0
 		if util.Config.Working == 1 { //queue mode
-			repeatPageTimesLimit = 3 //consecutive duplicate-page limit: 3
-			max = 50 //queue-mode max page: 50
+			start = util.Config.PageTurnInfo.TurnPageMaxLimitW1 + 1
+			max = util.Config.PageTurnInfo.TurnPageMaxLimitW1 + util.Config.PageTurnInfo.NextPageMaxLimitW1
 		}
 	}
 	for ; start <= max && !s.Stop; start++ {
-		if !s.Stop { //the spider may be taken down while detail pages are downloading; stop storing heartbeat info once that happens
+		if !s.Stop && !s.ContinueDownListChildTask { //the spider may be taken down while detail pages are downloading; stop storing heartbeat info once that happens
			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) //record the list-page heartbeat for all nodes
 		}
-		//logger.Info("爬虫:", s.Code, "重复页:", repeatPageNum, " 配置最大页:", tmpMax, " 最终最大页:", max, " 当前页:", start, "重复次数:", repeatPageTimes)
+		//qu.Debug("爬虫:", s.Code, "重复页:", repeatPageNum, " 配置最大页:", s.MaxPage, " 最终最大页:", max, " 当前页:", start, "重复次数:", repeatPageTimes)
 		//if start > tmpMax && isRunRepeatList && repeatPageTimes >= 5 { //stop paging once there are more than 5 consecutive repeats
 		//	break
 		//}
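Reviewer note: the hard-coded paging limits above now come from configuration, but the patch only shows field accesses on util.Config.PageTurnInfo. A sketch of the implied config block follows; the struct layout is an assumption, only the field names appear in the patch, and the defaults are taken from the inline comments (W0 = high-performance mode, Working == 0; W1 = queue mode, Working == 1). The NextPageMaxLimit defaults are not shown anywhere in the patch.

	// Assumed shape of the PageTurnInfo config; reconstructed from usage.
	type PageTurnInfo struct {
		TurnPageMaxLimitW0     int // list-page cap, high-performance mode (comments say 100)
		TurnPageMaxLimitW1     int // list-page cap, queue mode (comments say 50)
		RepeatPageTimesLimitW0 int // consecutive all-duplicate pages tolerated, high-performance mode (10)
		RepeatPageTimesLimitW1 int // consecutive all-duplicate pages tolerated, queue mode (3)
		NextPageMaxLimitW0     int // extra page window granted to a child task, high-performance mode
		NextPageMaxLimitW1     int // extra page window granted to a child task, queue mode
	}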
@@ -425,7 +383,7 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 		downtimes = 0 //current page downloaded without errors; reset the retry counter
 		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
 	}
-	logger.Info(s.Code, "本轮列表页采集详情:", downloadAllNum, repeatAllNum, start, s.Stop)
+	logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, start, s.Stop)
 	if !util.Config.IsHistoryEvent && !s.Stop { //non-history nodes track the download rate
 		nowTime := time.Now()
 		sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
@@ -475,7 +433,11 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 			inc["other_percent"] = 1
 		}
 		if isRunRepeatList && start > max { //consecutive paging exceeded the upper limit
-			inc["uplimit"] = 1
+			if !s.ContinueDownListChildTask {
+				go ContinueDownListPageItem(s) //spawn a child task to keep collecting
+			} else {
+				inc["uplimit"] = 1
+			}
 		}
 	} else {
 		inc["zero"] = 1
@@ -487,7 +449,11 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 		"date":       sDate,
 		"spidercode": s.Code,
 	}
-	MgoS.Update("spider_downloadrate", query, map[string]interface{}{
+	coll := "spider_downloadrate"
+	if s.ContinueDownListChildTask {
+		coll = "spider_downloadrate_child"
+	}
+	MgoS.Update(coll, query, map[string]interface{}{
 		"$set": set,
 		"$inc": inc,
 	}, true, false)
@@ -499,6 +465,26 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 	return errs
 }

+//start a separate goroutine that continues collecting list pages
+func ContinueDownListPageItem(s *Spider) {
+	defer qu.Catch()
+	spTmp, errstr := CreateSpider(s.SCode, s.ScriptFile, true, true) //build a fresh spider instance
+	logger.Info(s.SCode, "补充连续翻页开始...")
+	if errstr == "" && spTmp != nil && spTmp.Code != "nil" { //script loaded successfully
+		spTmp.UserName = s.UserName
+		spTmp.UserEmail = s.UserEmail
+		spTmp.MUserName = s.MUserName
+		spTmp.MUserEmail = s.MUserEmail
+		spTmp.ContinueDownListChildTask = true
+		defer spTmp.L.Close()
+		err := spTmp.DownListPageItem() //download the list pages
+		logger.Info(s.SCode, "补充连续翻页结束...")
+		if err != nil {
+			logger.Error(spTmp.Code, err)
+		}
+	}
+}
+
 //site info statistics
 //func (s *Spider) ThisSiteData(tmp map[string]interface{}) {
 //	defer qu.Catch()
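Reviewer note: ContinueDownListPageItem re-enters DownListPageItem with ContinueDownListChildTask set, so the child gets the shifted page window computed in the -291 hunk. A worked example, assuming the comment defaults (TurnPageMaxLimitW0 = 100) and a hypothetical NextPageMaxLimitW0 of 100:

	start = util.Config.PageTurnInfo.TurnPageMaxLimitW0 + 1 // 100 + 1   = 101
	max = util.Config.PageTurnInfo.TurnPageMaxLimitW0 +
		util.Config.PageTurnInfo.NextPageMaxLimitW0 // 100 + 100 = 200

The parent walks pages 1..100; the child covers 101..200, skips heartbeat writes (the !s.ContinueDownListChildTask guard), reports into spider_downloadrate_child instead of spider_downloadrate, and only the child increments "uplimit" if it exhausts its own window.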
@@ -708,7 +694,8 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 	if util.Config.Uploadevent == 7410 || publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
 		isExist, _ = util.ExistsBloomRedis("href", tmphref)
 		if isExist {
-			MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
+			//MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "tmphref": tmphref, "updatetime": time.Now().Unix()}})
+			MgoS.Update("spider_listdata", map[string]interface{}{"href": tmphref}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "byid": id, "tmphref": tmphref, "updatetime": time.Now().Unix()}}, false, true)
 			return
 		}
 	}
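Reviewer note: the old call updated a single document by _id; the new one filters on href, flagging every spider_listdata row that shares the duplicate link while keeping the original _id in "byid". Reading the trailing booleans as (upsert, multi) is an inference from the two call styles in this patch (`}, true, false)` in the download-rate hunk versus `false, true` here); the helper's real declaration is not shown. A hypothetical signature consistent with both call sites:

	// Assumed signatures, reconstructed from call sites; not the project's actual declarations.
	type mongoStore interface {
		Update(coll string, selector, update map[string]interface{}, upsert, multi bool) bool
		UpdateById(coll string, id interface{}, update map[string]interface{}) bool
	}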
@@ -1115,8 +1102,8 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 				s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
 			}
 		}
-	} else if reload { //high-performance mode: sleep 2 minutes when there is no data
-		time.Sleep(2 * time.Minute)
+	} else if reload { //high-performance mode: sleep when there is no data
+		time.Sleep(30 * time.Second)
 	}
 }
