worker.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package vm
  2. import (
  3. "container/list"
  4. "fmt"
  5. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. be "spider_creator/backend"
  7. "sync"
  8. "time"
  9. "github.com/chromedp/chromedp"
  10. )
  11. // 销毁
  12. func (w *Worker) Destory() {
  13. if w.incCancel != nil {
  14. w.incCancel()
  15. }
  16. if w.baseCancel != nil {
  17. w.baseCancel()
  18. }
  19. }
  20. // NewWorker
  21. func NewWorker(headless bool, showImage bool, proxyServe string, contentDelay int64, js string, vm *VM) *Worker {
  22. _, baseCancel, _, _, ctx, cancel := be.NewBrowser(headless, showImage, proxyServe)
  23. return &Worker{
  24. baseCancel: baseCancel,
  25. incCancel: cancel,
  26. ctx: ctx,
  27. js: js,
  28. contentDelay: contentDelay,
  29. vm: vm,
  30. }
  31. }
  32. // 执行作业
  33. func (w *Worker) Run(v *be.ResultItem, ch chan *Worker, wg *sync.WaitGroup) {
  34. defer func() {
  35. ch <- w
  36. wg.Done()
  37. }()
  38. w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. %d- 待 下载详情页 %s ", v.No, v.ListTitle))
  39. var result string = ""
  40. err := chromedp.Run(w.ctx, chromedp.Tasks{
  41. chromedp.Navigate(v.Href),
  42. chromedp.WaitReady(`document.body`, chromedp.ByJSPath),
  43. chromedp.Sleep(time.Duration(w.contentDelay) * time.Millisecond),
  44. chromedp.Evaluate(w.js, v),
  45. })
  46. if err != nil {
  47. qu.Debug("执行JS代码失败_详情", err.Error())
  48. }
  49. if len(v.AttachLinks) > 0 { //有附件
  50. w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. 下载附件"))
  51. //TODO 下载附件
  52. downloadAttaches(v, w.vm.attachesDir)
  53. }
  54. //关闭当前TAB页
  55. chromedp.Run(w.ctx, chromedp.Tasks{
  56. chromedp.Evaluate(`var ret="";window.close();ret`, &result),
  57. })
  58. w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. %d- 下载详情页 %s 完成", v.No, v.Title))
  59. }
  60. // RunSpiderMulThreads
  61. func (vm *VM) RunSpiderMulThreads(url string, maxPages int, listDealy int64, trunPageDelay int64, contentDelay int64, headless bool, showImage bool, proxyServe string, threads int, exit chan bool,
  62. cssMark map[string]interface{}) {
  63. //sc := be.MergeSpiderConfig(currentSpiderConfig, &be.SpiderConfig{Href: url})
  64. sc, err := be.NewSpiderConfig(cssMark)
  65. if err != nil {
  66. qu.Debug("标注信息传输失败!")
  67. vm.dnf.Dispatch("debug_event", "标注信息传输失败!")
  68. return
  69. }
  70. if url != "" {
  71. sc.Href = url
  72. }
  73. _, baseCancel, _, _, ctx, cancel := be.NewBrowser(headless, showImage, proxyServe)
  74. qu.Debug("1浏览器打开")
  75. vm.dnf.Dispatch("debug_event", "1 浏览器打开")
  76. defer func() {
  77. cancel()
  78. baseCancel()
  79. qu.Debug("0浏览器已经销毁")
  80. vm.dnf.Dispatch("debug_event", "0 浏览器已经销毁")
  81. close(exit)
  82. }()
  83. var runListJs, runContentJs string = sc.ListJSCode, sc.ContentJSCode
  84. if be.RegSpace.ReplaceAllString(runListJs, "") == "" {
  85. runListJs = renderJavascriptCoder(loadListItemsJS, sc)
  86. }
  87. if be.RegSpace.ReplaceAllString(runContentJs, "") == "" {
  88. runContentJs = renderJavascriptCoder(loadContentJS, sc)
  89. }
  90. qu.Debug("获取列表页JS代码:", runListJs)
  91. qu.Debug("获取详情页JS代码:", runContentJs)
  92. wts := make([]*Worker, threads)
  93. ch := make(chan *Worker, threads)
  94. wg := new(sync.WaitGroup)
  95. for i := 0; i < threads; i++ {
  96. w := NewWorker(headless, showImage, proxyServe, contentDelay, runContentJs, vm)
  97. wts = append(wts, w)
  98. ch <- w
  99. }
  100. //批量销毁
  101. defer func() {
  102. for _, w := range wts {
  103. if w != nil {
  104. w.Destory()
  105. }
  106. }
  107. }()
  108. no := 0
  109. //TODO 1.翻页操作,需要在外层打开列表页
  110. chromedp.Run(ctx, chromedp.Tasks{
  111. chromedp.Navigate(sc.Href),
  112. chromedp.WaitReady("document.body", chromedp.ByJSPath),
  113. chromedp.Sleep(time.Duration(listDealy) * time.Millisecond),
  114. })
  115. vm.dnf.Dispatch("debug_event", "2 页面已经打开")
  116. qu.Debug("2页面打开")
  117. currentResult := list.New()
  118. be.DataResults[sc.Code] = currentResult
  119. for i := 0; i < maxPages; i++ {
  120. listResult := make(be.ResultItems, 0)
  121. //TODO 2. 执行JS代码,获取列表页信息
  122. err := chromedp.Run(ctx, chromedp.Tasks{
  123. chromedp.Evaluate(runListJs, &listResult),
  124. })
  125. if err != nil {
  126. qu.Debug("执行JS代码失败_列表", err.Error())
  127. vm.dnf.Dispatch("debug_event", "2 列表-执行JS代码失败")
  128. return
  129. }
  130. vm.dnf.Dispatch("debug_event", "3 获取列表完成")
  131. qu.Debug("3获取列表完成")
  132. //TODO 3. 打开详情页 ,支持多线程
  133. qu.Debug("开始下载"+fmt.Sprint(i+1)+"页详情数据,共", len(listResult), "条")
  134. for _, v := range listResult {
  135. select {
  136. case <-exit:
  137. return
  138. default:
  139. w := <-ch
  140. wg.Add(1)
  141. no += 1
  142. v.No = no
  143. v.Site = sc.Site
  144. v.Channel = sc.Channel
  145. currentResult.PushBack(v)
  146. go w.Run(v, ch, wg)
  147. }
  148. }
  149. wg.Wait()
  150. vm.dnf.Dispatch("debug_event", "4 当前页采集完成,准备执行翻页逻辑")
  151. //翻页
  152. if err = trunPage(sc, trunPageDelay, ctx); err != nil {
  153. qu.Debug("翻页失败", err.Error())
  154. vm.dnf.Dispatch("debug_event", "6 翻页失败: "+err.Error())
  155. time.Sleep(3 * time.Second)
  156. break
  157. }
  158. }
  159. vm.dnf.Dispatch("debug_event", "6 采集测试完成")
  160. qu.Debug("6 采集测试完成")
  161. }