// Package spider loads Lua-based crawler scripts (from MongoDB and from
// files under res/), schedules them in either "high-performance" mode
// (util.Config.Working == 0, every spider runs concurrently) or "queue"
// mode (Working == 1, spiders are polled round-robin), and reports
// heartbeat/throughput information back to the script editor service.
package spider

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/cjoudrey/gluahttp"
	lujson "github.com/yuin/gopher-json"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	qu "qfw/util"
	"regexp"
	util "spiderutil"
	"strings"
	"sync"
	"time"

	"github.com/donnie4w/go-logger/logger"
	"github.com/yuin/gopher-lua"
)

var SpiderHeart sync.Map = sync.Map{}  // spider heartbeat registry (code -> *Heart)
var Allspiders sync.Map = sync.Map{}   // spiders currently collecting list pages (code -> *Spider)
var Allspiders2 sync.Map = sync.Map{}  // spiders currently collecting detail pages (code -> *Spider)
var LoopListPath sync.Map = sync.Map{} // all loaded spider scripts (code -> map[string]string)

//var ChanDels = map[int]string{}
//var lock sync.Mutex

var CC chan *lua.LState  // pooled Lua states for list-page collection (queue mode)
var CC2 chan *lua.LState // pooled Lua states for detail-page collection (queue mode)
var Chansize int
var regcode, _ = regexp.Compile(`="(.*)"`) // extracts the value of `spiderCode="..."` from a script line
var InitCount int
var InitAllLuaOver = make(chan bool, 1) // signaled once all scripts have been loaded

// InitSpider loads all spider scripts (DB first, then files) into
// LoopListPath and starts the scheduler that matches the configured
// working/modal mode.
func InitSpider() {
	scriptMap := getSpiderScriptDB("all") // load spiders, initialize templates
	scriptMapFile := getSpiderScriptFile(false)
	for code, v := range scriptMap {
		LoopListPath.Store(code, v)
		InitCount++
	}
	for code, v := range scriptMapFile {
		LoopListPath.Store(code, v)
		InitCount++
	}
	if util.Config.Working == 0 {
		NoQueueScript() // high-performance mode
	} else {
		if util.Config.Modal == 0 { // original mode: list+detail together
			QueueUpScriptList()
		} else { // list pages and detail pages collected by separate loops
			go QueueUpScriptList()   // energy-saving mode: list pages
			go QueueUpScriptDetail() // energy-saving mode: detail pages
		}
	}
}

// NoQueueScript implements high-performance mode: every script in
// LoopListPath gets its own *Spider started immediately (plus a second
// detail-page spider in split-collection mode). Scripts that fail to
// load are upserted into the spider_loadfail collection.
func NoQueueScript() {
	// download upper/lower limits per code, applied to each spider below
	list, _ := MgoS.Find("spider_ldtime", nil, nil, map[string]interface{}{"code": 1, "uplimit": 1, "lowlimit": 1}, false, -1, -1)
	LoopListPath.Range(func(key, temp interface{}) bool {
		if info, ok := temp.(map[string]string); ok {
			code := info["code"]
			script := info["script"]
			sp, errstr := CreateSpider(code, script, true, false)
			if errstr == "" && sp != nil && sp.Code != "nil" { // script loaded successfully
				//sp.Index = qu.IntAll(key)
				//sp2.Index = qu.IntAll(key)
				Allspiders.Store(sp.Code, sp)
				for _, tmp := range *list {
					if qu.ObjToString(tmp["code"]) == sp.Code {
						sp.UpperLimit = qu.IntAll(tmp["uplimit"])
						sp.LowerLimit = qu.IntAll(tmp["lowlimit"])
						break
					}
				}
				if !Supplement && util.Config.Modal == 1 && !util.Config.IsHistoryEvent {
					// split collection mode: a second spider handles detail pages
					sp2, _ := CreateSpider(code, script, true, false)
					sp2.IsMainThread = true // used by multi-threaded collection
					Allspiders2.Store(sp.Code, sp2)
				}
				sp.StartJob()
				//util.TimeSleepFunc(10*time.Millisecond, TimeSleepChan)
			} else { // load failed: record it (12h window on modifytime for the upsert match)
				logger.Info(code, "脚本加载失败,请检查!")
				nowT := time.Now().Unix()
				username := "异常"
				if sp != nil {
					username = sp.MUserName
				}
				MgoS.Update("spider_loadfail",
					map[string]interface{}{
						"code": code,
						"modifytime": map[string]interface{}{
							"$gte": nowT - 12*3600,
							"$lte": nowT + 12*3600,
						},
					},
					map[string]interface{}{
						"$set": map[string]interface{}{
							"code":       code,
							"type":       "初始化",
							"script":     script,
							"updatetime": nowT,
							"modifyuser": username,
							"event":      util.Config.Uploadevent,
							"err":        errstr,
						},
					}, true, false)
			}
			time.Sleep(100 * time.Millisecond) // stagger spider start-up
		}
		return true
	})
	InitAllLuaOver <- true // all spiders initialized
	logger.Info("高性能模式:LUA加载完成")
	numSpider := 0
	Allspiders.Range(func(key, value interface{}) bool {
		numSpider++
		return true
	})
	logger.Info("总共加载脚本数:", numSpider)
}

