worker.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package vm
  2. import (
  3. "container/list"
  4. "fmt"
  5. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. be "spider_creator/backend"
  7. "sync"
  8. "time"
  9. "github.com/chromedp/chromedp"
  10. )
  11. // 销毁
  12. func (w *Worker) Destory() {
  13. if w.incCancel != nil {
  14. w.incCancel()
  15. }
  16. if w.baseCancel != nil {
  17. w.baseCancel()
  18. }
  19. }
  20. // NewWorker
  21. func NewWorker(headless bool, showImage bool, proxyServe bool, contentDelay int64, js string, vm *VM, filterResource string) *Worker {
  22. _, baseCancel, _, _, ctx, cancel := be.NewBrowser(headless, showImage, proxyServe, "https://", filterResource)
  23. return &Worker{
  24. baseCancel: baseCancel,
  25. incCancel: cancel,
  26. ctx: ctx,
  27. js: js,
  28. contentDelay: contentDelay,
  29. vm: vm,
  30. }
  31. }
  32. // 执行作业
  33. func (w *Worker) Run(v *be.ResultItem, ch chan *Worker, wg *sync.WaitGroup) {
  34. defer func() {
  35. ch <- w
  36. wg.Done()
  37. }()
  38. w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. %d- 待 下载详情页 %s ", v.No, v.ListTitle))
  39. var result string = ""
  40. err := chromedp.Run(w.ctx, chromedp.Tasks{
  41. chromedp.Navigate(v.Href),
  42. chromedp.WaitReady(`document.body`, chromedp.ByJSPath),
  43. chromedp.Sleep(time.Duration(w.contentDelay) * time.Millisecond),
  44. chromedp.Evaluate(w.js, v),
  45. })
  46. if err != nil {
  47. qu.Debug("执行JS代码失败_详情", err.Error())
  48. }
  49. if len(v.AttachLinks) > 0 { //有附件
  50. w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. 下载附件"))
  51. //TODO 下载附件
  52. downloadAttaches(v, w.vm.attachesDir)
  53. }
  54. //关闭当前TAB页
  55. chromedp.Run(w.ctx, chromedp.Tasks{
  56. chromedp.Evaluate(`var ret="";window.close();ret`, &result),
  57. })
  58. w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. %d- 下载详情页 %s 完成", v.No, v.Title))
  59. }
  60. // RunSpiderMulThreads
  61. func (vm *VM) RunSpiderMulThreads(url string, maxPages int, listDealy int64, trunPageDelay int64, contentDelay int64, headless bool, showImage bool, proxyServe bool, threads int, exit chan bool,
  62. cssMark map[string]interface{}) {
  63. //sc := be.MergeSpiderConfig(currentSpiderConfig, &be.SpiderConfig{Href: url})
  64. sc, err := be.NewSpiderConfig(cssMark)
  65. if err != nil {
  66. qu.Debug("标注信息传输失败!")
  67. vm.dnf.Dispatch("debug_event", "标注信息传输失败!")
  68. return
  69. }
  70. if url != "" {
  71. sc.Href = url
  72. }
  73. _, baseCancel, _, _, ctx, cancel := be.NewBrowser(headless, showImage, proxyServe, sc.Href, sc.FilterResource)
  74. qu.Debug("1浏览器打开")
  75. vm.dnf.Dispatch("debug_event", "1 浏览器打开")
  76. defer func() {
  77. cancel()
  78. baseCancel()
  79. qu.Debug("0浏览器已经销毁")
  80. vm.dnf.Dispatch("debug_event", "0 浏览器已经销毁")
  81. close(exit)
  82. }()
  83. var runListJs, runContentJs string = sc.ListJSCode, sc.ContentJSCode
  84. if be.RegSpace.ReplaceAllString(runListJs, "") == "" {
  85. runListJs = renderJavascriptCoder(loadListItemsJS, sc)
  86. }
  87. if be.RegSpace.ReplaceAllString(runContentJs, "") == "" {
  88. runContentJs = renderJavascriptCoder(loadContentJS, sc)
  89. }
  90. qu.Debug("获取列表页JS代码:", runListJs)
  91. qu.Debug("获取详情页JS代码:", runContentJs)
  92. wts := make([]*Worker, threads)
  93. ch := make(chan *Worker, threads)
  94. wg := new(sync.WaitGroup)
  95. for i := 0; i < threads; i++ {
  96. w := NewWorker(headless, showImage, proxyServe, contentDelay, runContentJs, vm, sc.FilterResource)
  97. wts = append(wts, w)
  98. ch <- w
  99. }
  100. //批量销毁
  101. defer func() {
  102. for _, w := range wts {
  103. if w != nil {
  104. w.Destory()
  105. }
  106. }
  107. }()
  108. no := 0
  109. //TODO 1.翻页操作,需要在外层打开列表页
  110. chromedp.Run(ctx, chromedp.Tasks{
  111. chromedp.Navigate(sc.Href),
  112. chromedp.WaitReady("document.body", chromedp.ByJSPath),
  113. chromedp.Sleep(time.Duration(listDealy) * time.Millisecond),
  114. })
  115. vm.dnf.Dispatch("debug_event", "2 页面已经打开")
  116. qu.Debug("2页面打开")
  117. vm.dnf.Dispatch("debug_event", "3 初始化列表页信息")
  118. if !vm.InitListPage(ctx, sc) {
  119. vm.dnf.Dispatch("debug_event", "3 初始化列表页失败,退出")
  120. return
  121. }
  122. currentResult := list.New()
  123. be.DataResults[sc.Code] = currentResult
  124. for i := 0; i < maxPages; i++ {
  125. listResult := make(be.ResultItems, 0)
  126. //TODO 2. 执行JS代码,获取列表页信息
  127. err := chromedp.Run(ctx, chromedp.Tasks{
  128. chromedp.Evaluate(runListJs, &listResult),
  129. })
  130. if err != nil {
  131. qu.Debug("执行JS代码失败_列表", err.Error())
  132. vm.dnf.Dispatch("debug_event", "2 列表-执行JS代码失败")
  133. return
  134. }
  135. vm.dnf.Dispatch("debug_event", "3 获取列表完成")
  136. qu.Debug("3获取列表完成")
  137. //TODO 3. 打开详情页 ,支持多线程
  138. qu.Debug("开始下载"+fmt.Sprint(i+1)+"页详情数据,共", len(listResult), "条")
  139. for _, v := range listResult {
  140. select {
  141. case <-exit:
  142. return
  143. default:
  144. w := <-ch
  145. wg.Add(1)
  146. no += 1
  147. v.No = no
  148. v.Site = sc.Site
  149. v.Channel = sc.Channel
  150. currentResult.PushBack(v)
  151. go w.Run(v, ch, wg)
  152. }
  153. }
  154. wg.Wait()
  155. vm.dnf.Dispatch("debug_event", "4 当前页采集完成,准备执行翻页逻辑")
  156. //翻页
  157. if err = trunPage(sc, trunPageDelay, ctx); err != nil {
  158. qu.Debug("翻页失败", err.Error())
  159. vm.dnf.Dispatch("debug_event", "6 翻页失败: "+err.Error())
  160. time.Sleep(3 * time.Second)
  161. break
  162. }
  163. }
  164. vm.dnf.Dispatch("debug_event", "6 采集测试完成")
  165. qu.Debug("6 采集测试完成")
  166. }