123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251 |
- package spider
- import (
- "bufio"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/cjoudrey/gluahttp"
- lujson "github.com/yuin/gopher-json"
- "net/http"
- "net/url"
- "os"
- "path/filepath"
- qu "qfw/util"
- "regexp"
- util "spiderutil"
- "strings"
- "sync"
- "time"
- "github.com/donnie4w/go-logger/logger"
- "github.com/yuin/gopher-lua"
- )
- var SpiderHeart sync.Map = sync.Map{} //爬虫心跳
- var Allspiders sync.Map = sync.Map{} //存储正在执行采集列表页任务的爬虫集合
- var Allspiders2 sync.Map = sync.Map{} //存储正在执行采集详情页任务的爬虫集合
- var LoopListPath sync.Map = sync.Map{} //存储爬虫集合
- //var ChanDels = map[int]string{}
- //var lock sync.Mutex
- var CC chan *lua.LState
- var CC2 chan *lua.LState
- var Chansize int
- var regcode, _ = regexp.Compile(`="(.*)"`)
- var InitCount int
- var InitAllLuaOver = make(chan bool, 1) //所有脚本是否加载完毕
- func InitSpider() {
- scriptMap := getSpiderScriptDB("all") //加载爬虫,初始化模板
- scriptMapFile := getSpiderScriptFile(false)
- for code, v := range scriptMap {
- LoopListPath.Store(code, v)
- InitCount++
- }
- for code, v := range scriptMapFile {
- LoopListPath.Store(code, v)
- InitCount++
- }
- if util.Config.Working == 0 {
- NoQueueScript() //高性能模式
- } else {
- if util.Config.Modal == 0 { //原始模式
- QueueUpScriptList()
- } else { //列表页和三级页分开采集
- go QueueUpScriptList() //节能模式列表页
- go QueueUpScriptDetail() //节能模式三级页
- }
- }
- }
- // 高性能模式
- func NoQueueScript() {
- list, _ := MgoS.Find("spider_ldtime", nil, nil, map[string]interface{}{"code": 1, "uplimit": 1, "lowlimit": 1}, false, -1, -1)
- LoopListPath.Range(func(key, temp interface{}) bool {
- if info, ok := temp.(map[string]string); ok {
- code := info["code"]
- script := info["script"]
- sp, errstr := CreateSpider(code, script, true, false)
- if errstr == "" && sp != nil && sp.Code != "nil" { //脚本加载成功
- //sp.Index = qu.IntAll(key)
- //sp2.Index = qu.IntAll(key)
- Allspiders.Store(sp.Code, sp)
- for _, tmp := range *list {
- if qu.ObjToString(tmp["code"]) == sp.Code {
- sp.UpperLimit = qu.IntAll(tmp["uplimit"])
- sp.LowerLimit = qu.IntAll(tmp["lowlimit"])
- break
- }
- }
- if !Supplement && util.Config.Modal == 1 && !util.Config.IsHistoryEvent { //列表页、三级页分开采集模式
- sp2, _ := CreateSpider(code, script, true, false)
- sp2.IsMainThread = true //多线程采集时使用
- Allspiders2.Store(sp.Code, sp2)
- }
- sp.StartJob()
- //util.TimeSleepFunc(10*time.Millisecond, TimeSleepChan)
- } else {
- logger.Info(code, "脚本加载失败,请检查!")
- nowT := time.Now().Unix()
- username := "异常"
- if sp != nil {
- username = sp.MUserName
- }
- MgoS.Update("spider_loadfail",
- map[string]interface{}{
- "code": code,
- "modifytime": map[string]interface{}{
- "$gte": nowT - 12*3600,
- "$lte": nowT + 12*3600,
- },
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "code": code,
- "type": "初始化",
- "script": script,
- "updatetime": nowT,
- "modifyuser": username,
- "event": util.Config.Uploadevent,
- "err": errstr,
- },
- }, true, false)
- }
- time.Sleep(100 * time.Millisecond)
- }
- return true
- })
- InitAllLuaOver <- true //爬虫初始化完毕
- logger.Info("高性能模式:LUA加载完成")
- numSpider := 0
- Allspiders.Range(func(key, value interface{}) bool {
- numSpider++
- return true
- })
- logger.Info("总共加载脚本数:", numSpider)
- }
- // 排队模式下载列表页数据
- func QueueUpScriptList() {
- logger.Info("节能模式列表页")
- CC = make(chan *lua.LState, util.Config.Chansize)
- for i := 0; i < util.Config.Chansize; i++ { //目前初始化Allspiders,Allspiders2两个爬虫池,线程乘2
- CC <- lua.NewState(lua.Options{
- RegistrySize: 256 * 20,
- CallStackSize: 256,
- IncludeGoStackTrace: false,
- })
- }
- for {
- listLen, listNoLen, DelLen := 0, 0, 0
- logger.Info(time.Now().Format(qu.Date_Full_Layout), ":下载列表页执行死循环", "初始化脚本数量:", InitCount)
- LoopListPath.Range(func(key, temp interface{}) bool {
- if info, ok := temp.(map[string]string); ok {
- script := info["script"]
- code := info["code"]
- //判断上轮code爬虫是否采集完成
- old_is_running := false
- sp_ok := false
- sp_old := &Spider{}
- tmp, b := Allspiders.Load(code)
- if b {
- if sp_old, sp_ok = tmp.(*Spider); sp_ok {
- if !sp_old.Stop { //主线任务未完成
- old_is_running = true
- }
- }
- }
- logger.Info("Code:", code, "Is Downloading List:", old_is_running, ",subtask num:", sp_old.ListParallelTaskNum)
- if !old_is_running { //判断当前爬虫上轮任务是否执行完成
- sp, errstr := CreateSpider(code, script, false, false)
- //logger.Info("初始化脚本是否成功:", sp != nil, e.Value)
- if errstr == "" && sp != nil && sp.Code != "nil" { //初始化脚本成功
- //sp.Index = qu.IntAll(key)
- sp.ListParallelTaskNum = sp_old.ListParallelTaskNum //继承子任务数量
- Allspiders.Store(code, sp)
- sp.StartJob()
- } else {
- nowT := time.Now().Unix()
- username := "异常"
- if sp != nil {
- username = sp.MUserName
- }
- MgoS.Update("spider_loadfail",
- map[string]interface{}{
- "code": code,
- "modifytime": map[string]interface{}{
- "$gte": nowT - 12*3600,
- "$lte": nowT + 12*3600,
- },
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "code": code,
- "type": "初始化",
- "script": script,
- "updatetime": nowT,
- "modifyuser": username,
- "event": util.Config.Uploadevent,
- "err": errstr,
- },
- }, true, false)
- }
- if sp != nil && sp.IsHistoricalMend { //下载历史的爬虫执行一次后删除
- DelLen++
- LoopListPath.Delete(key)
- b = MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
- logger.Debug("Delete History Code:", code, b)
- }
- } else if sp_ok && sp_old.ListParallelTaskNum < util.Config.PageTurnInfo.ListParallelTaskLimit { //主任务正在执行,开启子任务
- spTmp, errstr := CreateSpider(code, script, true, true)
- if errstr == "" && spTmp != nil && spTmp.Code != "nil" { //初始化脚本成功
- sp_old.ListParallelTaskNum++
- logger.Info(code, "子任务开始执行,当前子任务数", sp_old.ListParallelTaskNum)
- //启动下载
- go func(spt, spo *Spider) {
- defer func() {
- spt.L.Close() //释放资源
- spo.ListParallelTaskNum-- //子任务数减少
- }()
- err := spt.DownListPageItem() //下载列表
- if err != nil {
- logger.Error(spt.Code, err)
- }
- }(spTmp, sp_old)
- }
- }
- listLen++
- } else {
- logger.Info("Code:", key, "Is Not Download List")
- listNoLen++
- }
- time.Sleep(100 * time.Millisecond)
- return true
- })
- time.Sleep(1 * time.Second)
- count_ok, count_no := 0, 0
- LoopListPath.Range(func(k, v interface{}) bool {
- if v != nil {
- count_ok++
- } else {
- count_no++
- }
- return true
- })
- InitCount = count_ok
- logger.Info(time.Now().Format(qu.Date_Full_Layout), ":下载列表页执行死循环,列表长度,", listLen, listNoLen, "删除数量", DelLen, "执行完毕后数量统计:", count_ok, count_no)
- }
- }
- // 排队模式下载三级页数据
- func QueueUpScriptDetail() {
- logger.Info("节能模式三级页")
- chanSize := util.Config.DetailChansize
- CC2 = make(chan *lua.LState, chanSize)
- for i := 0; i < chanSize; i++ { //目前初始化Allspiders,Allspiders2两个爬虫池,线程乘2
- CC2 <- lua.NewState(lua.Options{
- RegistrySize: 256 * 20,
- CallStackSize: 256,
- IncludeGoStackTrace: false,
- })
- }
- for {
- count_ok, count_no := 0, 0
- logger.Warn(time.Now().Format(qu.Date_Full_Layout), ":下载三级页执行死循环", "初始化脚本数量:", InitCount)
- LoopListPath.Range(func(key, temp interface{}) bool {
- if info, ok := temp.(map[string]string); ok {
- count_ok++
- code := info["code"]
- old_is_running := false
- tmp, b := Allspiders2.Load(code)
- if b {
- if sp_old, ok := tmp.(*Spider); ok {
- if !sp_old.Stop {
- old_is_running = true
- }
- }
- }
- logger.Info("Code:", code, "Is Downloading Detail:", old_is_running)
- if !old_is_running { //判断当前爬虫是否正在执行
- script := info["script"]
- sp, errstr := CreateSpider(code, script, true, false)
- if errstr == "" && sp != nil && sp.Code != "nil" { //初始化脚本成功
- //sp.Index = qu.IntAll(key)
- sp.IsMainThread = true
- Allspiders2.Store(code, sp)
- go sp.DownloadListDetail(false) //下载三级页信息
- }
- }
- } else {
- logger.Info("Code:", key, "Is Not Download Detail")
- count_no++
- }
- time.Sleep(100 * time.Millisecond)
- return true
- })
- InitCount = count_ok
- time.Sleep(1 * time.Second)
- logger.Warn(time.Now().Format(qu.Date_Full_Layout), ":下载三级页执行死循环完毕,数量统计:", count_ok, count_no)
- }
- }
- // 获取所有爬虫脚本--数据库
- func getSpiderScriptDB(code string) map[string]map[string]string {
- scriptSpider := map[string]map[string]string{}
- query := map[string]interface{}{}
- if Supplement { //数据采集
- query = map[string]interface{}{
- "state": 5,
- "platform": "golua平台",
- "event": map[string]interface{}{
- "$ne": 7000,
- },
- "spiderimportant": true,
- }
- } else if code == "all" { //初始化所有脚本
- query = map[string]interface{}{"state": 5, "event": util.Config.Uploadevent}
- } else { //消息在线上传
- query = map[string]interface{}{"code": code, "event": util.Config.Uploadevent}
- //query = `{"$or":[{"iupload":1},{"iupload":3}],"event":` + fmt.Sprint(util.Config.Uploadevent) + `,"modifytime":{"$gt":1502937042}}`
- }
- listdb, _ := MgoEB.Find("luaconfig", query, map[string]interface{}{"_id": -1}, nil, false, -1, -1)
- //临时历史附件
- //listdb, _ := MgoEB.Find("luaconfig_test", query, map[string]interface{}{"_id": -1}, nil, false, -1, -1)
- for _, v := range *listdb {
- if Supplement && strings.Contains(qu.ObjToString(v["code"]), "_bu") { //补采去除含“_bu”后缀的爬虫
- continue
- }
- old := qu.IntAll(v["old_lua"])
- script := ""
- if old == 1 {
- script = fmt.Sprint(v["luacontent"])
- } else {
- if v["oldlua"] != nil {
- if v["luacontent"] != nil {
- script = v["luacontent"].(string)
- }
- } else {
- script = GetScriptByTmp(v)
- }
- }
- scriptSpider[fmt.Sprint(v["code"])] = map[string]string{
- "code": fmt.Sprint(v["code"]),
- "type": fmt.Sprint(v["state"]),
- "script": script,
- "createuser": fmt.Sprint(v["createuser"]),
- "createuseremail": fmt.Sprint(v["createuseremail"]),
- "modifyuser": fmt.Sprint(v["modifyuser"]),
- "modifyemail": fmt.Sprint(v["next"]),
- }
- }
- return scriptSpider
- }
- // 获取所有爬虫脚本--文件
- func getSpiderScriptFile(newscript bool) map[string]map[string]string {
- scriptSpider := map[string]map[string]string{}
- filespider := 0
- filepath.Walk("res", func(path string, info os.FileInfo, err error) error {
- if info.IsDir() {
- return nil
- } else if strings.HasPrefix(info.Name(), "spider_") &&
- strings.HasSuffix(info.Name(), ".lua") {
- //过滤test目录
- if strings.Contains(path, "\\test\\") {
- return nil
- }
- loadfile := true
- if newscript {
- if time.Now().Unix() < info.ModTime().Add(time.Duration(15)*time.Minute).Unix() {
- loadfile = true
- } else {
- loadfile = false
- }
- }
- if loadfile {
- f, err := os.Open(path)
- defer f.Close()
- if err != nil {
- logger.Error(err.Error())
- }
- buf := bufio.NewReader(f)
- script := ""
- code := ""
- for {
- line, err := buf.ReadString('\n')
- if code == "" && strings.Contains(line, "spiderCode=") {
- res := regcode.FindAllStringSubmatch(line, -1)
- if len(res) > 0 {
- code = res[0][1]
- //logger.Info("code", code)
- } else {
- break
- }
- }
- if scriptSpider[code] == nil {
- script = script + line + "\n"
- } else {
- break
- }
- if err != nil {
- break
- }
- }
- if code != "" && script != "" && scriptSpider[code] == nil {
- scriptSpider[code] = map[string]string{
- "code": code,
- "type": "5",
- "script": script,
- //脚本文件属性值空
- "createuser": "",
- "createuseremail": "",
- "modifyuser": "",
- "modifyemail": "",
- }
- filespider = filespider + 1
- //logger.Info("script", script)
- }
- }
- }
- return nil
- })
- logger.Info("节点", util.Config.Uploadevent, "脚本文件爬虫数", filespider)
- return scriptSpider
- }
- // 脚本下架、上架、重载
- func UpdateSpiderByCodeState(code, state string) (bool, error) {
- up := false
- var err error
- if state != "5" && state != "-1" { //脚本下架
- SpiderHeart.Delete(code) //脚本下架,删除脚本对应心跳
- logger.Info("下架脚本", code)
- if util.Config.Working == 1 { //队列模式
- for i, as := range []sync.Map{Allspiders, Allspiders2} {
- if i == 1 && util.Config.Modal == 0 { //队列模式原始模式采集Allspiders2无用(7700下架爬虫)
- break
- }
- tmp, b := as.Load(code)
- if b {
- sp, ok := tmp.(*Spider)
- if ok {
- sp.Stop = true
- }
- as.Delete(code)
- logger.Info("下架脚本,Allspiders删除")
- }
- }
- } else { //高性能模式
- for _, as := range []sync.Map{Allspiders, Allspiders2} {
- if tmp, ok := as.Load(code); ok {
- sp, ok := tmp.(*Spider)
- if ok {
- sp.Stop = true
- sp.L.Close()
- as.Delete(code)
- }
- }
- }
- }
- LoopListPath.Delete(code)
- logger.Info(code, "脚本下架成功")
- up = true
- err = nil
- } else if state == "-1" { //爬虫重采更新线上爬虫
- scriptMap := getSpiderScriptDB(code)
- logger.Info("更新线上脚本,库中是否已存在该脚本:", code, len(scriptMap) > 0, scriptMap[code] != nil)
- if util.Config.Working == 1 { //排队模式
- for _, v := range scriptMap {
- listsize := 0
- listHas := false
- count_ok, count_no := 0, 0
- LoopListPath.Range(func(key, val interface{}) bool {
- listsize++
- if tmp, ok := val.(map[string]string); ok {
- count_ok++
- if tmp["code"] == code && key == code { //队列存在,重载脚本
- logger.Info("上架新增脚本,队列中以有该脚本,进行更新")
- listHas = true
- LoopListPath.Store(key, v)
- UpdateHighListDataByCode(code) //爬虫更新上架后,重置数据state=0
- logger.Info("队列模式更新列表页信息状态", code)
- }
- } else {
- count_no++
- }
- return true
- })
- logger.Info("上架新增脚本,队列中共有爬虫", listsize, "当前在线数量:", count_ok, "下线数量:", count_no)
- if !listHas { //队列不存在
- logger.Info("重采更新爬虫失败:", code)
- up = false
- err = errors.New("爬虫不在线:" + code)
- } else {
- up = true
- err = nil
- logger.Info("重采更新爬虫成功", code)
- }
- }
- } else { //高性能模式
- for k, v := range scriptMap {
- if spd, ok := Allspiders.Load(k); ok { //对应脚本已存在,更新
- sp := spd.(*Spider)
- sp.ScriptFile = v["script"]
- sp.MUserName = v["modifyuser"]
- sp.MUserEmail = v["modifyemail"]
- Allspiders.Store(k, sp)
- up = true
- err = nil
- logger.Info("重采更新爬虫成功", sp.Code)
- } else { //不存在
- up = false
- err = errors.New("爬虫不在线:" + code)
- logger.Info("重采更新爬虫失败:", code)
- }
- //Allspiders2
- if spd2, ok2 := Allspiders2.Load(k); ok2 { //对应脚本已存在,更新
- sp2 := spd2.(*Spider)
- sp2.ScriptFile = v["script"]
- sp2.MUserName = v["modifyuser"]
- sp2.MUserEmail = v["modifyemail"]
- sp2.LoadScript(&sp2.Name, &sp2.Channel, &sp2.MUserName, k, sp2.ScriptFile, true, false) //更新上架,重载脚本
- Allspiders2.Store(k, sp2)
- // up = true
- // err = nil
- logger.Info("Allspiders2重采更新爬虫成功", sp2.Code)
- } else { //不存在
- // up = false
- // err = errors.New("爬虫不在线:" + code)
- logger.Info("Allspiders2重采更新爬虫失败:", code)
- }
- }
- }
- } else { //脚本上架
- scriptMap := getSpiderScriptDB(code)
- logger.Info("上架新增脚本,库中是否已存在该脚本:", code, len(scriptMap) > 0, scriptMap[code] != nil)
- if util.Config.Modal == 1 && !util.Config.IsHistoryEvent { //分开采集
- go UpdateHighListDataByCode(code)
- }
- if util.Config.Working == 1 { //排队模式
- for _, v := range scriptMap {
- LoopListPath.Store(code, v) //更新或新增爬虫信息
- listsize, count_ok, count_no := 0, 0, 0
- isOk := false
- LoopListPath.Range(func(key, val interface{}) bool {
- listsize++
- if tmp, ok := val.(map[string]string); ok {
- count_ok++
- if tmp["code"] == code && key == code { //队列存在
- isOk = true
- }
- } else {
- count_no++
- }
- return true
- })
- logger.Info("上架脚本", isOk, code)
- logger.Info("上架爬虫后队列中共有爬虫", listsize, "当前在线数量:", count_ok, "下线数量:", count_no)
- if !isOk {
- return false, errors.New("use " + code + " failed")
- }
- up = true
- }
- } else { //高性能模式
- for k, v := range scriptMap {
- LoopListPath.Store(k, v)
- //1、Allspiders对应7000、7100、7400脚本上架下载数据(列表页爬虫集合)
- if spd, ok := Allspiders.Load(k); ok { //对应脚本已存在,更新
- sp := spd.(*Spider)
- sp.ScriptFile = v["script"]
- UpdateSpider(sp, k, v["script"]) //爬虫其他信息更新
- //sp.LoadScript(&sp.Name, &sp.Channel, &sp.MUserName, k, sp.ScriptFile, true, false) //更新上架,重载脚本
- Allspiders.Store(k, sp)
- up = true
- err = nil
- logger.Info("上架重载脚本", sp.Code)
- } else { //新增脚本
- sp, errstr := CreateSpider(k, v["script"], true, false)
- if errstr == "" && sp != nil && sp.Code != "nil" {
- Allspiders.Store(k, sp)
- sp.StartJob()
- up = true
- err = nil
- logger.Info("上架新增脚本", sp.Code)
- } else {
- err = errors.New("新增失败")
- nowT := time.Now().Unix()
- MgoS.Update("spider_loadfail",
- map[string]interface{}{
- "code": k,
- "modifytime": map[string]interface{}{
- "$gte": nowT - 12*3600,
- "$lte": nowT + 12*3600,
- },
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "code": k,
- "type": "新增初始化脚本",
- "script": v["script"],
- "updatetime": nowT,
- "modifyuser": sp.MUserName,
- "event": util.Config.Uploadevent,
- "err": errstr,
- },
- }, true, false)
- }
- }
- //2、Allspiders2对应7100、7110、7400上架采集三级页数据(Allspiders2三级页爬虫集合)
- if util.Config.Modal == 1 && !util.Config.IsHistoryEvent {
- //Allspiders2
- if spd2, ok2 := Allspiders2.Load(k); ok2 { //对应脚本已存在,更新
- sp2 := spd2.(*Spider)
- sp2.ScriptFile = v["script"]
- UpdateSpider(sp2, k, v["script"]) //爬虫其他信息更新
- sp2.LoadScript(&sp2.Name, &sp2.Channel, &sp2.MUserName, k, sp2.ScriptFile, true, false) //更新上架,重载脚本
- Allspiders2.Store(k, sp2) //重载后放入集合
- // up = true
- // err = nil
- logger.Info("Allspiders2上架重载脚本", sp2.Code)
- } else { //新增脚本
- sp2, errstr := CreateSpider(k, v["script"], true, false)
- if errstr == "" && sp2 != nil && sp2.Code != "nil" {
- sp2.IsMainThread = true //多线程采集详情页时使用
- go sp2.DownloadHighDetail(true) //根据列表页数据下载三级页
- Allspiders2.Store(k, sp2)
- // up = true
- // err = nil
- logger.Info("Allspiders2上架新增脚本", sp2.Code)
- } /*else {
- err = errors.New("新增失败")
- mgu.Save("spider_loadfail", "spider", "spider", map[string]interface{}{
- "code": k,
- "type": "新增脚本失败",
- "script": v["script"],
- "intime": time.Now().Format(qu.Date_Full_Layout),
- "event": util.Config.Uploadevent,
- })
- }*/
- }
- }
- }
- }
- }
- logger.Info("上下架:", up, err)
- return up, err
- }
- // 定时重载脚本文件
- func ReloadSpiderFile() {
- scriptMap := getSpiderScriptFile(true)
- for k, v := range scriptMap {
- for i, as := range []sync.Map{Allspiders, Allspiders2} {
- if i == 1 && util.Config.Modal == 0 { //队列模式原始模式采集Allspiders2无用
- continue
- }
- if spd, ok := as.Load(k); ok { //对应脚本已存在,更新
- sp := spd.(*Spider)
- logger.Info("定时重载脚本", sp.Code)
- sp.ScriptFile = v["script"]
- sp.MUserName = v["modifyuser"]
- sp.MUserEmail = v["modifyemail"]
- as.Store(k, sp)
- } else { //新增脚本
- var sp *Spider
- var errstr string
- if util.Config.Working == 1 { //排队模式
- if i == 0 {
- //length := 0
- //LoopListPath.Range(func(k, v interface{}) bool {
- // length++
- // return true
- //})
- LoopListPath.Store(k, v) //排队模式Allspiders,Allspiders2共用一个LoopListPath,新增一次即可
- sp, errstr = CreateSpider(k, v["script"], false, false)
- } else {
- sp, errstr = CreateSpider(k, v["script"], true, false)
- }
- } else {
- sp, errstr = CreateSpider(k, v["script"], true, false)
- }
- if errstr == "" && sp != nil && sp.Code != "nil" {
- sp.MUserName = v["modifyuser"]
- sp.MUserEmail = v["modifyemail"]
- as.Store(k, sp)
- if util.Config.Working == 1 {
- sp.Stop = true
- // if i == 0 {
- // length := 0
- // LoopListPath.Range(func(k, v interface{}) bool {
- // length++
- // return true
- // })
- // LoopListPath.Store(length, v)
- // }
- } else {
- sp.Stop = false
- if i == 0 { //高性能模式只有Allspiders启动爬虫,Allspiders2只负责下三级页
- sp.StartJob()
- }
- }
- logger.Info("定时重载脚本--新增", sp.Code)
- } else {
- if i == 0 {
- nowT := time.Now().Unix()
- MgoS.Update("spider_loadfail",
- map[string]interface{}{
- "code": k,
- "modifytime": map[string]interface{}{
- "$gte": nowT - 12*3600,
- "$lte": nowT + 12*3600,
- },
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "code": k,
- "type": "定时重载--新增失败",
- "script": v["script"],
- "updatetime": nowT,
- "modifyuser": sp.MUserName,
- "event": util.Config.Uploadevent,
- "err": errstr,
- },
- }, true, false)
- }
- }
- }
- }
- // if spd, ok := Allspiders.Load(k); ok { //对应脚本已存在,更新
- // sp := spd.(*Spider)
- // logger.Info("定时重载脚本", sp.Code)
- // sp.ScriptFile = v["script"]
- // if v["createuser"] != "" {
- // sp.UserName = v["createuser"]
- // }
- // if v["createuseremail"] != "" {
- // sp.UserEmail = v["createuseremail"]
- // }
- // sp.MUserName = v["modifyuser"]
- // sp.MUserEmail = v["modifyemail"]
- // Allspiders.Store(k, sp)
- // } else { //新增脚本
- // var sp *Spider
- // if util.Config.Working == 1 { //排队模式
- // length := 0
- // LoopListPath.Range(func(k, v interface{}) bool {
- // length++
- // return true
- // })
- // LoopListPath.Store(length, v)
- // sp = CreateSpider(k, v["script"], false,false)
- // } else {
- // sp = NewSpider(k, v["script"])
- // }
- // if sp != nil && sp.Code != "nil" {
- // if v["createuser"] != "" {
- // sp.UserName = v["createuser"]
- // }
- // if v["createuseremail"] != "" {
- // sp.UserEmail = v["createuseremail"]
- // }
- // sp.MUserName = v["modifyuser"]
- // sp.MUserEmail = v["modifyemail"]
- // Allspiders.Store(k, sp)
- // if util.Config.Working == 1 {
- // sp.Stop = true
- // length := 0
- // LoopListPath.Range(func(k, v interface{}) bool {
- // length++
- // return true
- // })
- // LoopListPath.Store(length, v)
- // } else {
- // sp.Stop = false
- // sp.StartJob()
- // }
- // logger.Info("定时重载脚本--新增", sp.Code)
- // } else {
- // mgu.Save("spider_loadfail", "spider", "spider", map[string]interface{}{
- // "code": k,
- // "type": "定时重载--新增失败",
- // "script": v["script"],
- // "intime": time.Now().Format(qu.Date_Full_Layout),
- // "event": util.Config.Uploadevent,
- // })
- // }
- // }
- }
- util.TimeAfterFunc(time.Duration(15)*time.Minute, ReloadSpiderFile, TimeChan)
- }
- // 生成爬虫
- func CreateSpider(code, luafile string, newstate, thread bool) (*Spider, string) {
- defer qu.Catch()
- spider := &Spider{}
- err := spider.LoadScript(&spider.Name, &spider.Channel, &spider.MUserName, code, luafile, newstate, thread)
- if err != "" {
- return nil, err
- }
- spider.Code = spider.GetVar("spiderCode")
- spider.SCode = spider.Code
- spider.Name = spider.GetVar("spiderName")
- spider.Channel = spider.GetVar("spiderChannel")
- //spider.LastExecTime = GetLastExectime(spider.Code)
- spider.DownDetail = spider.GetBoolVar("spiderDownDetailPage")
- spider.Collection = spider.GetVar("spider2Collection")
- spider.SpiderRunRate = int64(spider.GetIntVar("spiderRunRate"))
- //spider.Thread = int64(spider.GetIntVar("spiderThread"))
- spider.StoreToMsgEvent = spider.GetIntVar("spiderStoreToMsgEvent")
- spider.StoreMode = spider.GetIntVar("spiderStoreMode")
- spider.CoverAttr = spider.GetVar("spiderCoverAttr")
- spiderSleepBase := spider.GetIntVar("spiderSleepBase")
- if spiderSleepBase == -1 {
- spider.SleepBase = 1000
- } else {
- spider.SleepBase = spiderSleepBase
- }
- spiderSleepRand := spider.GetIntVar("spiderSleepRand")
- if spiderSleepRand == -1 {
- spider.SleepRand = 1000
- } else {
- spider.SleepRand = spiderSleepRand
- }
- spiderTimeout := spider.GetIntVar("spiderTimeout")
- if spiderTimeout == -1 {
- spider.Timeout = 60
- } else {
- spider.Timeout = int64(spiderTimeout)
- }
- spider.TargetChannelUrl = spider.GetVar("spiderTargetChannelUrl")
- //spider.UserName = spider.GetVar("spiderUserName")
- //spider.UserEmail = spider.GetVar("spiderUserEmail")
- //spider.UploadTime = spider.GetVar("spiderUploadTime")
- spider.MUserName = spider.GetVar("spiderUserName")
- spider.MUserEmail = spider.GetVar("spiderUserEmail")
- //新增历史补漏
- //qu.Debug("-------", spider.GetBoolVar("spiderIsHistoricalMend"), spider.GetBoolVar("spiderIsMustDownload"))
- spider.IsHistoricalMend = spider.GetBoolVar("spiderIsHistoricalMend")
- spider.IsMustDownload = spider.GetBoolVar("spiderIsMustDownload")
- //新老爬虫
- spider.IsCompete = spider.GetBoolVar("spiderIsCompete")
- //爬虫类型
- spider.Infoformat = spider.GetIntVar("spiderInfoformat")
- return spider, ""
- }
- // 更新爬虫
- func UpdateSpider(spider *Spider, code, script string) {
- ts := &Spider{}
- ts.Script.L = lua.NewState(lua.Options{
- RegistrySize: 256 * 20,
- CallStackSize: 256,
- IncludeGoStackTrace: false,
- })
- defer ts.L.Close()
- ts.Script.L.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader)
- ts.Script.L.PreloadModule("json", lujson.Loader)
- if err := ts.Script.L.DoString(script); err != nil {
- logger.Debug(code + ",加载lua脚本错误:" + err.Error())
- return
- }
- spider.Channel = ts.GetVar("spiderChannel") //栏目名称
- spider.DownDetail = ts.GetBoolVar("spiderDownDetailPage") //是否下三级页
- spider.Collection = ts.GetVar("spider2Collection") //存储表
- spider.SpiderRunRate = int64(ts.GetIntVar("spiderRunRate")) //间隔时间
- spider.StoreToMsgEvent = ts.GetIntVar("spiderStoreToMsgEvent") //4002
- spider.StoreMode = ts.GetIntVar("spiderStoreMode") //2
- spider.CoverAttr = ts.GetVar("spiderCoverAttr") //title
- //下载三级页(DownloadDetailPage)随机延迟
- spiderSleepBase := ts.GetIntVar("spiderSleepBase")
- if spiderSleepBase == -1 {
- spider.SleepBase = 1000
- } else {
- spider.SleepBase = spiderSleepBase
- }
- spiderSleepRand := ts.GetIntVar("spiderSleepRand")
- if spiderSleepRand == -1 {
- spider.SleepRand = 1000
- } else {
- spider.SleepRand = spiderSleepRand
- }
- spiderTimeout := ts.GetIntVar("spiderTimeout")
- if spiderTimeout == -1 {
- spider.Timeout = 60
- } else {
- spider.Timeout = int64(spiderTimeout)
- }
- spider.MUserName = spider.GetVar("spiderUserName")
- spider.MUserEmail = spider.GetVar("spiderUserEmail")
- spider.TargetChannelUrl = ts.GetVar("spiderTargetChannelUrl") //栏目地址
- //新增历史补漏
- spider.IsHistoricalMend = ts.GetBoolVar("spiderIsHistoricalMend")
- spider.IsMustDownload = ts.GetBoolVar("spiderIsMustDownload")
- //新老爬虫
- spider.IsCompete = ts.GetBoolVar("spiderIsCompete")
- //爬虫类型
- spider.Infoformat = spider.GetIntVar("spiderInfoformat")
- }
- // 多线程生成爬虫
- func NewSpiderForThread(code, luafile string) (*Spider, string) {
- defer qu.Catch()
- spider := &Spider{}
- err := spider.LoadScript(&spider.Name, &spider.Channel, &spider.MUserName, code, luafile, true, true)
- if err != "" {
- return nil, err
- }
- spider.Code = spider.GetVar("spiderCode")
- spider.SCode = spider.Code
- spider.Script.SCode = spider.Code
- spider.Name = spider.GetVar("spiderName")
- spider.Channel = spider.GetVar("spiderChannel")
- //spider.LastExecTime = GetLastExectime(spider.Code)
- spider.DownDetail = spider.GetBoolVar("spiderDownDetailPage")
- spider.Collection = spider.GetVar("spider2Collection")
- spider.SpiderRunRate = int64(spider.GetIntVar("spiderRunRate"))
- //spider.Thread = int64(spider.GetIntVar("spiderThread"))
- spider.StoreToMsgEvent = spider.GetIntVar("spiderStoreToMsgEvent")
- spider.StoreMode = spider.GetIntVar("spiderStoreMode")
- spider.CoverAttr = spider.GetVar("spiderCoverAttr")
- spiderSleepBase := spider.GetIntVar("spiderSleepBase")
- if spiderSleepBase == -1 {
- spider.SleepBase = 1000
- } else {
- spider.SleepBase = spiderSleepBase
- }
- spiderSleepRand := spider.GetIntVar("spiderSleepRand")
- if spiderSleepRand == -1 {
- spider.SleepRand = 1000
- } else {
- spider.SleepRand = spiderSleepRand
- }
- spiderTimeout := spider.GetIntVar("spiderTimeout")
- if spiderTimeout == -1 {
- spider.Timeout = 60
- } else {
- spider.Timeout = int64(spiderTimeout)
- }
- spider.TargetChannelUrl = spider.GetVar("spiderTargetChannelUrl")
- //spider.UserName = spider.GetVar("spiderUserName")
- //spider.UserEmail = spider.GetVar("spiderUserEmail")
- //spider.UploadTime = spider.GetVar("spiderUploadTime")
- //新增历史补漏
- //qu.Debug("-------", spider.GetBoolVar("spiderIsHistoricalMend"), spider.GetBoolVar("spiderIsMustDownload"))
- spider.IsHistoricalMend = spider.GetBoolVar("spiderIsHistoricalMend")
- spider.IsMustDownload = spider.GetBoolVar("spiderIsMustDownload")
- //新老爬虫
- spider.IsCompete = spider.GetBoolVar("spiderIsCompete")
- //爬虫类型
- spider.Infoformat = spider.GetIntVar("spiderInfoformat")
- return spider, ""
- }
- // 下载量入库
- func SaveDownCount(code string, addtotal bool, todayDowncount, todayRequestNum, yesterdayDowncount, yestoDayRequestNum int32) {
- date := time.Unix(time.Now().Unix(), 0).Format(qu.Date_Short_Layout)
- updata := map[string]interface{}{}
- if addtotal {
- updata = map[string]interface{}{
- "$inc": map[string]interface{}{"totaldown": todayDowncount, "totalreq": todayRequestNum},
- "$set": map[string]interface{}{
- "yesdowncount": yesterdayDowncount,
- "yesdownreq": yestoDayRequestNum,
- "todaydowncount": todayDowncount,
- "todaydownreq": todayRequestNum,
- "date": date,
- "year": time.Now().Year(),
- "month": time.Now().Month(),
- "day": time.Now().Day(),
- },
- }
- } else {
- updata = map[string]interface{}{
- "$set": map[string]interface{}{
- "yesdowncount": yesterdayDowncount,
- "yesdownreq": yestoDayRequestNum,
- "todaydowncount": todayDowncount,
- "todaydownreq": todayRequestNum,
- "date": date,
- "year": time.Now().Year(),
- "month": time.Now().Month(),
- "day": time.Now().Day(),
- },
- }
- }
- MgoS.Update("spider_downlog", map[string]interface{}{"code": code, "date": date}, updata, true, false)
- }
- // 获取下载的上下限(没用)
- func GetLimitDownload(code string) (uplimit, lowlimit int) {
- defer qu.Catch()
- ret, _ := MgoS.FindOne("spider_ldtime", map[string]interface{}{"code": code})
- if ret != nil && len(*ret) > 0 {
- uplimit = qu.IntAll((*ret)["uplimit"])
- lowlimit = qu.IntAll((*ret)["lowlimit"])
- return uplimit, lowlimit
- } else {
- return 100, 0
- }
- }
- // 拼装脚本
- func GetScriptByTmp(luaconfig map[string]interface{}) string {
- defer qu.Catch()
- script := ""
- if luaconfig["listcheck"] == nil {
- luaconfig["listcheck"] = ""
- }
- if luaconfig["contentcheck"] == nil {
- luaconfig["contentcheck"] = ""
- }
- modifyUser := qu.ObjToString(luaconfig["modifyuser"])
- modifyUserEmail := qu.ObjToString(luaconfig["createuseremail"])
- if luaconfig != nil && len(luaconfig) > 0 {
- common := luaconfig["param_common"].([]interface{})
- //新增spiderIsHistoricalMend spiderIsMustDownload
- if len(common) == 15 {
- common = append(common, modifyUser, modifyUserEmail, "")
- } else {
- common = append(common, false, false, modifyUser, modifyUserEmail, "")
- }
- for k, v := range common {
- if k == 4 || k == 5 || k == 6 || k == 9 || k == 10 {
- common[k] = qu.IntAll(v)
- }
- }
- script, _ = GetTmpModel(map[string][]interface{}{"common": common})
- //发布时间
- script_time := ""
- if qu.IntAll(luaconfig["type_time"]) == 0 { //向导模式
- time := luaconfig["param_time"].([]interface{})
- script_time, _ = GetTmpModel(map[string][]interface{}{
- "time": time,
- })
- } else { //专家模式
- script_time = luaconfig["str_time"].(string)
- }
- //列表页
- script_list := ""
- if qu.IntAll(luaconfig["type_list"]) == 0 { //向导模式
- list := luaconfig["param_list"].([]interface{})
- addrs := strings.Split(list[1].(string), "\n")
- if len(addrs) > 0 {
- for k, v := range addrs {
- addrs[k] = "'" + v + "'"
- }
- list[1] = strings.Join(addrs, ",")
- } else {
- list[1] = ""
- }
- script_list, _ = GetTmpModel(map[string][]interface{}{
- "list": list,
- "listcheck": []interface{}{luaconfig["listcheck"]},
- })
- } else { //专家模式
- script_list = luaconfig["str_list"].(string)
- }
- //三级页
- script_content := ""
- if qu.IntAll(luaconfig["type_content"]) == 0 { //向导模式
- content := luaconfig["param_content"].([]interface{})
- script_content, _ = GetTmpModel(map[string][]interface{}{
- "content": content,
- "contentcheck": []interface{}{luaconfig["contentcheck"]},
- })
- } else { //专家模式
- script_content = luaconfig["str_content"].(string)
- }
- script += fmt.Sprintf(util.Tmp_Other, luaconfig["spidertype"], luaconfig["spiderhistorymaxpage"], luaconfig["spidermovevent"], luaconfig["spidercompete"], luaconfig["infoformat"])
- script += `
- ` + script_time + `
- ` + script_list + `
- ` + script_content
- script = ReplaceModel(script, common, luaconfig["model"].(map[string]interface{}))
- }
- return script
- }
- // 生成爬虫脚本
- func GetTmpModel(param map[string][]interface{}) (script string, err interface{}) {
- qu.Try(func() {
- //param_common拼接
- if param != nil && param["common"] != nil {
- if len(param["common"]) < 12 {
- err = "公共参数配置不全"
- } else {
- script = fmt.Sprintf(util.Tmp_common, param["common"]...)
- }
- }
- //发布时间拼接
- if param != nil && param["time"] != nil {
- if len(param["time"]) < 3 {
- err = "方法:time-参数配置不全"
- } else {
- script += fmt.Sprintf(util.Tmp_pubtime, param["time"]...)
- }
- }
- //列表页拼接
- if param != nil && param["list"] != nil {
- if len(param["list"]) < 7 {
- err = "方法:list-参数配置不全"
- } else {
- list := []interface{}{param["listcheck"][0]}
- list = append(list, param["list"]...)
- script += fmt.Sprintf(util.Tmp_pagelist, list...)
- script = strings.Replace(script, "#pageno#", `"..tostring(pageno).."`, -1)
- }
- }
- //详情页拼接
- if param != nil && param["content"] != nil {
- if len(param["content"]) < 2 {
- err = "方法:content-参数配置不全"
- } else {
- content := []interface{}{param["contentcheck"][0]}
- content = append(content, param["content"]...)
- script += fmt.Sprintf(util.Tmp_content, content...)
- }
- }
- }, func(e interface{}) {
- err = e
- })
- return script, err
- }
- // 补充模型
- func ReplaceModel(script string, comm []interface{}, model map[string]interface{}) string {
- defer qu.Catch()
- //补充通用信息
- commstr := `item["spidercode"]="` + comm[0].(string) + `";`
- commstr += `item["site"]="` + comm[1].(string) + `";`
- commstr += `item["channel"]="` + comm[2].(string) + `";`
- script = strings.Replace(script, "--Common--", commstr, -1)
- //补充模型信息
- modelstr := ""
- for k, v := range model {
- modelstr += `item["` + k + `"]="` + v.(string) + `";`
- }
- script = strings.Replace(script, "--Model--", modelstr, -1)
- return script
- }
- // 爬虫信息提交编辑器(心跳)
- func SpiderInfoSend() {
- time.Sleep(15 * time.Second)
- list := []interface{}{}
- Allspiders.Range(func(key, value interface{}) bool {
- v := value.(*Spider)
- info := map[string]interface{}{}
- info["code"] = v.Code
- info["todayDowncount"] = v.TodayDowncount
- info["toDayRequestNum"] = v.ToDayRequestNum
- info["yesterdayDowncount"] = v.YesterdayDowncount
- info["yestoDayRequestNum"] = v.YestoDayRequestNum
- info["totalDowncount"] = v.TotalDowncount
- info["totalRequestNum"] = v.TotalRequestNum
- info["errorNum"] = v.ErrorNum
- info["roundCount"] = v.RoundCount
- info["runRate"] = v.SpiderRunRate
- info["lastHeartbeat"] = v.LastHeartbeat
- info["lastDowncount"] = v.LastDowncount
- info["lstate"] = v.L.Status(v.L)
- list = append(list, info)
- return true
- })
- bs, _ := json.Marshal(list)
- value := url.Values{
- "data": []string{util.Se.EncodeString(string(bs))},
- "type": []string{"info"},
- }
- _, err := http.PostForm(util.Config.Editoraddr, value)
- if err != nil {
- logger.Error("send to editor: ", err.Error())
- }
- util.TimeAfterFunc(5*time.Minute, SpiderInfoSend, TimeChan)
- }
- // 保存心跳信息
- func SaveHeartInfo() {
- time.Sleep(20 * time.Minute)
- num := 0
- SpiderHeart.Range(func(key, value interface{}) bool {
- code := key.(string)
- sp, spiderOk := LoopListPath.Load(code)
- if spiderOk && sp != nil {
- heart, heartOk := value.(*Heart)
- if heartOk {
- num++
- update := []map[string]interface{}{}
- update = append(update, map[string]interface{}{"code": code})
- update = append(update, map[string]interface{}{"$set": map[string]interface{}{
- "site": heart.Site,
- "channel": heart.Channel,
- "firstpage": heart.FirstPageHeart,
- "list": heart.ListHeart,
- "findlist": heart.FindListHeart,
- "detail": heart.DetailHeart,
- "detailexecute": heart.DetailExecuteHeart,
- "modifyuser": heart.ModifyUser,
- "event": util.Config.Uploadevent,
- "updatetime": time.Now().Unix(),
- "del": false,
- }})
- UpdataHeartCache <- update
- }
- } else {
- SpiderHeart.Delete(key)
- }
- return true
- })
- logger.Info("更新心跳个数:", num)
- time.AfterFunc(1*time.Second, SaveHeartInfo)
- }
- // 保存7000节点爬虫转增量节点日志
- func SpiderCodeSendToEditor(code string) {
- defer qu.Catch()
- MgoEB.Save("luamovelog", map[string]interface{}{
- "code": code,
- "comeintime": time.Now().Unix(),
- "ok": false,
- })
- //ok := false
- //for i := 1; i <= 3; i++ {
- // logger.Info("Code:", code, " times:", i, " Send Move Event")
- // list := []interface{}{}
- // list = append(list, code)
- // bs, _ := json.Marshal(list)
- // value := url.Values{
- // "data": []string{util.Se.EncodeString(string(bs))},
- // "type": []string{"code"},
- // }
- // res, err := http.PostForm(util.Config.Editoraddr, value)
- // if err != nil {
- // logger.Error("Send To Editor For Move Event Failed,Code:", code)
- // } else {
- // if res != nil {
- // res.Body.Close()
- // }
- // ok = true
- // break
- // }
- //}
- //logger.Info("Code:", code, " Send Move Event:", ok)
- //MgoEB.Save("luamovelog", map[string]interface{}{
- // "code": code,
- // "comeintime": time.Now().Unix(),
- // "type": "sendfail",
- // "ok": ok,
- //})
- }
|