// QueueUpScriptList is the queue-mode list-page scheduler. It fills the
// CC Lua-state pool, then loops forever over LoopListPath: when a code's
// previous round has finished it starts a new main task; while a main
// task is still running it may spawn parallel subtasks up to
// ListParallelTaskLimit. IsHistoricalMend spiders run once and are then
// removed from the queue (their luaconfig state is set to 6).
func QueueUpScriptList() {
	logger.Info("节能模式列表页")
	CC = make(chan *lua.LState, util.Config.Chansize)
	for i := 0; i < util.Config.Chansize; i++ { // two pools (Allspiders/Allspiders2) exist, hence threads x2
		CC <- lua.NewState(lua.Options{
			RegistrySize:        256 * 20,
			CallStackSize:       256,
			IncludeGoStackTrace: false,
		})
	}
	for {
		listLen, listNoLen, DelLen := 0, 0, 0
		logger.Info(time.Now().Format(qu.Date_Full_Layout), ":下载列表页执行死循环", "初始化脚本数量:", InitCount)
		LoopListPath.Range(func(key, temp interface{}) bool {
			if info, ok := temp.(map[string]string); ok {
				script := info["script"]
				code := info["code"]
				// check whether this code's previous round has finished
				old_is_running := false
				sp_ok := false
				sp_old := &Spider{}
				tmp, b := Allspiders.Load(code)
				if b {
					if sp_old, sp_ok = tmp.(*Spider); sp_ok {
						if !sp_old.Stop { // main task still running
							old_is_running = true
						}
					}
				}
				logger.Info("Code:", code, "Is Downloading List:", old_is_running, ",subtask num:", sp_old.ListParallelTaskNum)
				if !old_is_running { // previous round finished: start a fresh main task
					sp, errstr := CreateSpider(code, script, false, false)
					//logger.Info("初始化脚本是否成功:", sp != nil, e.Value)
					if errstr == "" && sp != nil && sp.Code != "nil" { // script initialized successfully
						//sp.Index = qu.IntAll(key)
						sp.ListParallelTaskNum = sp_old.ListParallelTaskNum // inherit the subtask count
						Allspiders.Store(code, sp)
						sp.StartJob()
					} else { // record the load failure
						nowT := time.Now().Unix()
						username := "异常"
						if sp != nil {
							username = sp.MUserName
						}
						MgoS.Update("spider_loadfail",
							map[string]interface{}{
								"code": code,
								"modifytime": map[string]interface{}{
									"$gte": nowT - 12*3600,
									"$lte": nowT + 12*3600,
								},
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"code":       code,
									"type":       "初始化",
									"script":     script,
									"updatetime": nowT,
									"modifyuser": username,
									"event":      util.Config.Uploadevent,
									"err":        errstr,
								},
							}, true, false)
					}
					if sp != nil && sp.IsHistoricalMend { // history-download spiders run once, then are removed
						DelLen++
						LoopListPath.Delete(key)
						b = MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
						logger.Debug("Delete History Code:", code, b)
					}
				} else if sp_ok && sp_old.ListParallelTaskNum < util.Config.PageTurnInfo.ListParallelTaskLimit { // main task running: spawn a subtask
					spTmp, errstr := CreateSpider(code, script, true, true)
					if errstr == "" && spTmp != nil && spTmp.Code != "nil" { // subtask script initialized successfully
						sp_old.ListParallelTaskNum++
						logger.Info(code, "子任务开始执行,当前子任务数", sp_old.ListParallelTaskNum)
						// start the download in its own goroutine
						go func(spt, spo *Spider) {
							defer func() {
								spt.L.Close()             // release the Lua state
								spo.ListParallelTaskNum-- // one subtask fewer
							}()
							err := spt.DownListPageItem() // download list pages
							if err != nil {
								logger.Error(spt.Code, err)
							}
						}(spTmp, sp_old)
					}
				}
				listLen++
			} else {
				logger.Info("Code:", key, "Is Not Download List")
				listNoLen++
			}
			time.Sleep(100 * time.Millisecond)
			return true
		})
		time.Sleep(1 * time.Second)
		count_ok, count_no := 0, 0
		LoopListPath.Range(func(k, v interface{}) bool {
			if v != nil {
				count_ok++
			} else {
				count_no++
			}
			return true
		})
		InitCount = count_ok
		logger.Info(time.Now().Format(qu.Date_Full_Layout), ":下载列表页执行死循环,列表长度,", listLen, listNoLen, "删除数量", DelLen, "执行完毕后数量统计:", count_ok, count_no)
	}
}

// QueueUpScriptDetail is the queue-mode detail-page scheduler. It fills
// the CC2 Lua-state pool and loops forever over LoopListPath, starting a
// DownloadListDetail goroutine for every code whose previous detail
// round (tracked in Allspiders2) has finished.
func QueueUpScriptDetail() {
	logger.Info("节能模式三级页")
	chanSize := util.Config.DetailChansize
	CC2 = make(chan *lua.LState, chanSize)
	for i := 0; i < chanSize; i++ { // two pools (Allspiders/Allspiders2) exist, hence threads x2
		CC2 <- lua.NewState(lua.Options{
			RegistrySize:        256 * 20,
			CallStackSize:       256,
			IncludeGoStackTrace: false,
		})
	}
	for {
		count_ok, count_no := 0, 0
		logger.Warn(time.Now().Format(qu.Date_Full_Layout), ":下载三级页执行死循环", "初始化脚本数量:", InitCount)
		LoopListPath.Range(func(key, temp interface{}) bool {
			if info, ok := temp.(map[string]string); ok {
				count_ok++
				code := info["code"]
				old_is_running := false
				tmp, b := Allspiders2.Load(code)
				if b {
					if sp_old, ok := tmp.(*Spider); ok {
						if !sp_old.Stop {
							old_is_running = true
						}
					}
				}
				logger.Info("Code:", code, "Is Downloading Detail:", old_is_running)
				if !old_is_running { // previous detail round finished: start a new one
					script := info["script"]
					sp, errstr := CreateSpider(code, script, true, false)
					if errstr == "" && sp != nil && sp.Code != "nil" { // script initialized successfully
						//sp.Index = qu.IntAll(key)
						sp.IsMainThread = true
						Allspiders2.Store(code, sp)
						go sp.DownloadListDetail(false) // download detail-page (level-3) data
					}
				}
			} else {
				logger.Info("Code:", key, "Is Not Download Detail")
				count_no++
			}
			time.Sleep(100 * time.Millisecond)
			return true
		})
		InitCount = count_ok
		time.Sleep(1 * time.Second)
		logger.Warn(time.Now().Format(qu.Date_Full_Layout), ":下载三级页执行死循环完毕,数量统计:", count_ok, count_no)
	}
}

