123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- /**
- *批量作业任务调度
- */
- package vm
- import (
- "container/list"
- "encoding/json"
- "errors"
- "fmt"
- "log"
- be "spider_creator/backend"
- bdb "spider_creator/backend/db"
- "strconv"
- "time"
- "github.com/xuri/excelize/v2"
- "github.com/chromedp/chromedp"
- )
- var (
- // 调度中心,本地运行,只加不减,
- // 同时运行的作业没几个,这里不删除元素,不加锁
- runningJobs = map[string]*be.JobRunningState{}
- )
- // 异步执行指定job,只支持单线程
- func (vm *VM) RunJob(code string) {
- // TODO 1 加载配置
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "加载作业"})
- job, err := bdb.LoadEntity[be.Job]("jobs", code)
- var state *be.JobRunningState
- if err != nil {
- log.Println(err.Error())
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "执行作业失败:" + err.Error()})
- return
- }
- if v, ok := runningJobs[code]; ok {
- v.Progress = 1
- v.State = 1
- v.ExitCh = make(chan bool, 1)
- v.ResultCache.Init()
- state = v
- } else {
- state = &be.JobRunningState{
- Code: code,
- State: 1,
- Progress: 1,
- ResultCache: new(list.List),
- ExitCh: make(chan bool, 1),
- }
- runningJobs[code] = state
- }
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "加载作业完成"})
- no := 1
- //加载参数
- _, baseCancelFn, _, _, ctx, incCancelFn := be.NewBrowser(true, false, "") //列表页使用
- _, baseCancelFn2, _, _, ctx2, incCancelFn2 := be.NewBrowser(true, false, "") //详情页使用
- defer func() {
- job.State = 0
- job.Progress = 0
- incCancelFn2()
- baseCancelFn2()
- incCancelFn()
- baseCancelFn()
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: fmt.Sprintf("作业执行结束,结果:%d", state.ResultCache.Len())})
- close(state.ExitCh)
- }()
- log.Println("共有子爬虫数量:", len(job.Items))
- var totalPages, downloadedPages float32 = 0, 0
- for _, item := range job.Items {
- totalPages += float32(item.MaxPages)
- }
- //TODO 2. 主体结构
- L:
- for _, item := range job.Items {
- //单个任务爬取
- // TODO 加载单个爬虫采集配置
- sf, err := bdb.LoadEntity[be.SpiderConfig]("myBucket", item.SpiderCode)
- if err != nil {
- log.Println("加载爬虫配置参数失败:", err.Error())
- continue
- }
- log.Println(*sf)
- log.Println(*item)
- listRunJs, contentRunJs := sf.ListJSCode, sf.ContentJSCode
- //TODO 2. 执行JS代码,获取列表页信息
- if listRunJs == "" {
- listRunJs = renderJavascriptCoder(loadListItemsJS, sf)
- }
- if contentRunJs == "" {
- contentRunJs = renderJavascriptCoder(loadContentJS, sf)
- }
- listResult := make(be.ResultItems, 0)
- //TODO 3.打开列表,获取条目清单
- chromedp.Run(ctx, chromedp.Tasks{
- chromedp.Navigate(item.Url),
- chromedp.WaitReady("document.body", chromedp.ByJSPath),
- chromedp.Sleep(time.Duration(item.ListDelay) * time.Millisecond),
- })
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
- Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "打开列表页完成"})
- //TODO 4.支持打开多页
- for j := 0; j < item.MaxPages; j++ {
- downloadedPages += 1
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
- Act: be.JOB_RUNNING_EVENT_PROGRESS,
- Progress: int(downloadedPages / totalPages * 100)})
- err := chromedp.Run(ctx, chromedp.Tasks{
- chromedp.Evaluate(listRunJs, &listResult),
- })
- if err != nil {
- log.Println("执行JS代码失败", err.Error())
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
- Act: be.JOB_RUNNING_EVENT_DEBUG,
- Msg: "执行列表页JS代码失败"})
- continue
- }
- log.Println("加载当前列表页,长度:", len(listResult))
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
- Act: be.JOB_RUNNING_EVENT_DEBUG,
- Msg: "获取列表完成"})
- //TODO 5.操作详情页
- for _, r := range listResult {
- log.Println("详情页", r.Title, r.Href)
- select {
- case <-state.ExitCh:
- break L
- default:
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
- Act: be.JOB_RUNNING_EVENT_DEBUG,
- Msg: fmt.Sprintf("打开详情页%d %s", no, r.ListTitle)})
- //打开详情页
- err = chromedp.Run(ctx2, chromedp.Tasks{
- chromedp.Navigate(r.Href),
- chromedp.WaitReady("document.body", chromedp.ByJSPath),
- chromedp.Sleep(time.Duration(item.ContentDelay) * time.Millisecond),
- })
- if err != nil {
- continue
- }
- //获取详情页内容
- err = chromedp.Run(ctx2, chromedp.Tasks{
- chromedp.Evaluate(contentRunJs, r),
- })
- if err != nil {
- continue
- }
- if item.NeedDownloadAttaches {
- downloadAttaches(r, vm.attachesDir)
- }
- //补齐其他字段
- r.No = no
- no += 1
- r.Site = item.SpiderSite
- r.Channel = item.Channel
- //结果放入缓存
- state.ResultCache.PushBack(r)
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
- Act: be.JOB_RUNNING_EVENT_DEBUG,
- Msg: fmt.Sprintf("下载详情页%d %s", no, r.Href)})
- }
- }
- //TODO 6.翻页
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
- Act: be.JOB_RUNNING_EVENT_DEBUG,
- Msg: "准备翻页"})
- if err = trunPage(sf, item.TrunPageDelay, ctx); err != nil {
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
- Act: be.JOB_RUNNING_EVENT_DEBUG,
- Msg: "执行翻页代码失败"})
- break
- }
- }
- }
- vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
- Act: be.JOB_RUNNING_EVENT_PROGRESS,
- Progress: 100})
- }
- // StopJob
- func (vm *VM) StopJob(code string) {
- defer func() {
- if err := recover(); err != nil {
- log.Println(err)
- }
- }()
- if v, ok := runningJobs[code]; ok {
- v.ExitCh <- true
- v.State = 0
- v.Progress = 0
- }
- }
- // ExportJobResult
- func (vm *VM) ExportJobResult(code, filePath string) error {
- if job, ok := runningJobs[code]; ok {
- f := excelize.NewFile()
- defer f.Close()
- f.SetCellStr("Sheet1", "A1", "站点")
- f.SetCellStr("Sheet1", "B1", "栏目")
- //写入数据
- f.SetCellStr("Sheet1", "C1", "标题")
- f.SetCellStr("Sheet1", "D1", "链接")
- f.SetCellStr("Sheet1", "E1", "发布单位")
- f.SetCellStr("Sheet1", "F1", "发布时间")
- f.SetCellStr("Sheet1", "G1", "正文")
- f.SetCellStr("Sheet1", "H1", "附件")
- i := 0
- for el := job.ResultCache.Front(); el != nil; el = el.Next() {
- r, _ := el.Value.(*be.ResultItem)
- //写入站点信息
- iStr := strconv.Itoa(i + 2)
- f.SetCellStr("Sheet1", "A"+iStr, r.Site)
- f.SetCellStr("Sheet1", "B"+iStr, r.Channel)
- //写入数据
- f.SetCellStr("Sheet1", "C"+iStr, r.Title)
- f.SetCellStr("Sheet1", "D"+iStr, r.Href)
- f.SetCellStr("Sheet1", "E"+iStr, r.PublishUnit)
- f.SetCellStr("Sheet1", "F"+iStr, r.ListPubTime)
- f.SetCellStr("Sheet1", "G"+iStr, r.Content)
- f.SetCellStr("Sheet1", "H"+iStr, "")
- if len(r.AttachLinks) > 0 {
- bs, err := json.Marshal(r.AttachLinks)
- if err == nil {
- f.SetCellStr("Sheet1", "H"+iStr, string(bs))
- }
- }
- i += 1
- }
- err := f.SaveAs(filePath)
- if err != nil {
- return err
- }
- return nil
- }
- return errors.New("找不到正在该作业")
- }
|