package vm

import (
	"container/list"
	"fmt"
	"sync"
	"time"

	"github.com/chromedp/chromedp"
	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	be "spider_creator/backend"
)

// Destory releases the worker's browser resources by calling the cancel
// functions stored on the worker: incCancel first, then baseCancel.
// A nil cancel function is skipped.
// (The misspelled name is kept because callers reference it.)
func (w *Worker) Destory() {
	if w.incCancel != nil {
		w.incCancel()
	}
	if w.baseCancel != nil {
		w.baseCancel()
	}
}

// NewWorker creates a Worker backed by its own browser obtained from
// be.NewBrowser(headless, showImage, proxyServe).
//
// contentDelay is the pause, in milliseconds, applied after a detail page is
// ready and before js is evaluated on it. vm is the owning VM, used by the
// worker for event dispatching and for the attachment directory.
func NewWorker(headless bool, showImage bool, proxyServe string, contentDelay int64, js string, vm *VM) *Worker {
	_, baseCancel, _, _, ctx, cancel := be.NewBrowser(headless, showImage, proxyServe)
	return &Worker{
		baseCancel:   baseCancel,
		incCancel:    cancel,
		ctx:          ctx,
		js:           js,
		contentDelay: contentDelay,
		vm:           vm,
	}
}

// Run executes one detail-page job for the list item v.
//
// It navigates to v.Href, waits for document.body, sleeps for the worker's
// contentDelay (milliseconds), and evaluates the worker's content script with
// v as the result target. If v then has attachment links, they are downloaded
// into the VM's attachment directory. Finally the page is asked to close
// itself via window.close().
//
// When Run returns, the worker is sent back on ch and wg.Done() is called, in
// that order. A failure of the page script is logged but does not abort the
// remaining steps.
func (w *Worker) Run(v *be.ResultItem, ch chan *Worker, wg *sync.WaitGroup) {
	defer func() {
		// Hand the worker back to the pool before signalling completion.
		ch <- w
		wg.Done()
	}()

	w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. %d- 待 下载详情页 %s ", v.No, v.ListTitle))

	err := chromedp.Run(w.ctx, chromedp.Tasks{
		chromedp.Navigate(v.Href),
		chromedp.WaitReady(`document.body`, chromedp.ByJSPath),
		chromedp.Sleep(time.Duration(w.contentDelay) * time.Millisecond),
		chromedp.Evaluate(w.js, v),
	})
	if err != nil {
		qu.Debug("执行JS代码失败_详情", err.Error())
	}

	if len(v.AttachLinks) > 0 {
		// The item has attachments.
		w.vm.dnf.Dispatch("debug_event", "4. 下载附件")
		// TODO: download attachments.
		downloadAttaches(v, w.vm.attachesDir)
	}

	// Close the current tab. This is best-effort: the evaluation result is only
	// a sink and a failure here is deliberately ignored.
	var result string
	_ = chromedp.Run(w.ctx, chromedp.Tasks{
		chromedp.Evaluate(`var ret="";window.close();ret`, &result),
	})

	w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. %d- 下载详情页 %s 完成", v.No, v.Title))
}