// getSpiderScriptDB loads spider scripts from the luaconfig collection.
// code=="all" loads every script for the configured upload event;
// Supplement mode loads important spiders from all events except 7000;
// otherwise a single code is loaded. Returns code -> attribute map
// (code/type/script/createuser/createuseremail/modifyuser/modifyemail).
func getSpiderScriptDB(code string) map[string]map[string]string {
	scriptSpider := map[string]map[string]string{}
	query := map[string]interface{}{}
	if Supplement { // supplemental data collection
		query = map[string]interface{}{
			"state":    5,
			"platform": "golua平台",
			"event": map[string]interface{}{
				"$ne": 7000,
			},
			"spiderimportant": true,
		}
	} else if code == "all" { // initialize all scripts
		query = map[string]interface{}{"state": 5, "event": util.Config.Uploadevent}
	} else { // online upload of a single script
		query = map[string]interface{}{"code": code, "event": util.Config.Uploadevent}
		//query = `{"$or":[{"iupload":1},{"iupload":3}],"event":` + fmt.Sprint(util.Config.Uploadevent) + `,"modifytime":{"$gt":1502937042}}`
	}
	listdb, _ := MgoEB.Find("luaconfig", query, map[string]interface{}{"_id": -1}, nil, false, -1, -1)
	// temporary: historical attachments
	//listdb, _ := MgoEB.Find("luaconfig_test", query, map[string]interface{}{"_id": -1}, nil, false, -1, -1)
	for _, v := range *listdb {
		if Supplement && strings.Contains(qu.ObjToString(v["code"]), "_bu") { // supplement mode skips "_bu"-suffixed spiders
			continue
		}
		old := qu.IntAll(v["old_lua"])
		script := ""
		if old == 1 { // legacy scripts store the raw Lua directly
			script = fmt.Sprint(v["luacontent"])
		} else {
			if v["oldlua"] != nil {
				if v["luacontent"] != nil {
					script = v["luacontent"].(string)
				}
			} else { // assemble the script from its template parameters
				script = GetScriptByTmp(v)
			}
		}
		scriptSpider[fmt.Sprint(v["code"])] = map[string]string{
			"code":            fmt.Sprint(v["code"]),
			"type":            fmt.Sprint(v["state"]),
			"script":          script,
			"createuser":      fmt.Sprint(v["createuser"]),
			"createuseremail": fmt.Sprint(v["createuseremail"]),
			"modifyuser":      fmt.Sprint(v["modifyuser"]),
			// NOTE(review): modifyemail is populated from v["next"], not a
			// *email field — looks suspicious, confirm against the schema.
			"modifyemail": fmt.Sprint(v["next"]),
		}
	}
	return scriptSpider
}

// getSpiderScriptFile walks res/ for spider_*.lua files (skipping any
// \test\ directory) and returns code -> attribute map. The spider code
// is extracted from the first `spiderCode="..."` line via regcode. With
// newscript=true only files modified within the last 15 minutes load.
func getSpiderScriptFile(newscript bool) map[string]map[string]string {
	scriptSpider := map[string]map[string]string{}
	filespider := 0
	filepath.Walk("res", func(path string, info os.FileInfo, err error) error {
		if info.IsDir() {
			return nil
		} else if strings.HasPrefix(info.Name(), "spider_") && strings.HasSuffix(info.Name(), ".lua") {
			// skip the test directory (backslash path — Windows-style; TODO confirm portability)
			if strings.Contains(path, "\\test\\") {
				return nil
			}
			loadfile := true
			if newscript { // only reload files modified within the last 15 minutes
				if time.Now().Unix() < info.ModTime().Add(time.Duration(15)*time.Minute).Unix() {
					loadfile = true
				} else {
					loadfile = false
				}
			}
			if loadfile {
				f, err := os.Open(path)
				// NOTE(review): defer inside a Walk callback runs when the
				// callback returns (fine), but it precedes the error check,
				// so Close may be called on a nil *os.File if Open failed.
				defer f.Close()
				if err != nil {
					logger.Error(err.Error())
				}
				buf := bufio.NewReader(f)
				script := ""
				code := ""
				for {
					line, err := buf.ReadString('\n')
					if code == "" && strings.Contains(line, "spiderCode=") {
						res := regcode.FindAllStringSubmatch(line, -1)
						if len(res) > 0 {
							code = res[0][1]
							//logger.Info("code", code)
						} else {
							break
						}
					}
					if scriptSpider[code] == nil { // stop if this code was already collected
						script = script + line + "\n"
					} else {
						break
					}
					if err != nil { // EOF or read error ends the file
						break
					}
				}
				if code != "" && script != "" && scriptSpider[code] == nil {
					scriptSpider[code] = map[string]string{
						"code":   code,
						"type":   "5",
						"script": script,
						// file-based scripts carry no user attributes
						"createuser":      "",
						"createuseremail": "",
						"modifyuser":      "",
						"modifyemail":     "",
					}
					filespider = filespider + 1
					//logger.Info("script", script)
				}
			}
		}
		return nil
	})
	logger.Info("节点", util.Config.Uploadevent, "脚本文件爬虫数", filespider)
	return scriptSpider
}

