package spider

import (
	"fmt"
	"sync"
	"time"

	"github.com/donnie4w/go-logger/logger"

	qu "qfw/util"
	sputil "spiderutil"
)

// HistoryAllSpiders maps spider code -> *Spider for every history spider
// currently running on this node. The zero value of sync.Map is ready to use.
var HistoryAllSpiders sync.Map

// HistoryEventDownloadDetail kicks off detail-page downloading on a history
// node. On non-history nodes (per sputil.Config.IsHistoryEvent) it is a no-op.
func HistoryEventDownloadDetail() {
	defer qu.Catch()
	if !sputil.Config.IsHistoryEvent { // not a history node: nothing to do
		return
	}
	GetHistoryDownloadSpider() // periodic scan that aggregates pending spiders
}

// StartSpider runs the spider's download loop until s.Stop is set, then
// closes the Lua state, removes the spider from HistoryAllSpiders, and exits.
func (s *Spider) StartSpider() {
	defer qu.Catch()
	for {
		logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
		if s.Stop {
			// Spider has been stopped: release the Lua state and deregister.
			s.L.Close()
			HistoryAllSpiders.Delete(s.Code)
			logger.Info("Delete Code:", s.Code, "Stop:", s.Stop)
			break
		}
		// Spider is running: pull one round of detail pages.
		//s.DownloadHistoryDetail()
		s.DownloadDetail(true, true)
	}
}

// GetHistoryDownloadSpider aggregates pending history rows
// (spider_historydata, state == 0) by spider code and reconciles the set of
// running spiders: retired codes get their pending rows marked skipped,
// already-running spiders get their script/user info refreshed, and new codes
// get a spider created and started. Reschedules itself every 30 minutes.
func GetHistoryDownloadSpider() {
	defer qu.Catch()
	logger.Info("定时统计下载历史爬虫开始...")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 2) // semaphore: at most 2 concurrent workers
	match := map[string]interface{}{
		"state": 0, // only rows not yet downloaded
	}
	group := map[string]interface{}{
		"_id": "$spidercode",
		"count": map[string]interface{}{
			"$sum": 1,
		},
	}
	p := []map[string]interface{}{
		{"$match": match},
		{"$group": group},
	}
	it := sess.DB(MgoS.DbName).C("spider_historydata").Pipe(p).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["_id"])
			count := qu.IntAll(tmp["count"])
			logger.Info("code:", code, " 当前待采集历史信息量:", count)
			// Load the spider configuration for this code.
			lua, _ := MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
			// Guard against a failed/empty lookup: this goroutine has no
			// recover, so a nil dereference here would kill the process.
			if lua == nil || len(*lua) == 0 {
				return
			}
			state := qu.IntAll((*lua)["state"])
			if state >= 7 || state == 4 {
				// Retired states (discarded, no-publish, login-required,
				// unprocessable, deleted, python-online): mark the pending
				// rows so they are never downloaded.
				MgoS.Update("spider_historydata",
					map[string]interface{}{"spidercode": code, "state": 0},
					map[string]interface{}{"$set": map[string]interface{}{"luastate": state, "state": -1, "updatetime": time.Now().Unix()}},
					false, true,
				)
				return
			}
			// Resolve the Lua script text, depending on config generation.
			old := qu.IntAll((*lua)["old_lua"])
			script := ""
			if old == 1 {
				script = fmt.Sprint((*lua)["luacontent"])
			} else if (*lua)["oldlua"] != nil {
				if (*lua)["luacontent"] != nil {
					// Comma-ok assertion: a non-string value must not panic
					// the worker goroutine; fall back to "" as declared.
					script, _ = (*lua)["luacontent"].(string)
				}
			} else {
				script = GetScriptByTmp((*lua))
			}
			spTmp, loaded := HistoryAllSpiders.Load(code)
			isNew := true
			if loaded {
				// Spider already running: refresh its script and user info.
				if sp, ok := spTmp.(*Spider); ok {
					sp.ScriptFile = script
					sp.UserName = qu.ObjToString((*lua)["createuser"])
					sp.UserEmail = qu.ObjToString((*lua)["createuseremail"])
					sp.MUserName = qu.ObjToString((*lua)["modifyuser"])
					// BUG FIX: this previously read (*lua)["next"], which is
					// not an email field; mirror createuseremail above.
					// NOTE(review): confirm "modifyuseremail" is the actual
					// luaconfig field name.
					sp.MUserEmail = qu.ObjToString((*lua)["modifyuseremail"])
					isNew = false
				}
			}
			if isNew {
				sp, errstr := CreateSpider(code, script, true, false)
				if errstr == "" && sp != nil && sp.Code != "nil" {
					// Script loaded successfully: register and start the job.
					// The main thread is the one that updates the heartbeat
					// during multi-threaded collection.
					sp.IsMainThread = true
					HistoryAllSpiders.Store(code, sp)
					logger.Info("start job:", code)
					go sp.StartSpider()
				} else {
					// Script failed to load: record the failure (upsert
					// keyed on code + a 24h window around now).
					logger.Info(code, "脚本加载失败,请检查!")
					nowT := time.Now().Unix()
					username := "异常"
					if sp != nil {
						username = sp.MUserName
					}
					MgoS.Update("spider_loadfail",
						map[string]interface{}{
							"code": code,
							"modifytime": map[string]interface{}{
								"$gte": nowT - 12*3600,
								"$lte": nowT + 12*3600,
							},
						},
						map[string]interface{}{
							"$set": map[string]interface{}{
								"code":       code,
								"type":       "初始化",
								"script":     script,
								"updatetime": nowT,
								"modifyuser": username,
								"event":      sputil.Config.Uploadevent,
								"err":        errstr,
							},
						}, true, false)
				}
			}
		}(tmp)
		// Hand the filled map to the goroutine and start the next iteration
		// with a fresh one so it.Next never mutates a map in flight.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	//time.AfterFunc(time.Second*30, GetHistoryDownloadSpider)
	time.AfterFunc(time.Minute*30, GetHistoryDownloadSpider)
}