// RunSpiderMulThreads runs a crawl: it opens the list page, extracts list
// items page by page, and fetches each item's detail page using a pool of
// workers.
//
// Parameters:
//   - url: overrides the configured list-page address when non-empty.
//   - maxPages: maximum number of list pages to process.
//   - listDealy: pause in milliseconds after the list page is ready.
//   - trunPageDelay: delay handed to trunPage (unit not visible here;
//     presumably milliseconds like the other delays — confirm).
//   - contentDelay: pause in milliseconds used by each worker on detail pages.
//   - headless, showImage, proxyServe: forwarded to be.NewBrowser for the list
//     browser and for every worker.
//   - threads: number of workers; values below 1 are treated as 1.
//   - exit: checked before each detail job is dispatched; a receive on it stops
//     the crawl. The channel is closed when the function returns after the
//     browser has been started.
//   - cssMark: annotation data used to build the spider configuration.
//
// Collected items are appended to a list stored in be.DataResults under the
// configuration's Code.
func (vm *VM) RunSpiderMulThreads(url string, maxPages int, listDealy int64, trunPageDelay int64, contentDelay int64, headless bool, showImage bool, proxyServe string, threads int, exit chan bool, cssMark map[string]interface{}) {
	//sc := be.MergeSpiderConfig(currentSpiderConfig, &be.SpiderConfig{Href: url})
	sc, err := be.NewSpiderConfig(cssMark)
	if err != nil {
		qu.Debug("标注信息传输失败!")
		vm.dnf.Dispatch("debug_event", "标注信息传输失败!")
		return
	}
	if url != "" {
		sc.Href = url
	}

	_, baseCancel, _, _, ctx, cancel := be.NewBrowser(headless, showImage, proxyServe)
	qu.Debug("1浏览器打开")
	vm.dnf.Dispatch("debug_event", "1 浏览器打开")
	defer func() {
		cancel()
		baseCancel()
		qu.Debug("0浏览器已经销毁")
		vm.dnf.Dispatch("debug_event", "0 浏览器已经销毁")
		close(exit)
	}()

	// Use the configured scripts when present, otherwise render the defaults.
	runListJs, runContentJs := sc.ListJSCode, sc.ContentJSCode
	if runListJs == "" {
		runListJs = renderJavascriptCoder(loadListItemsJS, sc)
	}
	if runContentJs == "" {
		runContentJs = renderJavascriptCoder(loadContentJS, sc)
	}
	qu.Debug("获取列表JS代码", runListJs)

	// Without at least one worker the dispatch loop below would block forever
	// on the empty pool (and a negative size would panic in make).
	if threads < 1 {
		threads = 1
	}

	// Length 0 with capacity threads: append must not leave nil placeholders.
	wts := make([]*Worker, 0, threads)
	ch := make(chan *Worker, threads)
	wg := new(sync.WaitGroup)
	for i := 0; i < threads; i++ {
		w := NewWorker(headless, showImage, proxyServe, contentDelay, runContentJs, vm)
		wts = append(wts, w)
		ch <- w
	}
	// Destroy all workers on return. This defer runs before the list-browser
	// teardown registered above.
	defer func() {
		for _, w := range wts {
			w.Destory()
		}
	}()

	// NOTE(review): no is incremented before it is assigned, so the first item
	// is numbered 2. Kept as-is; confirm whether numbering should start at 1.
	no := 1

	// TODO 1: paging requires the list page to be opened here, outside the loop.
	if err := chromedp.Run(ctx, chromedp.Tasks{
		chromedp.Navigate(sc.Href),
		chromedp.WaitReady("document.body", chromedp.ByJSPath),
		chromedp.Sleep(time.Duration(listDealy) * time.Millisecond),
	}); err != nil {
		// Log instead of silently dropping the error; the crawl still proceeds
		// as before and will stop if the list script cannot be evaluated.
		qu.Debug("打开列表页失败", err.Error())
	}
	vm.dnf.Dispatch("debug_event", "2 页面已经打开")
	qu.Debug("2页面打开")

	currentResult := list.New()
	be.DataResults[sc.Code] = currentResult

	for i := 0; i < maxPages; i++ {
		listResult := make(be.ResultItems, 0)

		// TODO 2: evaluate the list script to collect the items on this page.
		err := chromedp.Run(ctx, chromedp.Tasks{
			chromedp.Evaluate(runListJs, &listResult),
		})
		if err != nil {
			qu.Debug("执行JS代码失败_列表", err.Error())
			vm.dnf.Dispatch("debug_event", "2 列表-执行JS代码失败")
			return
		}
		vm.dnf.Dispatch("debug_event", "3 获取列表完成")
		qu.Debug("3获取列表完成")

		// TODO 3: open the detail pages, one goroutine per available worker.
		for _, v := range listResult {
			// Non-blocking check for a stop request before taking a worker.
			// NOTE(review): returning here does not wait for jobs already in
			// flight; they are interrupted by the deferred worker teardown.
			select {
			case <-exit:
				return
			default:
			}

			w := <-ch // blocks until a worker is free
			wg.Add(1)
			no++
			v.No = no
			v.Site = sc.Site
			v.Channel = sc.Channel
			currentResult.PushBack(v)
			go w.Run(v, ch, wg)
		}
		// Finish every detail job of this page before turning the page.
		wg.Wait()

		vm.dnf.Dispatch("debug_event", "4 当前页采集完成,准备执行翻页逻辑//"+sc.ListNextPageCss)
		if err = trunPage(sc, trunPageDelay, ctx); err != nil {
			qu.Debug("翻页失败", err.Error())
			vm.dnf.Dispatch("debug_event", "6 翻页失败: "+err.Error())
			time.Sleep(3 * time.Second)
			break
		}
	}

	vm.dnf.Dispatch("debug_event", "6 采集测试完成")
	qu.Debug("6 采集测试完成")
}