// UpdateSpiderByCodeState takes a script offline, reloads it, or puts it
// online, depending on state: any value other than "5"/"-1" delists the
// code; "-1" reloads an already-online spider; "5" (on)lists it, adding
// it to the queue or spider pools as required by the working mode.
// Returns (success, error).
//
// NOTE(review): `for i, as := range []sync.Map{Allspiders, Allspiders2}`
// ranges over value COPIES of sync.Map (govet copylocks); the copies
// share internal state with the originals in practice, but this pattern
// should be replaced with pointers — confirm before relying on it.
func UpdateSpiderByCodeState(code, state string) (bool, error) {
	up := false
	var err error
	if state != "5" && state != "-1" { // take the script offline
		SpiderHeart.Delete(code) // delisted: remove its heartbeat
		logger.Info("下架脚本", code)
		if util.Config.Working == 1 { // queue mode
			for i, as := range []sync.Map{Allspiders, Allspiders2} {
				if i == 1 && util.Config.Modal == 0 { // original queue mode: Allspiders2 unused (7700 delist)
					break
				}
				tmp, b := as.Load(code)
				if b {
					sp, ok := tmp.(*Spider)
					if ok {
						sp.Stop = true
					}
					as.Delete(code)
					logger.Info("下架脚本,Allspiders删除")
				}
			}
		} else { // high-performance mode: also close the Lua state
			for _, as := range []sync.Map{Allspiders, Allspiders2} {
				if tmp, ok := as.Load(code); ok {
					sp, ok := tmp.(*Spider)
					if ok {
						sp.Stop = true
						sp.L.Close()
						as.Delete(code)
					}
				}
			}
		}
		LoopListPath.Delete(code)
		logger.Info(code, "脚本下架成功")
		up = true
		err = nil
	} else if state == "-1" { // recollect: refresh the online spider
		scriptMap := getSpiderScriptDB(code)
		logger.Info("更新线上脚本,库中是否已存在该脚本:", code, len(scriptMap) > 0, scriptMap[code] != nil)
		if util.Config.Working == 1 { // queue mode
			for _, v := range scriptMap {
				listsize := 0
				listHas := false
				count_ok, count_no := 0, 0
				LoopListPath.Range(func(key, val interface{}) bool {
					listsize++
					if tmp, ok := val.(map[string]string); ok {
						count_ok++
						if tmp["code"] == code && key == code { // queued: reload the script
							logger.Info("上架新增脚本,队列中以有该脚本,进行更新")
							listHas = true
							LoopListPath.Store(key, v)
							UpdateHighListDataByCode(code) // after relisting, reset list data state=0
							logger.Info("队列模式更新列表页信息状态", code)
						}
					} else {
						count_no++
					}
					return true
				})
				logger.Info("上架新增脚本,队列中共有爬虫", listsize, "当前在线数量:", count_ok, "下线数量:", count_no)
				if !listHas { // not queued: cannot reload
					logger.Info("重采更新爬虫失败:", code)
					up = false
					err = errors.New("爬虫不在线:" + code)
				} else {
					up = true
					err = nil
					logger.Info("重采更新爬虫成功", code)
				}
			}
		} else { // high-performance mode
			for k, v := range scriptMap {
				if spd, ok := Allspiders.Load(k); ok { // script exists: update in place
					sp := spd.(*Spider)
					sp.ScriptFile = v["script"]
					sp.MUserName = v["modifyuser"]
					sp.MUserEmail = v["modifyemail"]
					Allspiders.Store(k, sp)
					up = true
					err = nil
					logger.Info("重采更新爬虫成功", sp.Code)
				} else { // not present
					up = false
					err = errors.New("爬虫不在线:" + code)
					logger.Info("重采更新爬虫失败:", code)
				}
				//Allspiders2
				if spd2, ok2 := Allspiders2.Load(k); ok2 { // detail pool: update and reload
					sp2 := spd2.(*Spider)
					sp2.ScriptFile = v["script"]
					sp2.MUserName = v["modifyuser"]
					sp2.MUserEmail = v["modifyemail"]
					sp2.LoadScript(&sp2.Name, &sp2.Channel, &sp2.MUserName, k, sp2.ScriptFile, true, false) // relist: reload the script
					Allspiders2.Store(k, sp2)
					// up = true
					// err = nil
					logger.Info("Allspiders2重采更新爬虫成功", sp2.Code)
				} else { // not present
					// up = false
					// err = errors.New("爬虫不在线:" + code)
					logger.Info("Allspiders2重采更新爬虫失败:", code)
				}
			}
		}
	} else { // put the script online
		scriptMap := getSpiderScriptDB(code)
		logger.Info("上架新增脚本,库中是否已存在该脚本:", code, len(scriptMap) > 0, scriptMap[code] != nil)
		if util.Config.Modal == 1 && !util.Config.IsHistoryEvent { // split collection
			go UpdateHighListDataByCode(code)
		}
		if util.Config.Working == 1 { // queue mode
			for _, v := range scriptMap {
				LoopListPath.Store(code, v) // add or update the spider's queue entry
				listsize, count_ok, count_no := 0, 0, 0
				isOk := false
				LoopListPath.Range(func(key, val interface{}) bool {
					listsize++
					if tmp, ok := val.(map[string]string); ok {
						count_ok++
						if tmp["code"] == code && key == code { // present in queue
							isOk = true
						}
					} else {
						count_no++
					}
					return true
				})
				logger.Info("上架脚本", isOk, code)
				logger.Info("上架爬虫后队列中共有爬虫", listsize, "当前在线数量:", count_ok, "下线数量:", count_no)
				if !isOk {
					return false, errors.New("use " + code + " failed")
				}
				up = true
			}
		} else { // high-performance mode
			for k, v := range scriptMap {
				LoopListPath.Store(k, v)
				// 1. Allspiders: list-page spider pool (events 7000/7100/7400)
				if spd, ok := Allspiders.Load(k); ok { // exists: update and reload
					sp := spd.(*Spider)
					sp.ScriptFile = v["script"]
					UpdateSpider(sp, k, v["script"]) // refresh the spider's other fields
					//sp.LoadScript(&sp.Name, &sp.Channel, &sp.MUserName, k, sp.ScriptFile, true, false) // relist: reload the script
					Allspiders.Store(k, sp)
					up = true
					err = nil
					logger.Info("上架重载脚本", sp.Code)
				} else { // new script
					sp, errstr := CreateSpider(k, v["script"], true, false)
					if errstr == "" && sp != nil && sp.Code != "nil" {
						Allspiders.Store(k, sp)
						sp.StartJob()
						up = true
						err = nil
						logger.Info("上架新增脚本", sp.Code)
					} else {
						err = errors.New("新增失败")
						nowT := time.Now().Unix()
						MgoS.Update("spider_loadfail",
							map[string]interface{}{
								"code": k,
								"modifytime": map[string]interface{}{
									"$gte": nowT - 12*3600,
									"$lte": nowT + 12*3600,
								},
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"code":       k,
									"type":       "新增初始化脚本",
									"script":     v["script"],
									"updatetime": nowT,
									// NOTE(review): sp can be nil here (CreateSpider
									// returns nil on error) — sp.MUserName would
									// panic; other branches guard with sp != nil.
									"modifyuser": sp.MUserName,
									"event":      util.Config.Uploadevent,
									"err":        errstr,
								},
							}, true, false)
					}
				}
				// 2. Allspiders2: detail-page spider pool (events 7100/7110/7400)
				if util.Config.Modal == 1 && !util.Config.IsHistoryEvent {
					//Allspiders2
					if spd2, ok2 := Allspiders2.Load(k); ok2 { // exists: update and reload
						sp2 := spd2.(*Spider)
						sp2.ScriptFile = v["script"]
						UpdateSpider(sp2, k, v["script"])                                                       // refresh the spider's other fields
						sp2.LoadScript(&sp2.Name, &sp2.Channel, &sp2.MUserName, k, sp2.ScriptFile, true, false) // relist: reload the script
						Allspiders2.Store(k, sp2) // put the reloaded spider back into the pool
						// up = true
						// err = nil
						logger.Info("Allspiders2上架重载脚本", sp2.Code)
					} else { // new script
						sp2, errstr := CreateSpider(k, v["script"], true, false)
						if errstr == "" && sp2 != nil && sp2.Code != "nil" {
							sp2.IsMainThread = true        // used by multi-threaded detail collection
							go sp2.DownloadHighDetail(true) // download detail pages for queued list data
							Allspiders2.Store(k, sp2)
							// up = true
							// err = nil
							logger.Info("Allspiders2上架新增脚本", sp2.Code)
						} /*else {
							err = errors.New("新增失败")
							mgu.Save("spider_loadfail", "spider", "spider", map[string]interface{}{
								"code":   k,
								"type":   "新增脚本失败",
								"script": v["script"],
								"intime": time.Now().Format(qu.Date_Full_Layout),
								"event":  util.Config.Uploadevent,
							})
						}*/
					}
				}
			}
		}
	}
	logger.Info("上下架:", up, err)
	return up, err
}

