excavate.py

import threading
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    split_domain,
    err_details,
)

TLDS = ['com', 'cn', 'net', 'org']


class DataExcavate(BasicSearch):
    """Multi-threaded data excavation (site mining) service."""

    def __init__(self, workers=1, loop_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_interval
        self._workers = workers
        self._max_depth = (kwargs.pop('excavate_depth', None) or 3)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        """Set the default crawl depth on a task that does not have one yet."""
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def check_depth(self, task: Task, sub_tasks: List[Task]):
        """
        If the parent task is still below the maximum depth, assign the next
        depth to each sub task (unless already set) and return them;
        otherwise return an empty list.
        """
        _results = []
        curr_depth = task['depth']
        if curr_depth < self._max_depth:
            for t in sub_tasks:
                t.setdefault('depth', curr_depth + 1)
                _results.append(t)
        return _results

    def is_rubbish(self, url: str):
        """
        URL filter.

        :param url: URL to check
        :return: True if the URL (or its domain) is already known / filtered, otherwise False
        """
        if self.validator.data(url):
            return True

        domain = extract_domain(url)
        if self.validator.data(domain):
            return True

        if domain.startswith('www.'):
            domain = domain.replace('www.', '')

        '''domain handling'''
        domain_lst = split_domain(domain)
        domain_lst = [d for d in domain_lst if d not in TLDS]
        text = ".".join(domain_lst)
        if self.validator.data(text):
            return True

        '''check each token contained in the domain'''
        for val in domain_lst:
            if self.validator.data(val):
                return True
        return False

    def process(self, task: Task):
        t_name = threading.current_thread().name
        logger.info(f'<{t_name}> - request - {task["url"]}')
        response = self.downloader.get(task['url'])
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        task['status_code'] = status_code
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            logger.error(f'<{t_name}> - {reason} - {status_code} - {task["url"]}')
            return False

        task['domain'] = extract_domain(task['url'])
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])

        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # keyword-filter hit counter
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1

        if _c > 1:
            if self.push_domain(task):
                lst = self.check_depth(task, lst)
        else:
            self.push_remove(task)

        self.scheduler.add_excavate(lst, level=task['weight'])
        return True

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            '''initialize the crawl depth of the site'''
            self._init_depth(task)
            if self.is_rubbish(task['url']):
                logger.debug(f'<{t_name}> - duplicate url - {task["url"]}')
                continue

            '''data excavation'''
            try:
                success = self.process(task)
                if not success:
                    '''url - add it to the filter'''
                    self.validator.add_data(task['url'])
            except Exception as e:
                logger.exception(e)
            # '''excavation records'''
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
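

# Usage sketch (not part of the original module): a minimal, assumed way to run the
# excavator. It presumes BasicSearch wires up the scheduler, downloader, parser and
# validator from the keyword arguments it receives; adjust to the project's actual setup.
#
# if __name__ == '__main__':
#     excavator = DataExcavate(workers=4, loop_interval=30, excavate_depth=3)
#     excavator.start()  # blocks; each worker thread polls the excavate queue in a loop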