jobs.go 7.2 KB


  1. /**
  2. *批量作业任务调度
  3. */
  4. package vm
  5. import (
  6. "container/list"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "log"
  11. be "spider_creator/backend"
  12. bdb "spider_creator/backend/db"
  13. "strconv"
  14. "time"
  15. "github.com/xuri/excelize/v2"
  16. "github.com/chromedp/chromedp"
  17. )
  18. var (
  19. // 调度中心,本地运行,只加不减,
  20. // 同时运行的作业没几个,这里不删除元素,不加锁
  21. runningJobs = map[string]*be.JobRunningState{}
  22. )
  23. // 异步执行指定job,只支持单线程
  24. func (vm *VM) RunJob(code string) {
  25. // TODO 1 加载配置
  26. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "加载作业"})
  27. job, err := bdb.LoadEntity[be.Job]("jobs", code)
  28. var state *be.JobRunningState
  29. if err != nil {
  30. log.Println(err.Error())
  31. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "执行作业失败:" + err.Error()})
  32. return
  33. }
  34. if v, ok := runningJobs[code]; ok {
  35. v.Progress = 1
  36. v.State = 1
  37. v.ExitCh = make(chan bool, 1)
  38. v.ResultCache.Init()
  39. state = v
  40. } else {
  41. state = &be.JobRunningState{
  42. Code: code,
  43. State: 1,
  44. Progress: 1,
  45. ResultCache: new(list.List),
  46. ExitCh: make(chan bool, 1),
  47. }
  48. runningJobs[code] = state
  49. }
  50. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "加载作业完成"})
  51. no := 1
  52. //加载参数
  53. _, baseCancelFn, _, _, ctx, incCancelFn := be.NewBrowser(true, false, "") //列表页使用
  54. _, baseCancelFn2, _, _, ctx2, incCancelFn2 := be.NewBrowser(true, false, "") //详情页使用
  55. defer func() {
  56. job.State = 0
  57. job.Progress = 0
  58. incCancelFn2()
  59. baseCancelFn2()
  60. incCancelFn()
  61. baseCancelFn()
  62. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code, Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: fmt.Sprintf("作业执行结束,结果:%d", state.ResultCache.Len())})
  63. close(state.ExitCh)
  64. }()
  65. log.Println("共有子爬虫数量:", len(job.Items))
  66. var totalPages, downloadedPages float32 = 0, 0
  67. for _, item := range job.Items {
  68. totalPages += float32(item.MaxPages)
  69. }
  70. //TODO 2. 主体结构
  71. L:
  72. for _, item := range job.Items {
  73. //单个任务爬取
  74. // TODO 加载单个爬虫采集配置
  75. sf, err := bdb.LoadEntity[be.SpiderConfig]("myBucket", item.SpiderCode)
  76. if err != nil {
  77. log.Println("加载爬虫配置参数失败:", err.Error())
  78. continue
  79. }
  80. log.Println(*sf)
  81. log.Println(*item)
  82. listRunJs, contentRunJs := sf.ListJSCode, sf.ContentJSCode
  83. //TODO 2. 执行JS代码,获取列表页信息
  84. if listRunJs == "" {
  85. listRunJs = renderJavascriptCoder(loadListItemsJS, sf)
  86. }
  87. if contentRunJs == "" {
  88. contentRunJs = renderJavascriptCoder(loadContentJS, sf)
  89. }
  90. listResult := make(be.ResultItems, 0)
  91. //TODO 3.打开列表,获取条目清单
  92. chromedp.Run(ctx, chromedp.Tasks{
  93. chromedp.Navigate(item.Url),
  94. chromedp.WaitReady("document.body", chromedp.ByJSPath),
  95. chromedp.Sleep(time.Duration(item.ListDelay) * time.Millisecond),
  96. })
  97. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
  98. Act: be.JOB_RUNNING_EVENT_DEBUG, Msg: "打开列表页完成"})
  99. //TODO 4.支持打开多页
  100. for j := 0; j < item.MaxPages; j++ {
  101. downloadedPages += 1
  102. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
  103. Act: be.JOB_RUNNING_EVENT_PROGRESS,
  104. Progress: int(downloadedPages / totalPages * 100)})
  105. err := chromedp.Run(ctx, chromedp.Tasks{
  106. chromedp.Evaluate(listRunJs, &listResult),
  107. })
  108. if err != nil {
  109. log.Println("执行JS代码失败", err.Error())
  110. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
  111. Act: be.JOB_RUNNING_EVENT_DEBUG,
  112. Msg: "执行列表页JS代码失败"})
  113. continue
  114. }
  115. log.Println("加载当前列表页,长度:", len(listResult))
  116. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
  117. Act: be.JOB_RUNNING_EVENT_DEBUG,
  118. Msg: "获取列表完成"})
  119. //TODO 5.操作详情页
  120. for _, r := range listResult {
  121. log.Println("详情页", r.Title, r.Href)
  122. select {
  123. case <-state.ExitCh:
  124. break L
  125. default:
  126. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
  127. Act: be.JOB_RUNNING_EVENT_DEBUG,
  128. Msg: fmt.Sprintf("打开详情页%d %s", no, r.ListTitle)})
  129. //打开详情页
  130. err = chromedp.Run(ctx2, chromedp.Tasks{
  131. chromedp.Navigate(r.Href),
  132. chromedp.WaitReady("document.body", chromedp.ByJSPath),
  133. chromedp.Sleep(time.Duration(item.ContentDelay) * time.Millisecond),
  134. })
  135. if err != nil {
  136. continue
  137. }
  138. //获取详情页内容
  139. err = chromedp.Run(ctx2, chromedp.Tasks{
  140. chromedp.Evaluate(contentRunJs, r),
  141. })
  142. if err != nil {
  143. continue
  144. }
  145. if item.NeedDownloadAttaches {
  146. downloadAttaches(r, vm.attachesDir)
  147. }
  148. //补齐其他字段
  149. r.No = no
  150. no += 1
  151. r.Site = item.SpiderSite
  152. r.Channel = item.Channel
  153. //结果放入缓存
  154. state.ResultCache.PushBack(r)
  155. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
  156. Act: be.JOB_RUNNING_EVENT_DEBUG,
  157. Msg: fmt.Sprintf("下载详情页%d %s", no, r.Href)})
  158. }
  159. }
  160. //TODO 6.翻页
  161. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
  162. Act: be.JOB_RUNNING_EVENT_DEBUG,
  163. Msg: "准备翻页"})
  164. if err = trunPage(sf, item.TrunPageDelay, ctx); err != nil {
  165. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
  166. Act: be.JOB_RUNNING_EVENT_DEBUG,
  167. Msg: "执行翻页代码失败"})
  168. break
  169. }
  170. }
  171. }
  172. vm.dnf.Dispatch("run_job_event", &be.JobRunningEvent{Code: job.Code,
  173. Act: be.JOB_RUNNING_EVENT_PROGRESS,
  174. Progress: 100})
  175. }
  176. // StopJob
  177. func (vm *VM) StopJob(code string) {
  178. defer func() {
  179. if err := recover(); err != nil {
  180. log.Println(err)
  181. }
  182. }()
  183. if v, ok := runningJobs[code]; ok {
  184. v.ExitCh <- true
  185. v.State = 0
  186. v.Progress = 0
  187. }
  188. }
  189. // ExportJobResult
  190. func (vm *VM) ExportJobResult(code, filePath string) error {
  191. if job, ok := runningJobs[code]; ok {
  192. f := excelize.NewFile()
  193. defer f.Close()
  194. f.SetCellStr("Sheet1", "A1", "站点")
  195. f.SetCellStr("Sheet1", "B1", "栏目")
  196. //写入数据
  197. f.SetCellStr("Sheet1", "C1", "标题")
  198. f.SetCellStr("Sheet1", "D1", "链接")
  199. f.SetCellStr("Sheet1", "E1", "发布单位")
  200. f.SetCellStr("Sheet1", "F1", "发布时间")
  201. f.SetCellStr("Sheet1", "G1", "正文")
  202. f.SetCellStr("Sheet1", "H1", "附件")
  203. i := 0
  204. for el := job.ResultCache.Front(); el != nil; el = el.Next() {
  205. r, _ := el.Value.(*be.ResultItem)
  206. //写入站点信息
  207. iStr := strconv.Itoa(i + 2)
  208. f.SetCellStr("Sheet1", "A"+iStr, r.Site)
  209. f.SetCellStr("Sheet1", "B"+iStr, r.Channel)
  210. //写入数据
  211. f.SetCellStr("Sheet1", "C"+iStr, r.Title)
  212. f.SetCellStr("Sheet1", "D"+iStr, r.Href)
  213. f.SetCellStr("Sheet1", "E"+iStr, r.PublishUnit)
  214. f.SetCellStr("Sheet1", "F"+iStr, r.ListPubTime)
  215. f.SetCellStr("Sheet1", "G"+iStr, r.Content)
  216. f.SetCellStr("Sheet1", "H"+iStr, "")
  217. if len(r.AttachLinks) > 0 {
  218. bs, err := json.Marshal(r.AttachLinks)
  219. if err == nil {
  220. f.SetCellStr("Sheet1", "H"+iStr, string(bs))
  221. }
  222. }
  223. i += 1
  224. }
  225. err := f.SaveAs(filePath)
  226. if err != nil {
  227. return err
  228. }
  229. return nil
  230. }
  231. return errors.New("找不到正在该作业")
  232. }