// ReloadSpiderFile periodically (every 15 minutes, via TimeAfterFunc)
// reloads recently-modified script files from res/ into both spider
// pools, creating spiders for new codes and updating existing ones.
//
// NOTE(review): same copylocks concern as UpdateSpiderByCodeState —
// ranging over []sync.Map copies the maps.
func ReloadSpiderFile() {
	scriptMap := getSpiderScriptFile(true)
	for k, v := range scriptMap {
		for i, as := range []sync.Map{Allspiders, Allspiders2} {
			if i == 1 && util.Config.Modal == 0 { // original queue mode: Allspiders2 unused
				continue
			}
			if spd, ok := as.Load(k); ok { // script exists: update its fields
				sp := spd.(*Spider)
				logger.Info("定时重载脚本", sp.Code)
				sp.ScriptFile = v["script"]
				sp.MUserName = v["modifyuser"]
				sp.MUserEmail = v["modifyemail"]
				as.Store(k, sp)
			} else { // new script
				var sp *Spider
				var errstr string
				if util.Config.Working == 1 { // queue mode
					if i == 0 {
						//length := 0
						//LoopListPath.Range(func(k, v interface{}) bool {
						//	length++
						//	return true
						//})
						LoopListPath.Store(k, v) // queue mode: both pools share one LoopListPath, add once
						sp, errstr = CreateSpider(k, v["script"], false, false)
					} else {
						sp, errstr = CreateSpider(k, v["script"], true, false)
					}
				} else {
					sp, errstr = CreateSpider(k, v["script"], true, false)
				}
				if errstr == "" && sp != nil && sp.Code != "nil" {
					sp.MUserName = v["modifyuser"]
					sp.MUserEmail = v["modifyemail"]
					as.Store(k, sp)
					if util.Config.Working == 1 {
						sp.Stop = true // queue mode: the scheduler loop will start it
						// if i == 0 {
						//	length := 0
						//	LoopListPath.Range(func(k, v interface{}) bool {
						//		length++
						//		return true
						//	})
						//	LoopListPath.Store(length, v)
						// }
					} else {
						sp.Stop = false
						if i == 0 { // high-performance mode: only Allspiders starts jobs; Allspiders2 only serves detail pages
							sp.StartJob()
						}
					}
					logger.Info("定时重载脚本--新增", sp.Code)
				} else {
					if i == 0 { // record the failure once (for the list pool only)
						nowT := time.Now().Unix()
						MgoS.Update("spider_loadfail",
							map[string]interface{}{
								"code": k,
								"modifytime": map[string]interface{}{
									"$gte": nowT - 12*3600,
									"$lte": nowT + 12*3600,
								},
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"code":       k,
									"type":       "定时重载--新增失败",
									"script":     v["script"],
									"updatetime": nowT,
									// NOTE(review): sp may be nil in this failure
									// branch — sp.MUserName would panic; confirm.
									"modifyuser": sp.MUserName,
									"event":      util.Config.Uploadevent,
									"err":        errstr,
								},
							}, true, false)
					}
				}
			}
		}
		// (A large commented-out legacy single-pool reload implementation
		// previously lived here; removed for readability — see VCS history.)
	}
	util.TimeAfterFunc(time.Duration(15)*time.Minute, ReloadSpiderFile, TimeChan)
}

// CreateSpider builds a *Spider from a Lua script: it loads the script
// (newstate/thread control Lua-state allocation vs. pool usage in
// LoadScript) and copies the script's spider* globals into the struct.
// Sleep base/rand default to 1000ms and timeout to 60s when the script
// leaves them unset (-1). Returns (nil, errmsg) on load failure.
func CreateSpider(code, luafile string, newstate, thread bool) (*Spider, string) {
	defer qu.Catch()
	spider := &Spider{}
	err := spider.LoadScript(&spider.Name, &spider.Channel, &spider.MUserName, code, luafile, newstate, thread)
	if err != "" {
		return nil, err
	}
	spider.Code = spider.GetVar("spiderCode")
	spider.SCode = spider.Code
	spider.Name = spider.GetVar("spiderName")
	spider.Channel = spider.GetVar("spiderChannel")
	//spider.LastExecTime = GetLastExectime(spider.Code)
	spider.DownDetail = spider.GetBoolVar("spiderDownDetailPage")
	spider.Collection = spider.GetVar("spider2Collection")
	spider.SpiderRunRate = int64(spider.GetIntVar("spiderRunRate"))
	//spider.Thread = int64(spider.GetIntVar("spiderThread"))
	spider.StoreToMsgEvent = spider.GetIntVar("spiderStoreToMsgEvent")
	spider.StoreMode = spider.GetIntVar("spiderStoreMode")
	spider.CoverAttr = spider.GetVar("spiderCoverAttr")
	spiderSleepBase := spider.GetIntVar("spiderSleepBase")
	if spiderSleepBase == -1 { // unset: default 1000
		spider.SleepBase = 1000
	} else {
		spider.SleepBase = spiderSleepBase
	}
	spiderSleepRand := spider.GetIntVar("spiderSleepRand")
	if spiderSleepRand == -1 { // unset: default 1000
		spider.SleepRand = 1000
	} else {
		spider.SleepRand = spiderSleepRand
	}
	spiderTimeout := spider.GetIntVar("spiderTimeout")
	if spiderTimeout == -1 { // unset: default 60
		spider.Timeout = 60
	} else {
		spider.Timeout = int64(spiderTimeout)
	}
	spider.TargetChannelUrl = spider.GetVar("spiderTargetChannelUrl")
	//spider.UserName = spider.GetVar("spiderUserName")
	//spider.UserEmail = spider.GetVar("spiderUserEmail")
	//spider.UploadTime = spider.GetVar("spiderUploadTime")
	spider.MUserName = spider.GetVar("spiderUserName")
	spider.MUserEmail = spider.GetVar("spiderUserEmail")
	// historical backfill flags
	//qu.Debug("-------", spider.GetBoolVar("spiderIsHistoricalMend"), spider.GetBoolVar("spiderIsMustDownload"))
	spider.IsHistoricalMend = spider.GetBoolVar("spiderIsHistoricalMend")
	spider.IsMustDownload = spider.GetBoolVar("spiderIsMustDownload")
	// old vs. new spider generation
	spider.IsCompete = spider.GetBoolVar("spiderIsCompete")
	// spider content type
	spider.Infoformat = spider.GetIntVar("spiderInfoformat")
	return spider, ""
}

