# excavate.py

import re
from concurrent.futures import ThreadPoolExecutor, wait

from common.log import logger
from crawler.Task import Task
from crawler.analysis import TimeExtractor
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_page_title,
    extract_domain,
    err_details,
    is_url,
    html2element,
    iter_node,
    check_page_by_words
)

TLDS = ['com', 'cn', 'net', 'org']
URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
URL_SUFFIX_PATTERN = '.*(' + '|'.join(URL_SUFFIX) + ')$'
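# Note: interceptor() matches task URLs against URL_SUFFIX_PATTERN and drops any
# task whose URL ends with one of the attachment suffixes listed above.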


class DataExcavate(BasicService):
    """Data excavation service"""

    def __init__(self, **kwargs):
        self._workers = (kwargs.pop('workers', None) or 1)
        self._max_depth = (kwargs.pop('excavate_depth', None) or 1)
        self._interval = (kwargs.pop('excavate_interval', None) or 60)
        super(DataExcavate, self).__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def interceptor(self, task: Task):
        """
        Interceptor: drops tasks that should be discarded
        :param task: request task
        :return:
        """
        url = task['url']
        if not is_url(url):
            logger.debug(f'<{self.thread_name}> - junk URL - {url}')
            return True
        curr_depth = task['depth']
        domain = extract_domain(url)
        is_attachment_url = re.match(URL_SUFFIX_PATTERN, url) is not None
        if any([
            curr_depth > self._max_depth,   # task depth exceeds the excavation limit
            is_attachment_url,              # URL points to an attachment download
            self.validator.data(url),       # junk pool - request URL already seen
            self.validator.data(domain),    # junk pool - domain filtered out
            self.collector.data(domain),    # collection pool - domain already recorded
        ]):
            logger.debug(f'<{self.thread_name}> - invalid task - {curr_depth} - {url}')
            return True
        return False

    def filter_data(self, lst):
        """Filter values through the junk validator"""
        results = []
        for val in lst:
            if not self.validator.data(val):
                results.append(val)
        return results

    def same_origin_strategy(self, source, task: Task):
        """Same-origin strategy"""
        # Count leaf nodes that contain date text
        hit_date_total = 0
        date_nodes = []
        element = html2element(source)
        for node, _ in iter_node(element):
            pt = TimeExtractor().extractor(node)
            if pt and not node.getchildren() and node not in date_nodes:
                date_nodes.append(node)
                hit_date_total += 1
        # Scan the full page text for search keywords
        hit_text_total = 0
        all_text = ["".join(text.split()) for text in element.itertext()]
        all_text = [text for text in all_text if len(text) > 0]
        for text in all_text:
            # print(text)
            if check_page_by_words(text):
                hit_text_total += 1
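        # Heuristic: a page with a moderate number of date nodes (4-49) plus more than
        # 3 keyword hits, or more than 5 keyword hits on its own, is kept as a
        # candidate source domain; anything else is sent to the removal pool.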
        # Discovery result
        if all([3 < hit_date_total < 50, hit_text_total > 3]):
            self.push_domain(task)
        elif hit_text_total > 5:
            self.push_domain(task)
        else:
            self.push_remove(task)
        # Extract same-origin links from the current page (same-origin mode: mode=1)
        # so no excavation level is missed; raise the weight by 1 so these tasks are
        # processed first.
        lst = []
        history = []
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        items = self.parser.turls(source, task['url'], mode=1)
        for item in items:
            name, url = item['title'], item['href']
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    depth=sub_depth,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=sub_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=sub_weight)

    def non_origin_strategy(self, source, task: Task):
        """Non-same-origin strategy"""
        lst = []
        history = []
        # Extract non-same-origin links from the current page (non-same-origin mode:
        # mode=2) and add them to the excavation queue; once one site has been
        # excavated, a new site is processed.
        non_origin_urls = self.parser.urls(source, task['url'], mode=2)
        for url in non_origin_urls:
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=self.url_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=self.url_weight)

    def fetch_page(self, task: Task):
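        """Download the task URL; return (status_code, page_source), with page_source set to None on failure"""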
        url = task['url']
        response = self.downloader.get(url, timeout=5)
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            msg = f'<{self.thread_name}> - {url} - {status_code} - {reason}'
            logger.error(msg)
            return status_code, None
        return status_code, page_source

    def process(self, task: Task):
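        """Fetch and analyse a single excavation task; return True when the page was processed"""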
        logger.info(f'<{self.thread_name}> - request - {task["url"]}')
        status_code, page_source = self.fetch_page(task)
        task['status_code'] = status_code
        if page_source is None:
            # Should domains that fail to respond also be added to the filter?
            self.push_remove(task)
            return False
        task['domain'] = extract_domain(task['url'])
        task['base_url'] = extract_host(task['url'])
        task['name'] = extract_page_title(page_source)
        self.same_origin_strategy(page_source, task)
        self.non_origin_strategy(page_source, task)
        return True

    def excavate(self):
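        """Worker loop: pull excavation tasks from the scheduler and process them"""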
        logger.info(f'Starting thread - <{self.thread_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval, enable_debug_log=True)
                continue
            task_key, task = tasks
            self._init_depth(task)  # initialise the site depth
            if not self.interceptor(task):
                try:
                    self.process(task)
                except Exception as e:
                    logger.exception(e)
                # '''excavation records'''
                # self.push_records(task)

    def start(self):
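        """Spin up the worker thread pool and block until all workers exit"""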
        with ThreadPoolExecutor(self._workers, 'dataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
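

# Usage sketch (hypothetical wiring; the scheduler, downloader, parser, validator and
# collector collaborators are assumed to be supplied by BasicService and its configuration):
#
#   service = DataExcavate(workers=4, excavate_depth=2, excavate_interval=30)
#   service.start()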