/** *批量作业任务调度 */ package vm import ( "container/list" "encoding/json" "errors" "fmt" qu "jygit.jydev.jianyu360.cn/data_processing/common_utils" be "spider_creator/backend" bdb "spider_creator/backend/db" "strconv" "time" "github.com/xuri/excelize/v2" "github.com/chromedp/chromedp" ) var ( // 调度中心,本地运行,只加不减, // 同时运行的作业没几个,这里不删除元素,不加锁 runningJobs = map[string]*be.JobRunningState{} ) // 异步执行指定job,只支持单线程 func (vm *VM) RunJob(code string) { // TODO 1 加载配置 vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "加载作业"}) job, err := bdb.LoadEntity[be.Job]("jobs", code) var state *be.JobRunningState if err != nil { qu.Debug(err.Error()) vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "执行作业失败:" + err.Error()}) return } if v, ok := runningJobs[code]; ok { v.Progress = 1 v.State = 1 v.ExitCh = make(chan bool, 1) v.ResultCache.Init() state = v } else { state = &be.JobRunningState{ Code: code, State: 1, Progress: 1, ResultCache: new(list.List), ExitCh: make(chan bool, 1), } runningJobs[code] = state } vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "加载作业完成"}) no := 1 //加载参数 _, baseCancelFn, _, _, ctx, incCancelFn := be.NewBrowser(true, false, "") //列表页使用 _, baseCancelFn2, _, _, ctx2, incCancelFn2 := be.NewBrowser(true, false, "") //详情页使用 defer func() { job.State = 0 job.Progress = 0 incCancelFn2() baseCancelFn2() incCancelFn() baseCancelFn() vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: fmt.Sprintf("作业执行结束,结果:%d", state.ResultCache.Len())}) close(state.ExitCh) }() qu.Debug("共有子爬虫数量:", len(job.Items)) var totalPages, downloadedPages float32 = 0, 0 for _, item := range job.Items { totalPages += float32(item.MaxPages) } //TODO 2. 主体结构 L: for _, item := range job.Items { //单个任务爬取 // TODO 加载单个爬虫采集配置 sf, err := bdb.LoadEntity[be.SpiderConfig]("myBucket", item.SpiderCode) if err != nil { qu.Debug("加载爬虫配置参数失败:", err.Error()) continue } qu.Debug(*sf) qu.Debug(*item) listRunJs, contentRunJs := sf.ListJSCode, sf.ContentJSCode //TODO 2. 执行JS代码,获取列表页信息 if listRunJs == "" { listRunJs = renderJavascriptCoder(loadListItemsJS, sf) } if contentRunJs == "" { contentRunJs = renderJavascriptCoder(loadContentJS, sf) } listResult := make(be.ResultItems, 0) //TODO 3.打开列表,获取条目清单 chromedp.Run(ctx, chromedp.Tasks{ chromedp.Navigate(item.Url), chromedp.WaitReady("document.body", chromedp.ByJSPath), chromedp.Sleep(time.Duration(item.ListDelay) * time.Millisecond), }) vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "打开列表页完成"}) //TODO 4.支持打开多页 for j := 0; j < item.MaxPages; j++ { downloadedPages += 1 vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_PROGRESS, Progress: int(downloadedPages / totalPages * 100)}) err := chromedp.Run(ctx, chromedp.Tasks{ chromedp.Evaluate(listRunJs, &listResult), }) if err != nil { qu.Debug("执行JS代码失败", err.Error()) vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "执行列表页JS代码失败"}) continue } qu.Debug("加载当前列表页,长度:", len(listResult)) vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "获取列表完成"}) //TODO 5.操作详情页 for _, r := range listResult { qu.Debug("详情页", r.Title, r.Href) select { case <-state.ExitCh: break L default: vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: fmt.Sprintf("打开详情页%d %s", no, r.ListTitle)}) //打开详情页 err = chromedp.Run(ctx2, chromedp.Tasks{ chromedp.Navigate(r.Href), chromedp.WaitReady("document.body", chromedp.ByJSPath), chromedp.Sleep(time.Duration(item.ContentDelay) * time.Millisecond), }) if err != nil { continue } //获取详情页内容 err = chromedp.Run(ctx2, chromedp.Tasks{ chromedp.Evaluate(contentRunJs, r), }) if err != nil { continue } if item.NeedDownloadAttaches { downloadAttaches(r, vm.attachesDir) } //补齐其他字段 r.No = no no += 1 r.Site = item.SpiderSite r.Channel = item.Channel //结果放入缓存 state.ResultCache.PushBack(r) vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: fmt.Sprintf("下载详情页%d %s", no, r.Href)}) } } //TODO 6.翻页 vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "准备翻页"}) if err = trunPage(sf, item.TrunPageDelay, ctx); err != nil { vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "执行翻页代码失败"}) break } } } vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_PROGRESS, Progress: 100}) } // StopJob func (vm *VM) StopJob(code string) { defer func() { if err := recover(); err != nil { qu.Debug(err) } }() if v, ok := runningJobs[code]; ok { v.ExitCh <- true v.State = 0 v.Progress = 0 } } // ExportJobResult func (vm *VM) ExportJobResult(code, filePath string) error { if job, ok := runningJobs[code]; ok { f := excelize.NewFile() defer f.Close() f.SetCellStr("Sheet1", "A1", "站点") f.SetCellStr("Sheet1", "B1", "栏目") //写入数据 f.SetCellStr("Sheet1", "C1", "标题") f.SetCellStr("Sheet1", "D1", "链接") f.SetCellStr("Sheet1", "E1", "发布单位") f.SetCellStr("Sheet1", "F1", "发布时间") f.SetCellStr("Sheet1", "G1", "正文") f.SetCellStr("Sheet1", "H1", "附件") i := 0 for el := job.ResultCache.Front(); el != nil; el = el.Next() { r, _ := el.Value.(*be.ResultItem) //写入站点信息 iStr := strconv.Itoa(i + 2) f.SetCellStr("Sheet1", "A"+iStr, r.Site) f.SetCellStr("Sheet1", "B"+iStr, r.Channel) //写入数据 f.SetCellStr("Sheet1", "C"+iStr, r.Title) f.SetCellStr("Sheet1", "D"+iStr, r.Href) f.SetCellStr("Sheet1", "E"+iStr, r.PublishUnit) f.SetCellStr("Sheet1", "F"+iStr, r.ListPubTime) f.SetCellStr("Sheet1", "G"+iStr, r.Content) f.SetCellStr("Sheet1", "H"+iStr, "") if len(r.AttachLinks) > 0 { bs, err := json.Marshal(r.AttachLinks) if err == nil { f.SetCellStr("Sheet1", "H"+iStr, string(bs)) } } i += 1 } err := f.SaveAs(filePath) if err != nil { return err } return nil } return errors.New("找不到正在该作业") }