# excavate.py

import re
import threading
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_page_title,
    extract_domain,
    split_domain,
    err_details,
    is_url,
)

TLDS = ['com', 'cn', 'net', 'org']
URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
# Download-type links (documents, archives) are treated as "do not visit".
URL_SUFFIX_PATTERN = r'.*\.(' + '|'.join(URL_SUFFIX) + ')$'
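# Illustrative only: re.match(URL_SUFFIX_PATTERN, 'http://example.com/report.pdf')
# matches, so that link is skipped, while a regular page such as
# 'http://example.com/news/list.html' does not match.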


class DataExcavate(BasicService):
    """Data excavation (mining) service."""

    def __init__(self, **kwargs):
        self._workers = (kwargs.pop('workers', None) or 1)
        self._max_depth = (kwargs.pop('excavate_depth', None) or 1)
        self._interval = (kwargs.pop('excavate_interval', None) or 60)
        super(DataExcavate, self).__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def check_depth(self, task: Task, sub_tasks: List[Task]):
        _results = []
        curr_depth = task['depth']
        if curr_depth < self._max_depth:
            for t in sub_tasks:
                t.setdefault('depth', curr_depth + 1)
                _results.append(t)
        return _results

    def is_rubbish(self, url: str):
        """
        URL filter.

        :param url: URL to check
        :return: True if the URL should be discarded (invalid or already recorded),
                 otherwise False
        """
        if not is_url(url):
            return True

        if self.validator.data(url):
            return True

        domain = extract_domain(url)
        if self.validator.data(domain):
            return True

        if domain.startswith('www.'):
            domain = domain.replace('www.', '')

        # Domain processing
        domain_lst = split_domain(domain)
        domain_lst = [d for d in domain_lst if d not in TLDS]
        text = ".".join(domain_lst)
        if self.validator.data(text):
            return True

        # Check the individual strings contained in the domain
        for val in domain_lst:
            if self.validator.data(val):
                return True
        return False
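
    # Illustrative only (assumes split_domain() splits on dots): for
    # 'http://www.example.com.cn/page' the domain 'www.example.com.cn' is reduced
    # to ['example'] after dropping 'www.' and the TLD parts, and the validator is
    # queried with both the joined text and each remaining fragment.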

    def process(self, t_name: str, task: Task):
        logger.info(f'<{t_name}> - request - {task["url"]}')
        response = self.downloader.get(task['url'], timeout=5)
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        task['status_code'] = status_code
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            logger.error(f'<{t_name}> - {reason} - {status_code} - {task["url"]}')
            return False

        task['domain'] = extract_domain(task['url'])
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_host(task['url'])
        lst = []
        _history = []
        _c = 0  # counter of links whose titles pass the word filter
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        items = self.parser.non_origin(page_source, task['url'])
        for item in items:
            name, url = item['title'], item['href']
            if self.validator.words(name):
                if url not in _history:
                    lst.append(self.make_task(
                        url=url,
                        name=name,
                        depth=sub_depth,
                        origin=task['origin'],
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=sub_weight
                    ))
                    _history.append(url)
                    _c += 1

        if _c > 1:
            save = self.push_domain(task)
            msg = f'<{t_name}> - recorded successfully - {task["url"]}'
            if not save:
                msg = f'<{t_name}> - duplicate record - {task["url"]}'
        else:
            remove = self.push_remove(task)
            msg = f'<{t_name}> - filtered out - {task["url"]}'
            if not remove:
                msg = f'<{t_name}> - duplicate record - {task["url"]}'
        logger.debug(msg)
        # Deeper pages are scheduled with higher priority
        self.scheduler.add_excavate(lst, level=sub_weight)
        return True

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval, enable_debug_log=True)
                continue

            task_key, task = tasks
            # Initialise the crawl depth of the site
            self._init_depth(task)
            if self.is_rubbish(task['url']):
                logger.debug(f'<{t_name}> - junk data - {task["url"]}')
                continue

            # Depth control
            if task['depth'] > self._max_depth:
                logger.debug(f'<{t_name}> - depth limit exceeded - {task["url"]}')
                # self.push_records(task)
                continue

            dont_visit = re.match(URL_SUFFIX_PATTERN, task['url']) is not None
            if not dont_visit:
                try:
                    success = self.process(t_name, task)
                    if not success:
                        self.validator.add_data(task['url'])
                except Exception as e:
                    logger.exception(e)

            # Excavation records
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
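

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of wiring the service up, assuming BasicService requires no
# keyword arguments beyond those popped in __init__ above; in the real project
# the downloader / parser / validator / scheduler are provided by BasicService.
if __name__ == '__main__':
    excavator = DataExcavate(
        workers=4,             # number of excavation threads
        excavate_depth=3,      # maximum crawl depth per site
        excavate_interval=30,  # seconds to wait when the task queue is empty
    )
    excavator.start()  # blocks: each worker thread loops forever in excavate()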