data_excavate.py

import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    err_details,
)


class DataExcavate(BasicSearch):
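    """Data-mining service: pulls URL tasks off the excavate queue, crawls each
    page, and feeds keyword-matching links back to the scheduler."""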

    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_excavate_interval  # idle wait between polls of the excavate queue
        self._workers = excavate_workers  # number of worker threads

    def retrieve_site(self, task: Task):
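        """Fetch a single task URL, parse its links, and queue follow-up tasks."""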
        logger.info(f'[DataExcavate] Requesting - {task["url"]}')
        response = self.downloader.get(task['url'])
        task['status_code'] = response.status_code
        if response.status_code != 200 or response.text in ['', None]:
            task['err_reason'] = response.reason
            logger.error(f'[DataExcavate] Bad response - {task["url"]}')
            return

        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])
        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # number of keyword matches found on the page
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.phrase(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1

        # More than one keyword hit: record the domain; otherwise hand the
        # task to push_remove unless the domain has already been filtered.
        if _c > 1:
            self.push_domain(task)
        else:
            if not self.validator.data(task['domain']):
                self.push_remove(task)

        self.scheduler.add_excavate(lst, level=task['weight'])

    def excavate(self):
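        """Worker loop: poll the excavate queue and process each unseen URL."""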
        t_name = threading.current_thread().name
        logger.info(f'[DataExcavate] Started - {t_name}')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            if self.validator.data(task['url']):
                continue

            self.retrieve_site(task)
            # url - add to the filter
            self.validator.add_data(task['url'])
            # domain - add to the filter
            self.validator.add_data(task['domain'])
            # visit record
            self.push_records(task)

    def start(self):
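        """Launch the excavate workers in a thread pool and block until they exit."""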
        logger.info('[DataExcavate] Initializing')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
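

# Usage sketch (assumption: BasicSearch wires up the downloader, parser, validator,
# and scheduler collaborators from **kwargs; their names are not defined in this file):
# if __name__ == '__main__':
#     DataExcavate(excavate_workers=4, loop_excavate_interval=30).start()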