|
@@ -0,0 +1,244 @@
|
|
|
+/**
|
|
|
+*批量作业任务调度
|
|
|
+ */
|
|
|
+package vm
|
|
|
+
|
|
|
+import (
|
|
|
+ "container/list"
|
|
|
+ "encoding/json"
|
|
|
+ "errors"
|
|
|
+ "fmt"
|
|
|
+ "log"
|
|
|
+ be "spidercreator/backend"
|
|
|
+ bdb "spidercreator/backend/db"
|
|
|
+ "strconv"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/xuri/excelize/v2"
|
|
|
+
|
|
|
+ "github.com/chromedp/chromedp"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ // 调度中心,本地运行,只加不减,
|
|
|
+ // 同时运行的作业没几个,这里不删除元素,不加锁
|
|
|
+ runningJobs = map[string]*be.JobRunningState{}
|
|
|
+)
|
|
|
+
|
|
|
+// 异步执行指定job,只支持单线程
|
|
|
+func (vm *VM) RunJob(code string) {
|
|
|
+ // TODO 1 加载配置
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "加载作业"})
|
|
|
+ job, err := bdb.LoadEntity[be.Job]("jobs", code)
|
|
|
+ var state *be.JobRunningState
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err.Error())
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "执行作业失败:" + err.Error()})
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if v, ok := runningJobs[code]; ok {
|
|
|
+ v.Progress = 1
|
|
|
+ v.State = 1
|
|
|
+ v.ExitCh = make(chan bool, 1)
|
|
|
+ v.ResultCache.Init()
|
|
|
+ state = v
|
|
|
+ } else {
|
|
|
+ state = &be.JobRunningState{
|
|
|
+ Code: code,
|
|
|
+ State: 1,
|
|
|
+ Progress: 1,
|
|
|
+ ResultCache: new(list.List),
|
|
|
+ ExitCh: make(chan bool, 1),
|
|
|
+ }
|
|
|
+ runningJobs[code] = state
|
|
|
+ }
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "加载作业完成"})
|
|
|
+ no := 1
|
|
|
+ //加载参数
|
|
|
+ _, baseCancelFn, _, _, ctx, incCancelFn := be.NewBrowser(true, false, "") //列表页使用
|
|
|
+ _, baseCancelFn2, _, _, ctx2, incCancelFn2 := be.NewBrowser(true, false, "") //详情页使用
|
|
|
+ defer func() {
|
|
|
+ job.State = 0
|
|
|
+ job.Progress = 0
|
|
|
+ incCancelFn2()
|
|
|
+ baseCancelFn2()
|
|
|
+ incCancelFn()
|
|
|
+ baseCancelFn()
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: fmt.Sprintf("作业执行结束,结果:%d", state.ResultCache.Len())})
|
|
|
+ close(state.ExitCh)
|
|
|
+ }()
|
|
|
+ log.Println("共有子爬虫数量:", len(job.Items))
|
|
|
+ var totalPages, downloadedPages float32 = 0, 0
|
|
|
+ for _, item := range job.Items {
|
|
|
+ totalPages += float32(item.MaxPages)
|
|
|
+ }
|
|
|
+ //TODO 2. 主体结构
|
|
|
+L:
|
|
|
+ for _, item := range job.Items {
|
|
|
+ //单个任务爬取
|
|
|
+ // TODO 加载单个爬虫采集配置
|
|
|
+ sf, err := bdb.LoadEntity[be.SpiderConfig]("myBucket", item.SpiderCode)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("加载爬虫配置参数失败:", err.Error())
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ log.Println(*sf)
|
|
|
+ log.Println(*item)
|
|
|
+ listRunJs, contentRunJs := sf.ListJSCode, sf.ContentJSCode
|
|
|
+
|
|
|
+ //TODO 2. 执行JS代码,获取列表页信息
|
|
|
+ if listRunJs == "" {
|
|
|
+ listRunJs = renderJavascriptCoder(loadListItemsJS, sf)
|
|
|
+ }
|
|
|
+ if contentRunJs == "" {
|
|
|
+ contentRunJs = renderJavascriptCoder(loadContentJS, sf)
|
|
|
+ }
|
|
|
+
|
|
|
+ listResult := make(be.ResultItems, 0)
|
|
|
+ //TODO 3.打开列表,获取条目清单
|
|
|
+ chromedp.Run(ctx, chromedp.Tasks{
|
|
|
+ chromedp.Navigate(item.Url),
|
|
|
+ chromedp.WaitReady("document.body", chromedp.ByJSPath),
|
|
|
+ chromedp.Sleep(time.Duration(item.ListDelay) * time.Millisecond),
|
|
|
+ })
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
|
|
|
+ Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "打开列表页完成"})
|
|
|
+ //TODO 4.支持打开多页
|
|
|
+ for j := 0; j < item.MaxPages; j++ {
|
|
|
+ downloadedPages += 1
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
|
|
|
+ Act: be.JOB_RUNNING_EVENT_PROGRESS,
|
|
|
+ Progress: int(downloadedPages / totalPages * 100)})
|
|
|
+ err := chromedp.Run(ctx, chromedp.Tasks{
|
|
|
+ chromedp.Evaluate(listRunJs, &listResult),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ log.Println("执行JS代码失败", err.Error())
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
|
|
|
+ Act: be.JOB_RUNNING_EVENT_DEBUG,
|
|
|
+ Msg: "执行列表页JS代码失败"})
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ log.Println("加载当前列表页,长度:", len(listResult))
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
|
|
|
+ Act: be.JOB_RUNNING_EVENT_DEBUG,
|
|
|
+ Msg: "获取列表完成"})
|
|
|
+ //TODO 5.操作详情页
|
|
|
+ for _, r := range listResult {
|
|
|
+ log.Println("详情页", r.Title, r.Href)
|
|
|
+ select {
|
|
|
+ case <-state.ExitCh:
|
|
|
+ break L
|
|
|
+ default:
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
|
|
|
+ Act: be.JOB_RUNNING_EVENT_DEBUG,
|
|
|
+ Msg: fmt.Sprintf("打开详情页%d %s", no, r.ListTitle)})
|
|
|
+ //打开详情页
|
|
|
+ err = chromedp.Run(ctx2, chromedp.Tasks{
|
|
|
+ chromedp.Navigate(r.Href),
|
|
|
+ chromedp.WaitReady("document.body", chromedp.ByJSPath),
|
|
|
+ chromedp.Sleep(time.Duration(item.ContentDelay) * time.Millisecond),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //获取详情页内容
|
|
|
+ err = chromedp.Run(ctx2, chromedp.Tasks{
|
|
|
+ chromedp.Evaluate(contentRunJs, r),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ if item.NeedDownloadAttaches {
|
|
|
+ downloadAttaches(r, vm.attachesDir)
|
|
|
+ }
|
|
|
+ //补齐其他字段
|
|
|
+ r.No = no
|
|
|
+ no += 1
|
|
|
+ r.Site = item.SpiderSite
|
|
|
+ r.Channel = item.Channel
|
|
|
+ //结果放入缓存
|
|
|
+ state.ResultCache.PushBack(r)
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
|
|
|
+ Act: be.JOB_RUNNING_EVENT_DEBUG,
|
|
|
+ Msg: fmt.Sprintf("下载详情页%d %s", no, r.Href)})
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //TODO 6.翻页
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
|
|
|
+ Act: be.JOB_RUNNING_EVENT_DEBUG,
|
|
|
+ Msg: "准备翻页"})
|
|
|
+ if err = trunPage(sf, item.TrunPageDelay, ctx); err != nil {
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
|
|
|
+ Act: be.JOB_RUNNING_EVENT_DEBUG,
|
|
|
+ Msg: "执行翻页代码失败"})
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
|
|
|
+ Act: be.JOB_RUNNING_EVENT_PROGRESS,
|
|
|
+ Progress: 100})
|
|
|
+}
|
|
|
+
|
|
|
+// StopJob
|
|
|
+func (vm *VM) StopJob(code string) {
|
|
|
+ defer func() {
|
|
|
+ if err := recover(); err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ if v, ok := runningJobs[code]; ok {
|
|
|
+ v.ExitCh <- true
|
|
|
+ v.State = 0
|
|
|
+ v.Progress = 0
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// ExportJobResult
|
|
|
+func (vm *VM) ExportJobResult(code, filePath string) error {
|
|
|
+ if job, ok := runningJobs[code]; ok {
|
|
|
+ f := excelize.NewFile()
|
|
|
+ defer f.Close()
|
|
|
+ f.SetCellStr("Sheet1", "A1", "站点")
|
|
|
+ f.SetCellStr("Sheet1", "B1", "栏目")
|
|
|
+ //写入数据
|
|
|
+ f.SetCellStr("Sheet1", "C1", "标题")
|
|
|
+ f.SetCellStr("Sheet1", "D1", "链接")
|
|
|
+ f.SetCellStr("Sheet1", "E1", "发布单位")
|
|
|
+ f.SetCellStr("Sheet1", "F1", "发布时间")
|
|
|
+ f.SetCellStr("Sheet1", "G1", "正文")
|
|
|
+ f.SetCellStr("Sheet1", "H1", "附件")
|
|
|
+ i := 0
|
|
|
+ for el := job.ResultCache.Front(); el != nil; el = el.Next() {
|
|
|
+ r, _ := el.Value.(*be.ResultItem)
|
|
|
+ //写入站点信息
|
|
|
+ iStr := strconv.Itoa(i + 2)
|
|
|
+ f.SetCellStr("Sheet1", "A"+iStr, r.Site)
|
|
|
+ f.SetCellStr("Sheet1", "B"+iStr, r.Channel)
|
|
|
+ //写入数据
|
|
|
+ f.SetCellStr("Sheet1", "C"+iStr, r.Title)
|
|
|
+ f.SetCellStr("Sheet1", "D"+iStr, r.Href)
|
|
|
+ f.SetCellStr("Sheet1", "E"+iStr, r.PublishUnit)
|
|
|
+ f.SetCellStr("Sheet1", "F"+iStr, r.ListPubTime)
|
|
|
+ f.SetCellStr("Sheet1", "G"+iStr, r.Content)
|
|
|
+ f.SetCellStr("Sheet1", "H"+iStr, "")
|
|
|
+ if len(r.AttachLinks) > 0 {
|
|
|
+ bs, err := json.Marshal(r.AttachLinks)
|
|
|
+ if err == nil {
|
|
|
+ f.SetCellStr("Sheet1", "H"+iStr, string(bs))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ i += 1
|
|
|
+ }
|
|
|
+ err := f.SaveAs(filePath)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return errors.New("找不到正在该作业")
|
|
|
+}
|