# excavate.py

import re
import threading
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    split_domain,
    err_details,
    is_url,
)

TLDS = ['com', 'cn', 'net', 'org']
URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
# require a literal dot before the suffix so only real file extensions match
URL_SUFFIX_PATTERN = r'.*\.(' + '|'.join(URL_SUFFIX) + ')$'
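# Quick sanity check of the pattern (illustrative, not from the original file):
#   re.match(URL_SUFFIX_PATTERN, 'http://example.com/files/report.pdf')  -> match
#   re.match(URL_SUFFIX_PATTERN, 'http://example.com/index.html')        -> None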


class DataExcavate(BasicService):
    """Data excavation (site mining) service."""

    def __init__(self, **kwargs):
        self._workers = (kwargs.pop('workers', None) or 1)
        self._max_depth = (kwargs.pop('excavate_depth', None) or 1)
        self._interval = (kwargs.pop('excavate_interval', None) or 60)
        super(DataExcavate, self).__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        """Give a task the default depth if it has none."""
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def check_depth(self, task: Task, sub_tasks: List[Task]):
        """Return the sub-tasks still within the depth limit, each tagged with its depth."""
        _results = []
        curr_depth = task['depth']
        if curr_depth < self._max_depth:
            for t in sub_tasks:
                t.setdefault('depth', curr_depth + 1)
                _results.append(t)
        return _results

    def is_rubbish(self, url: str):
        """
        URL filter.

        :param url: URL to check
        :return: True = invalid or already recorded (skip), False = not seen yet
        """
        if not is_url(url):
            return True
        if self.validator.data(url):
            return True
        domain = extract_domain(url)
        if self.validator.data(domain):
            return True
        if domain.startswith('www.'):
            domain = domain.replace('www.', '')
        '''domain handling'''
        domain_lst = split_domain(domain)
        domain_lst = [d for d in domain_lst if d not in TLDS]
        text = ".".join(domain_lst)
        if self.validator.data(text):
            return True
        '''check the strings contained in the domain'''
        for val in domain_lst:
            if self.validator.data(val):
                return True
        return False
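    # Illustration of the domain checks above (assumes split_domain('shop.example.com')
    # returns ['shop', 'example', 'com']): after dropping entries found in TLDS, the
    # validator is queried with 'shop.example' and then with 'shop' and 'example'
    # individually, so a hit on any fragment marks the URL as rubbish.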

    def process(self, t_name: str, task: Task):
        logger.info(f'<{t_name}> - request - {task["url"]}')
        response = self.downloader.get(task['url'], disable_debug_log=False)
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        task['status_code'] = status_code
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            logger.error(f'<{t_name}> - {reason} - {status_code} - {task["url"]}')
            return False
        task['domain'] = extract_domain(task['url'])
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])
        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # counter of links that pass the keyword filter
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    depth=sub_depth,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=sub_weight
                ))
                _c += 1
        if _c > 1:
            save = self.push_domain(task)
        else:
            save = self.push_remove(task)
        msg = f'<{t_name}> - new URL - {task["url"]}'
        if not save:
            msg = f'<{t_name}> - duplicate URL - {task["url"]}'
        logger.debug(msg)
        '''deeper tasks are collected first'''
        self.scheduler.add_excavate(lst, level=sub_weight)
        return True

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            '''initialise the site depth'''
            self._init_depth(task)
            if self.is_rubbish(task['url']):
                logger.debug(f'<{t_name}> - rubbish data - {task["url"]}')
                continue
            '''depth control'''
            if task['depth'] > self._max_depth:
                logger.debug(f'<{t_name}> - depth limit exceeded - {task["url"]}')
                self.push_records(task)
                continue
            dont_visit = re.match(URL_SUFFIX_PATTERN, task['url']) is not None
            if not dont_visit:
                try:
                    success = self.process(t_name, task)
                    if not success:
                        self.validator.add_data(task['url'])
                except Exception as e:
                    logger.exception(e)
            # '''excavation record'''
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
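
# Hedged usage sketch (not part of the original module): one way the service might be
# wired up, assuming BasicService requires no extra kwargs beyond those popped in
# __init__ and that it configures the scheduler, downloader, parser and validator.
#
#     service = DataExcavate(workers=4, excavate_depth=3, excavate_interval=30)
#     service.start()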