// UpdateSpider refreshes an existing *Spider's configuration fields by
// executing the new script in a throwaway Lua state (ts) and copying the
// spider* globals across. The target spider's own Lua state is NOT
// replaced here.
func UpdateSpider(spider *Spider, code, script string) {
	ts := &Spider{}
	ts.Script.L = lua.NewState(lua.Options{
		RegistrySize:        256 * 20,
		CallStackSize:       256,
		IncludeGoStackTrace: false,
	})
	defer ts.L.Close()
	ts.Script.L.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader)
	ts.Script.L.PreloadModule("json", lujson.Loader)
	if err := ts.Script.L.DoString(script); err != nil {
		logger.Debug(code + ",加载lua脚本错误:" + err.Error())
		return
	}
	spider.Channel = ts.GetVar("spiderChannel")                     // channel name
	spider.DownDetail = ts.GetBoolVar("spiderDownDetailPage")       // whether to download detail pages
	spider.Collection = ts.GetVar("spider2Collection")              // storage collection
	spider.SpiderRunRate = int64(ts.GetIntVar("spiderRunRate"))     // run interval
	spider.StoreToMsgEvent = ts.GetIntVar("spiderStoreToMsgEvent")  // e.g. 4002
	spider.StoreMode = ts.GetIntVar("spiderStoreMode")              // e.g. 2
	spider.CoverAttr = ts.GetVar("spiderCoverAttr")                 // e.g. title
	// random delay used by DownloadDetailPage
	spiderSleepBase := ts.GetIntVar("spiderSleepBase")
	if spiderSleepBase == -1 {
		spider.SleepBase = 1000
	} else {
		spider.SleepBase = spiderSleepBase
	}
	spiderSleepRand := ts.GetIntVar("spiderSleepRand")
	if spiderSleepRand == -1 {
		spider.SleepRand = 1000
	} else {
		spider.SleepRand = spiderSleepRand
	}
	spiderTimeout := ts.GetIntVar("spiderTimeout")
	if spiderTimeout == -1 {
		spider.Timeout = 60
	} else {
		spider.Timeout = int64(spiderTimeout)
	}
	// NOTE(review): these two read from `spider` (the OLD state), not the
	// fresh `ts` — every other field reads from ts; confirm intended.
	spider.MUserName = spider.GetVar("spiderUserName")
	spider.MUserEmail = spider.GetVar("spiderUserEmail")
	spider.TargetChannelUrl = ts.GetVar("spiderTargetChannelUrl") // channel URL
	// historical backfill flags
	spider.IsHistoricalMend = ts.GetBoolVar("spiderIsHistoricalMend")
	spider.IsMustDownload = ts.GetBoolVar("spiderIsMustDownload")
	// old vs. new spider generation
	spider.IsCompete = ts.GetBoolVar("spiderIsCompete")
	// spider content type
	// NOTE(review): reads from `spider`, not `ts` — confirm intended.
	spider.Infoformat = spider.GetIntVar("spiderInfoformat")
}

// NewSpiderForThread builds a *Spider for multi-threaded use (LoadScript
// called with newstate=true, thread=true). Mirrors CreateSpider's field
// population, additionally setting Script.SCode.
func NewSpiderForThread(code, luafile string) (*Spider, string) {
	defer qu.Catch()
	spider := &Spider{}
	err := spider.LoadScript(&spider.Name, &spider.Channel, &spider.MUserName, code, luafile, true, true)
	if err != "" {
		return nil, err
	}
	spider.Code = spider.GetVar("spiderCode")
	spider.SCode = spider.Code
	spider.Script.SCode = spider.Code
	spider.Name = spider.GetVar("spiderName")
	spider.Channel = spider.GetVar("spiderChannel")
	//spider.LastExecTime = GetLastExectime(spider.Code)
	spider.DownDetail = spider.GetBoolVar("spiderDownDetailPage")
	spider.Collection = spider.GetVar("spider2Collection")
	spider.SpiderRunRate = int64(spider.GetIntVar("spiderRunRate"))
	//spider.Thread = int64(spider.GetIntVar("spiderThread"))
	spider.StoreToMsgEvent = spider.GetIntVar("spiderStoreToMsgEvent")
	spider.StoreMode = spider.GetIntVar("spiderStoreMode")
	spider.CoverAttr = spider.GetVar("spiderCoverAttr")
	spiderSleepBase := spider.GetIntVar("spiderSleepBase")
	if spiderSleepBase == -1 { // unset: default 1000
		spider.SleepBase = 1000
	} else {
		spider.SleepBase = spiderSleepBase
	}
	spiderSleepRand := spider.GetIntVar("spiderSleepRand")
	if spiderSleepRand == -1 { // unset: default 1000
		spider.SleepRand = 1000
	} else {
		spider.SleepRand = spiderSleepRand
	}
	spiderTimeout := spider.GetIntVar("spiderTimeout")
	if spiderTimeout == -1 { // unset: default 60
		spider.Timeout = 60
	} else {
		spider.Timeout = int64(spiderTimeout)
	}
	spider.TargetChannelUrl = spider.GetVar("spiderTargetChannelUrl")
	//spider.UserName = spider.GetVar("spiderUserName")
	//spider.UserEmail = spider.GetVar("spiderUserEmail")
	//spider.UploadTime = spider.GetVar("spiderUploadTime")
	// historical backfill flags
	//qu.Debug("-------", spider.GetBoolVar("spiderIsHistoricalMend"), spider.GetBoolVar("spiderIsMustDownload"))
	spider.IsHistoricalMend = spider.GetBoolVar("spiderIsHistoricalMend")
	spider.IsMustDownload = spider.GetBoolVar("spiderIsMustDownload")
	// old vs. new spider generation
	spider.IsCompete = spider.GetBoolVar("spiderIsCompete")
	// spider content type
	spider.Infoformat = spider.GetIntVar("spiderInfoformat")
	return spider, ""
}

