worker.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package vm
  2. import (
  3. "container/list"
  4. "fmt"
  5. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. be "spider_creator/backend"
  7. "sync"
  8. "time"
  9. "github.com/chromedp/chromedp"
  10. )
  11. // 销毁
  12. func (w *Worker) Destory() {
  13. if w.incCancel != nil {
  14. w.incCancel()
  15. }
  16. if w.baseCancel != nil {
  17. w.baseCancel()
  18. }
  19. }
  20. // NewWorker
  21. func NewWorker(headless bool, showImage bool, proxyServe string, contentDelay int64, js string, vm *VM) *Worker {
  22. _, baseCancel, _, _, ctx, cancel := be.NewBrowser(headless, showImage, proxyServe)
  23. return &Worker{baseCancel: baseCancel,
  24. incCancel: cancel,
  25. ctx: ctx,
  26. js: js,
  27. contentDelay: contentDelay,
  28. vm: vm,
  29. }
  30. }
  31. // 执行作业
  32. func (w *Worker) Run(v *be.ResultItem, ch chan *Worker, wg *sync.WaitGroup) {
  33. defer func() {
  34. ch <- w
  35. wg.Done()
  36. }()
  37. w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. %d- 待 下载详情页 %s ", v.No, v.ListTitle))
  38. var result string = ""
  39. err := chromedp.Run(w.ctx, chromedp.Tasks{
  40. chromedp.Navigate(v.Href),
  41. chromedp.WaitReady(`document.body`, chromedp.ByJSPath),
  42. chromedp.Sleep(time.Duration(w.contentDelay) * time.Millisecond),
  43. chromedp.Evaluate(w.js, v),
  44. })
  45. if err != nil {
  46. qu.Debug("执行JS代码失败_详情", err.Error())
  47. }
  48. if len(v.AttachLinks) > 0 { //有附件
  49. w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. 下载附件"))
  50. //TODO 下载附件
  51. downloadAttaches(v, w.vm.attachesDir)
  52. }
  53. //关闭当前TAB页
  54. chromedp.Run(w.ctx, chromedp.Tasks{
  55. chromedp.Evaluate(`var ret="";window.close();ret`, &result),
  56. })
  57. w.vm.dnf.Dispatch("debug_event", fmt.Sprintf("4. %d- 下载详情页 %s 完成", v.No, v.Title))
  58. }
  59. // RunSpiderMulThreads
  60. func (vm *VM) RunSpiderMulThreads(url string, maxPages int, listDealy int64, trunPageDelay int64, contentDelay int64, headless bool, showImage bool, proxyServe string, threads int, exit chan bool,
  61. cssMark map[string]interface{}) {
  62. //sc := be.MergeSpiderConfig(currentSpiderConfig, &be.SpiderConfig{Href: url})
  63. sc, err := be.NewSpiderConfig(cssMark)
  64. if err != nil {
  65. qu.Debug("标注信息传输失败!")
  66. vm.dnf.Dispatch("debug_event", "标注信息传输失败!")
  67. return
  68. }
  69. if url != "" {
  70. sc.Href = url
  71. }
  72. _, baseCancel, _, _, ctx, cancel := be.NewBrowser(headless, showImage, proxyServe)
  73. qu.Debug("1浏览器打开")
  74. vm.dnf.Dispatch("debug_event", "1 浏览器打开")
  75. defer func() {
  76. cancel()
  77. baseCancel()
  78. qu.Debug("0浏览器已经销毁")
  79. vm.dnf.Dispatch("debug_event", "0 浏览器已经销毁")
  80. close(exit)
  81. }()
  82. var runListJs, runContentJs string = sc.ListJSCode, sc.ContentJSCode
  83. if runListJs == "" {
  84. runListJs = renderJavascriptCoder(loadListItemsJS, sc)
  85. }
  86. if runContentJs == "" {
  87. runContentJs = renderJavascriptCoder(loadContentJS, sc)
  88. }
  89. qu.Debug("获取列表JS代码", runListJs)
  90. wts := make([]*Worker, threads)
  91. ch := make(chan *Worker, threads)
  92. wg := new(sync.WaitGroup)
  93. for i := 0; i < threads; i++ {
  94. w := NewWorker(headless, showImage, proxyServe, contentDelay, runContentJs, vm)
  95. wts = append(wts, w)
  96. ch <- w
  97. }
  98. //批量销毁
  99. defer func() {
  100. for _, w := range wts {
  101. if w != nil {
  102. w.Destory()
  103. }
  104. }
  105. }()
  106. no := 1
  107. //TODO 1.翻页操作,需要在外层打开列表页
  108. chromedp.Run(ctx, chromedp.Tasks{
  109. chromedp.Navigate(sc.Href),
  110. chromedp.WaitReady("document.body", chromedp.ByJSPath),
  111. chromedp.Sleep(time.Duration(listDealy) * time.Millisecond),
  112. })
  113. vm.dnf.Dispatch("debug_event", "2 页面已经打开")
  114. qu.Debug("2页面打开")
  115. currentResult := list.New()
  116. be.DataResults[sc.Code] = currentResult
  117. for i := 0; i < maxPages; i++ {
  118. listResult := make(be.ResultItems, 0)
  119. //TODO 2. 执行JS代码,获取列表页信息
  120. err := chromedp.Run(ctx, chromedp.Tasks{
  121. chromedp.Evaluate(runListJs, &listResult),
  122. })
  123. if err != nil {
  124. qu.Debug("执行JS代码失败_列表", err.Error())
  125. vm.dnf.Dispatch("debug_event", "2 列表-执行JS代码失败")
  126. return
  127. }
  128. vm.dnf.Dispatch("debug_event", "3 获取列表完成")
  129. qu.Debug("3获取列表完成")
  130. //TODO 3. 打开详情页 ,支持多线程
  131. for _, v := range listResult {
  132. select {
  133. case <-exit:
  134. return
  135. default:
  136. w := <-ch
  137. wg.Add(1)
  138. no += 1
  139. v.No = no
  140. currentResult.PushBack(v)
  141. go w.Run(v, ch, wg)
  142. }
  143. }
  144. wg.Wait()
  145. vm.dnf.Dispatch("debug_event", "4 当前页采集完成,准备执行翻页逻辑//"+sc.ListNextPageCss)
  146. if err = trunPage(sc, trunPageDelay, ctx); err != nil {
  147. qu.Debug("翻页失败", err.Error())
  148. vm.dnf.Dispatch("debug_event", "6 翻页失败: "+err.Error())
  149. time.Sleep(3 * time.Second)
  150. break
  151. }
  152. }
  153. vm.dnf.Dispatch("debug_event", "6 采集测试完成")
  154. qu.Debug("6 采集测试完成")
  155. }