data_excavate.py

import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    err_details,
)


class DataExcavate(BasicSearch):
    """Site-excavation worker: pulls pending tasks from the scheduler,
    crawls each site and pushes extracted links back for further excavation."""

    def __init__(self, workers=1, loop_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_interval
        self._workers = workers

    def retrieve_site(self, task: Task):
        logger.info(f'[DataExcavate] requesting - {task["url"]}')
        response = self.downloader.get(task['url'])
        task['status_code'] = response.status_code
        if response.status_code != 200 or response.text in ['', None]:
            task['err_reason'] = response.reason
            logger.error(f'[DataExcavate] bad url - {task["url"]} : {response.reason}')
            return False

        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])

        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # counter of items that pass the word filter
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1

        if _c > 1:
            self.push_domain(task)
        else:
            self.push_remove(task)

        self.scheduler.add_excavate(lst, level=task['weight'])
        return True

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'[DataExcavate] worker started - {t_name}')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            if self.validator.data(task['url']):
                continue

            '''excavate the site'''
            success = self.retrieve_site(task)
            if not success:
                '''url - add to the filter'''
                self.validator.add_data(task['url'])

            # '''excavation record'''
            # self.push_records(task)

    def start(self):
        logger.info('[DataExcavate] initializing')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
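

# Usage sketch (an assumption, not part of the original module): run the
# excavator with a small thread pool and a shorter idle interval. Any extra
# keyword arguments would be forwarded to BasicSearch, whose options are
# project-specific and not shown here. start() blocks until interrupted.
if __name__ == '__main__':
    DataExcavate(workers=4, loop_interval=30).start()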