123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- package spider
- import (
- "fmt"
- "github.com/donnie4w/go-logger/logger"
- qu "qfw/util"
- sputil "spiderutil"
- "sync"
- "time"
- )
// HistoryAllSpiders maps spider code -> *Spider for every running history
// spider. The zero value of sync.Map is ready to use, so no explicit
// initializer is needed (the previous `= sync.Map{}` was redundant).
var HistoryAllSpiders sync.Map
- //历史节点下载详情页
- func HistoryEventDownloadDetail() {
- defer qu.Catch()
- if !sputil.Config.IsHistoryEvent { //不是历史节点return
- return
- }
- GetHistoryDownloadSpider() //定时检测数据集汇总爬虫
- }
- //执行爬虫
- func (s *Spider) StartSpider() {
- defer qu.Catch()
- for {
- logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
- if !s.Stop { //爬虫是运行状态
- //s.DownloadHistoryDetail()
- s.DownloadDetail(true, true)
- } else { //爬虫停止运行,删除
- s.L.Close()
- HistoryAllSpiders.Delete(s.Code)
- logger.Info("Delete Code:", s.Code, "Stop:", s.Stop)
- break
- }
- }
- }
- //加载应采集数据,进行采集
- func (s *Spider) DownloadHistoryDetail() {
- defer qu.Catch()
- q := map[string]interface{}{"spidercode": s.Code, "state": 0}
- o := map[string]interface{}{"_id": 1}
- f := map[string]interface{}{
- "state": 0,
- "comeintime": 0,
- "event": 0,
- }
- //UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //记录采集三级页心跳
- list, _ := MgoS.Find("spider_historydata", q, o, f, false, 0, 200)
- if len(*list) == 0 { //数据量为0,表示无可下载数据,爬虫作废
- s.Stop = true
- return
- }
- //采集(目前未开多线程)
- for _, tmp := range *list {
- id := tmp["_id"]
- href := qu.ObjToString(tmp["href"])
- hashHref := HexText(href)
- isExist := sputil.RedisClusterExists(hashHref) //全量href redis判重
- if isExist {
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "href", "updatetime": time.Now().Unix()}} //已存在state置为1
- MgoS.UpdateById("spider_historydata", id, set)
- return
- }
- success := true //数据是否下载成功的标志
- delete(tmp, "_id") //删除列表页信息无用字段_id
- data := map[string]interface{}{}
- for k, v := range tmp {
- data[k] = v
- }
- //下载、解析、入库
- data, err := s.DownloadDetailPage(tmp, data)
- //UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //下载数据心跳
- if err != nil || data == nil {
- success = false
- if err != nil {
- logger.Error(s.Code, err, tmp)
- //if len(tmp) > 0 {
- // SaveErrorData(s.MUserName, tmp, err) //保存错误信息
- //}
- } /*else if data == nil && times >= 3 { //下载问题,建editor任务
- DownloadErrorData(s.Code, tmp)
- }*/
- } else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
- sputil.RedisClusterSet(hashHref, "", -1)
- }
- if !success { //下载失败
- set := map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}}
- MgoS.UpdateById("spider_historydata", id, set)
- return
- } else if data["delete"] != nil { //三级页过滤
- sputil.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
- //更新mgo 要删除的数据更新spider_historydata state=1不再下载,更新redis
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
- MgoS.UpdateById("spider_historydata", id, set)
- return
- }
- //正文、附件分析,下载异常数据重新下载
- if AnalysisProjectInfo(data) {
- set := map[string]interface{}{"$set": map[string]interface{}{"state": -1, "detailfilerr": true, "updatetime": time.Now().Unix()}}
- MgoS.UpdateById("spider_historydata", id, set)
- return
- }
- t1 := sputil.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
- if t1 > time.Now().Unix() { //防止发布时间超前
- data["publishtime"] = time.Now().Unix()
- }
- delete(data, "exit")
- delete(data, "checkpublishtime")
- data["comeintime"] = time.Now().Unix()
- data["spidercode"] = s.Code
- data["dataging"] = 0
- data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
- //发送保存服务
- Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
- set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} //下载成功state置为1
- MgoS.UpdateById("spider_historydata", id, set)
- }
- //采集完LoadScript
- s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
- }
// GetHistoryDownloadSpider periodically aggregates spider_historydata by
// spidercode and, for every code that still has pending (state=0) records,
// either refreshes the already-running spider's info or creates and starts
// a new one. At most two codes are processed concurrently. The function
// reschedules itself every 30 minutes via time.AfterFunc.
func GetHistoryDownloadSpider() {
	defer qu.Catch()
	logger.Info("定时统计下载历史爬虫开始...")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 2) // semaphore: at most 2 worker goroutines
	match := map[string]interface{}{ // only records not yet downloaded
		"state": 0,
	}
	group := map[string]interface{}{ // count pending records per spidercode
		"_id": "$spidercode",
		"count": map[string]interface{}{
			"$sum": 1,
		},
	}
	p := []map[string]interface{}{
		map[string]interface{}{"$match": match},
		map[string]interface{}{"$group": group},
	}
	it := sess.DB(MgoS.DbName).C("spider_historydata").Pipe(p).Iter()
	n := 0
	// tmp is passed into the goroutine by value and reset after each launch,
	// so iterations do not share the map.
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["_id"])
			count := qu.IntAll(tmp["count"])
			logger.Info("code:", code, " 当前待采集历史信息量:", count)
			// Load the spider's lua configuration.
			lua, _ := MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
			if len(*lua) > 0 {
				state := qu.IntAll((*lua)["state"])
				if state >= 7 || state == 4 { // retired / no-publish / login-required / unhandlable / deleted / online(python): stop downloading its data
					MgoS.Update("spider_historydata",
						map[string]interface{}{"spidercode": code, "state": 0},
						map[string]interface{}{"$set": map[string]interface{}{"luastate": state, "state": -1, "updatetime": time.Now().Unix()}},
						false, true,
					)
				} else {
					// Resolve the lua script text depending on the config era.
					old := qu.IntAll((*lua)["old_lua"])
					script := ""
					if old == 1 {
						script = fmt.Sprint((*lua)["luacontent"])
					} else {
						if (*lua)["oldlua"] != nil {
							if (*lua)["luacontent"] != nil {
								script = (*lua)["luacontent"].(string)
							}
						} else {
							script = GetScriptByTmp((*lua))
						}
					}
					spTmp, b := HistoryAllSpiders.Load(code)
					isNew := true
					if b { // refresh the info of an already-running spider
						sp, ok := spTmp.(*Spider)
						if ok {
							sp.ScriptFile = script
							sp.UserName = qu.ObjToString((*lua)["createuser"])
							sp.UserEmail = qu.ObjToString((*lua)["createuseremail"])
							sp.MUserName = qu.ObjToString((*lua)["modifyuser"])
							// NOTE(review): reads key "next" — looks like it should be
							// "modifyuseremail"; confirm against the luaconfig schema.
							sp.MUserEmail = qu.ObjToString((*lua)["next"])
							isNew = false
						}
					}
					if isNew {
						sp, errstr := CreateSpider(code, script, true, false)
						if errstr == "" && sp != nil && sp.Code != "nil" { // script loaded successfully
							sp.IsMainThread = true // with multi-threaded collection the main thread owns heartbeat updates
							HistoryAllSpiders.Store(code, sp) // register the running spider
							logger.Info("start job:", code)
							go sp.StartSpider()
						} else {
							// Script failed to load: upsert a failure record
							// within a +/-12h window around now.
							logger.Info(code, "脚本加载失败,请检查!")
							nowT := time.Now().Unix()
							username := "异常"
							if sp != nil {
								username = sp.MUserName
							}
							MgoS.Update("spider_loadfail",
								map[string]interface{}{
									"code": code,
									"modifytime": map[string]interface{}{
										"$gte": nowT - 12*3600,
										"$lte": nowT + 12*3600,
									},
								},
								map[string]interface{}{
									"$set": map[string]interface{}{
										"code":       code,
										"type":       "初始化",
										"script":     script,
										"updatetime": nowT,
										"modifyuser": username,
										"event":      sputil.Config.Uploadevent,
										"err":        errstr,
									},
								}, true, false)
						}
					}
				}
			}
		}(tmp)
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	//time.AfterFunc(time.Second*30, GetHistoryDownloadSpider)
	time.AfterFunc(time.Minute*30, GetHistoryDownloadSpider)
}
|