123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package spider
- import (
- "fmt"
- "github.com/donnie4w/go-logger/logger"
- qu "qfw/util"
- sputil "spiderutil"
- "sync"
- "time"
- )
// HistoryAllSpiders holds the history spiders that are currently running,
// keyed by spider code. The zero value of sync.Map is ready to use.
var HistoryAllSpiders sync.Map
- //历史节点下载详情页
- func HistoryEventDownloadDetail() {
- defer qu.Catch()
- if !sputil.Config.IsHistoryEvent { //不是历史节点return
- return
- }
- GetHistoryDownloadSpider() //定时检测数据集汇总爬虫
- }
- //执行爬虫
- func (s *Spider) StartSpider() {
- defer qu.Catch()
- for {
- logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
- if !s.Stop { //爬虫是运行状态
- //s.DownloadHistoryDetail()
- s.DownloadDetail(true, true)
- } else { //爬虫停止运行,删除
- s.L.Close()
- HistoryAllSpiders.Delete(s.Code)
- logger.Info("Delete Code:", s.Code, "Stop:", s.Stop)
- break
- }
- }
- }
- //定时检测数据集汇总爬虫
- func GetHistoryDownloadSpider() {
- defer qu.Catch()
- logger.Info("定时统计下载历史爬虫开始...")
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- wg := &sync.WaitGroup{}
- ch := make(chan bool, 2)
- match := map[string]interface{}{ //查询未下载过的数据
- "state": 0,
- }
- group := map[string]interface{}{
- "_id": "$spidercode",
- "count": map[string]interface{}{
- "$sum": 1,
- },
- }
- p := []map[string]interface{}{
- map[string]interface{}{"$match": match},
- map[string]interface{}{"$group": group},
- }
- it := sess.DB(MgoS.DbName).C("spider_historydata").Pipe(p).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
- wg.Add(1)
- ch <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- code := qu.ObjToString(tmp["_id"])
- count := qu.IntAll(tmp["count"])
- logger.Info("code:", code, " 当前待采集历史信息量:", count)
- //查询爬虫信息
- lua, _ := MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
- if len(*lua) > 0 {
- state := qu.IntAll((*lua)["state"])
- if state >= 7 || state == 4 { //已作废、无发布、需登录、无法处理、已删除、已上线(python)这些状态爬虫的数据不再下载
- MgoS.Update("spider_historydata",
- map[string]interface{}{"spidercode": code, "state": 0},
- map[string]interface{}{"$set": map[string]interface{}{"luastate": state, "state": -1, "updatetime": time.Now().Unix()}},
- false, true,
- )
- } else {
- old := qu.IntAll((*lua)["old_lua"])
- script := ""
- if old == 1 {
- script = fmt.Sprint((*lua)["luacontent"])
- } else {
- if (*lua)["oldlua"] != nil {
- if (*lua)["luacontent"] != nil {
- script = (*lua)["luacontent"].(string)
- }
- } else {
- script = GetScriptByTmp((*lua))
- }
- }
- spTmp, b := HistoryAllSpiders.Load(code)
- isNew := true
- if b { //更新正在运行爬虫信息
- sp, ok := spTmp.(*Spider)
- if ok {
- sp.ScriptFile = script
- sp.UserName = qu.ObjToString((*lua)["createuser"])
- sp.UserEmail = qu.ObjToString((*lua)["createuseremail"])
- sp.MUserName = qu.ObjToString((*lua)["modifyuser"])
- sp.MUserEmail = qu.ObjToString((*lua)["next"])
- isNew = false
- }
- }
- if isNew {
- sp, errstr := CreateSpider(code, script, true, false)
- if errstr == "" && sp != nil && sp.Code != "nil" { //脚本加载成功
- sp.IsMainThread = true //多线程采集时,判断哪个是主线程,由主线程采集时更新心跳
- HistoryAllSpiders.Store(code, sp) //存入集合
- logger.Info("start job:", code)
- go sp.StartSpider()
- } else {
- logger.Info(code, "脚本加载失败,请检查!")
- nowT := time.Now().Unix()
- username := "异常"
- if sp != nil {
- username = sp.MUserName
- }
- MgoS.Update("spider_loadfail",
- map[string]interface{}{
- "code": code,
- "modifytime": map[string]interface{}{
- "$gte": nowT - 12*3600,
- "$lte": nowT + 12*3600,
- },
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "code": code,
- "type": "初始化",
- "script": script,
- "updatetime": nowT,
- "modifyuser": username,
- "event": sputil.Config.Uploadevent,
- "err": errstr,
- },
- }, true, false)
- }
- }
- }
- }
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- //time.AfterFunc(time.Second*30, GetHistoryDownloadSpider)
- time.AfterFunc(time.Minute*30, GetHistoryDownloadSpider)
- }
|