// SaveDownCount upserts a spider's daily download/request counters into
// spider_downlog, keyed by (code, date). With addtotal=true the lifetime
// totals are also $inc'ed.
func SaveDownCount(code string, addtotal bool, todayDowncount, todayRequestNum, yesterdayDowncount, yestoDayRequestNum int32) {
	date := time.Unix(time.Now().Unix(), 0).Format(qu.Date_Short_Layout)
	updata := map[string]interface{}{}
	if addtotal {
		updata = map[string]interface{}{
			"$inc": map[string]interface{}{"totaldown": todayDowncount, "totalreq": todayRequestNum},
			"$set": map[string]interface{}{
				"yesdowncount":   yesterdayDowncount,
				"yesdownreq":     yestoDayRequestNum,
				"todaydowncount": todayDowncount,
				"todaydownreq":   todayRequestNum,
				"date":           date,
				"year":           time.Now().Year(),
				"month":          time.Now().Month(),
				"day":            time.Now().Day(),
			},
		}
	} else {
		updata = map[string]interface{}{
			"$set": map[string]interface{}{
				"yesdowncount":   yesterdayDowncount,
				"yesdownreq":     yestoDayRequestNum,
				"todaydowncount": todayDowncount,
				"todaydownreq":   todayRequestNum,
				"date":           date,
				"year":           time.Now().Year(),
				"month":          time.Now().Month(),
				"day":            time.Now().Day(),
			},
		}
	}
	MgoS.Update("spider_downlog", map[string]interface{}{"code": code, "date": date}, updata, true, false)
}

// GetLimitDownload returns the (uplimit, lowlimit) pair stored for a
// code in spider_ldtime, defaulting to (100, 0) when absent. (Marked
// unused by the original author.)
func GetLimitDownload(code string) (uplimit, lowlimit int) {
	defer qu.Catch()
	ret, _ := MgoS.FindOne("spider_ldtime", map[string]interface{}{"code": code})
	if ret != nil && len(*ret) > 0 {
		uplimit = qu.IntAll((*ret)["uplimit"])
		lowlimit = qu.IntAll((*ret)["lowlimit"])
		return uplimit, lowlimit
	} else {
		return 100, 0
	}
}

// GetScriptByTmp assembles a complete Lua script from a luaconfig
// document: common parameters, then publish-time / list-page /
// content-page sections (each either wizard-generated from param_* or
// taken verbatim from str_* in expert mode), then the model replacement.
func GetScriptByTmp(luaconfig map[string]interface{}) string {
	defer qu.Catch()
	script := ""
	if luaconfig["listcheck"] == nil {
		luaconfig["listcheck"] = ""
	}
	if luaconfig["contentcheck"] == nil {
		luaconfig["contentcheck"] = ""
	}
	modifyUser := qu.ObjToString(luaconfig["modifyuser"])
	modifyUserEmail := qu.ObjToString(luaconfig["createuseremail"])
	if luaconfig != nil && len(luaconfig) > 0 {
		common := luaconfig["param_common"].([]interface{})
		// append spiderIsHistoricalMend / spiderIsMustDownload when missing
		if len(common) == 15 {
			common = append(common, modifyUser, modifyUserEmail, "")
		} else {
			common = append(common, false, false, modifyUser, modifyUserEmail, "")
		}
		for k, v := range common {
			// positions 4,5,6,9,10 are numeric template parameters
			if k == 4 || k == 5 || k == 6 || k == 9 || k == 10 {
				common[k] = qu.IntAll(v)
			}
		}
		script, _ = GetTmpModel(map[string][]interface{}{"common": common})
		// publish time section
		script_time := ""
		if qu.IntAll(luaconfig["type_time"]) == 0 { // wizard mode
			time := luaconfig["param_time"].([]interface{})
			script_time, _ = GetTmpModel(map[string][]interface{}{
				"time": time,
			})
		} else { // expert mode
			script_time = luaconfig["str_time"].(string)
		}
		// list page section
		script_list := ""
		if qu.IntAll(luaconfig["type_list"]) == 0 { // wizard mode
			list := luaconfig["param_list"].([]interface{})
			addrs := strings.Split(list[1].(string), "\n")
			if len(addrs) > 0 { // quote each address and join as a Lua list
				for k, v := range addrs {
					addrs[k] = "'" + v + "'"
				}
				list[1] = strings.Join(addrs, ",")
			} else {
				list[1] = ""
			}
			script_list, _ = GetTmpModel(map[string][]interface{}{
				"list":      list,
				"listcheck": []interface{}{luaconfig["listcheck"]},
			})
		} else { // expert mode
			script_list = luaconfig["str_list"].(string)
		}
		// content (detail) page section
		script_content := ""
		if qu.IntAll(luaconfig["type_content"]) == 0 { // wizard mode
			content := luaconfig["param_content"].([]interface{})
			script_content, _ = GetTmpModel(map[string][]interface{}{
				"content":      content,
				"contentcheck": []interface{}{luaconfig["contentcheck"]},
			})
		} else { // expert mode
			script_content = luaconfig["str_content"].(string)
		}
		script += fmt.Sprintf(util.Tmp_Other, luaconfig["spidertype"], luaconfig["spiderhistorymaxpage"], luaconfig["spidermovevent"], luaconfig["spidercompete"], luaconfig["infoformat"])
		script += `
` + script_time + `
` + script_list + `
` + script_content
		script = ReplaceModel(script, common, luaconfig["model"].(map[string]interface{}))
	}
	return script
}

// GetTmpModel renders the given template sections (common/time/list/
// content) into Lua source via the util.Tmp_* format strings. err is
// non-nil when a section's parameter slice is too short, or holds the
// recovered panic value from qu.Try.
func GetTmpModel(param map[string][]interface{}) (script string, err interface{}) {
	qu.Try(func() {
		// param_common section
		if param != nil && param["common"] != nil {
			if len(param["common"]) < 12 {
				err = "公共参数配置不全"
			} else {
				script = fmt.Sprintf(util.Tmp_common, param["common"]...)
			}
		}
		// publish-time section
		if param != nil && param["time"] != nil {
			if len(param["time"]) < 3 {
				err = "方法:time-参数配置不全"
			} else {
				script += fmt.Sprintf(util.Tmp_pubtime, param["time"]...)
			}
		}
		// list-page section
		if param != nil && param["list"] != nil {
			if len(param["list"]) < 7 {
				err = "方法:list-参数配置不全"
			} else {
				list := []interface{}{param["listcheck"][0]}
				list = append(list, param["list"]...)
				script += fmt.Sprintf(util.Tmp_pagelist, list...)
				// splice the Lua page-number expression into URL templates
				script = strings.Replace(script, "#pageno#", `"..tostring(pageno).."`, -1)
			}
		}
		// detail-page section
		if param != nil && param["content"] != nil {
			if len(param["content"]) < 2 {
				err = "方法:content-参数配置不全"
			} else {
				content := []interface{}{param["contentcheck"][0]}
				content = append(content, param["content"]...)
				script += fmt.Sprintf(util.Tmp_content, content...)
			}
		}
	}, func(e interface{}) {
		err = e
	})
	return script, err
}

// ReplaceModel substitutes the --Common-- and --Model-- placeholders in
// a generated script with item-assignment Lua statements built from the
// common parameters (spidercode/site/channel) and the model map.
func ReplaceModel(script string, comm []interface{}, model map[string]interface{}) string {
	defer qu.Catch()
	// common item fields
	commstr := `item["spidercode"]="` + comm[0].(string) + `";`
	commstr += `item["site"]="` + comm[1].(string) + `";`
	commstr += `item["channel"]="` + comm[2].(string) + `";`
	script = strings.Replace(script, "--Common--", commstr, -1)
	// model item fields
	modelstr := ""
	for k, v := range model {
		modelstr += `item["` + k + `"]="` + v.(string) + `";`
	}
	script = strings.Replace(script, "--Model--", modelstr, -1)
	return script
}

// SpiderInfoSend posts every Allspiders spider's counters (heartbeat) to
// the editor service as an encoded JSON form, then reschedules itself
// every 5 minutes via TimeAfterFunc. Initial 15s sleep lets spiders
// start before the first report.
func SpiderInfoSend() {
	time.Sleep(15 * time.Second)
	list := []interface{}{}
	Allspiders.Range(func(key, value interface{}) bool {
		v := value.(*Spider)
		info := map[string]interface{}{}
		info["code"] = v.Code
		info["todayDowncount"] = v.TodayDowncount
		info["toDayRequestNum"] = v.ToDayRequestNum
		info["yesterdayDowncount"] = v.YesterdayDowncount
		info["yestoDayRequestNum"] = v.YestoDayRequestNum
		info["totalDowncount"] = v.TotalDowncount
		info["totalRequestNum"] = v.TotalRequestNum
		info["errorNum"] = v.ErrorNum
		info["roundCount"] = v.RoundCount
		info["runRate"] = v.SpiderRunRate
		info["lastHeartbeat"] = v.LastHeartbeat
		info["lastDowncount"] = v.LastDowncount
		info["lstate"] = v.L.Status(v.L)
		list = append(list, info)
		return true
	})
	bs, _ := json.Marshal(list)
	value := url.Values{
		"data": []string{util.Se.EncodeString(string(bs))},
		"type": []string{"info"},
	}
	_, err := http.PostForm(util.Config.Editoraddr, value)
	if err != nil {
		logger.Error("send to editor: ", err.Error())
	}
	util.TimeAfterFunc(5*time.Minute, SpiderInfoSend, TimeChan)
}

// SaveHeartInfo flushes the SpiderHeart registry into the
// UpdataHeartCache channel as (filter, $set) update pairs, pruning
// heartbeats whose code is no longer in LoopListPath. Sleeps 20 minutes
// before the first pass, then reschedules itself after 1 second.
func SaveHeartInfo() {
	time.Sleep(20 * time.Minute)
	num := 0
	SpiderHeart.Range(func(key, value interface{}) bool {
		code := key.(string)
		sp, spiderOk := LoopListPath.Load(code)
		if spiderOk && sp != nil {
			heart, heartOk := value.(*Heart)
			if heartOk {
				num++
				update := []map[string]interface{}{}
				update = append(update, map[string]interface{}{"code": code})
				update = append(update, map[string]interface{}{"$set": map[string]interface{}{
					"site":          heart.Site,
					"channel":       heart.Channel,
					"firstpage":     heart.FirstPageHeart,
					"list":          heart.ListHeart,
					"findlist":      heart.FindListHeart,
					"detail":        heart.DetailHeart,
					"detailexecute": heart.DetailExecuteHeart,
					"modifyuser":    heart.ModifyUser,
					"event":         util.Config.Uploadevent,
					"updatetime":    time.Now().Unix(),
					"del":           false,
				}})
				UpdataHeartCache <- update
			}
		} else { // spider no longer loaded: drop its heartbeat
			SpiderHeart.Delete(key)
		}
		return true
	})
	logger.Info("更新心跳个数:", num)
	time.AfterFunc(1*time.Second, SaveHeartInfo)
}

// SpiderCodeSendToEditor records that a 7000-node spider was moved to an
// incremental node by saving a luamovelog document (ok:false). The
// previous implementation, which also POSTed the code to the editor
// service with retries, is retained in VCS history (it was commented
// out here).
func SpiderCodeSendToEditor(code string) {
	defer qu.Catch()
	MgoEB.Save("luamovelog", map[string]interface{}{
		"code":       code,
		"comeintime": time.Now().Unix(),
		"ok":         false,
	})
}