excavate.py

import re
import time
from concurrent.futures import ThreadPoolExecutor, wait

from common.databases import int2long
from common.log import logger
from crawler.Task import Task
from crawler.analysis import TimeExtractor
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_page_title,
    extract_domain,
    err_details,
    is_url,
    html2element,
    iter_node,
    check_page_by_words,
    predict_bidding_model,
    is_contains
)
from settings import Dzr

TLDS = ['com', 'cn', 'net', 'org']

URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
URL_SUFFIX_PATTERN = '.*(' + '|'.join(URL_SUFFIX) + ')$'
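# Example: re.match(URL_SUFFIX_PATTERN, 'http://example.com/files/notice.pdf')
# matches, because the URL ends with one of the extensions in URL_SUFFIX;
# interceptor() uses this check to drop attachment download links.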


def predict_data(html, task: Task):
    data_json = {'html': html, 'predict': 0}
    if html is None:
        return data_json

    data = {'contenthtml': html}
    data_json = predict_bidding_model(data)
    Dzr.insert_one({
        'site': task['origin'],
        # 'html': compress_str(html),
        'url': task['url'],
        'predict': data_json['predict'],
        'comeintime': int2long(int(time.time()))
    })
    return data_json


class DataExcavate(BasicService):
    """Data mining (site discovery) service"""

    def __init__(self, **kwargs):
        self._workers = (kwargs.pop('workers', None) or 1)
        self._max_depth = (kwargs.pop('excavate_depth', None) or 1)
        self._interval = (kwargs.pop('excavate_interval', None) or 60)
        super(DataExcavate, self).__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def interceptor(self, task: Task):
        """
        Interceptor: drops tasks that should not be processed.

        :param task: request task
        :return: True if the task is intercepted (discarded), otherwise False
        """
        url = task['url']
        if not is_url(url):
            logger.debug(f'<{self.thread_name}> - junk URL - {url}')
            return True

        curr_depth = task['depth']
        domain = extract_domain(url)
        is_attachment_url = re.match(URL_SUFFIX_PATTERN, url) is not None
        is_login_url = is_contains(url, 'login')
        if any([
            curr_depth > self._max_depth,  # task depth exceeds the excavation limit
            is_attachment_url,             # attachment download URL
            is_login_url,                  # login page URL
            self.validator.data(url),      # junk pool - request URL already seen
            self.validator.data(domain),   # junk pool - filtered domain (top level)
            self.collector.data(domain),   # collected pool - domain already collected (top level)
        ]):
            logger.debug(f'<{self.thread_name}> - invalid task - {curr_depth} - {url}')
            return True

        return False

    def filter_data(self, lst):
        """Filter values through the junk validator"""
        results = []
        for val in lst:
            if not self.validator.data(val):
                results.append(val)
        return results

    def same_origin_strategy(self, source, task: Task):
        """Same-origin strategy"""
        # Count leaf nodes that contain date text
        hit_date_total = 0
        date_nodes = []
        element = html2element(source)
        for node, _ in iter_node(element):
            pt = TimeExtractor().extractor(node)
            if pt and not node.getchildren() and node not in date_nodes:
                date_nodes.append(node)
                hit_date_total += 1

        # Scan the full page text for search keywords
        hit_text_total = 0
        all_text = ["".join(text.split()) for text in element.itertext()]
        all_text = [text for text in all_text if len(text) > 0]
        for text in all_text:
            if check_page_by_words(text):
                hit_text_total += 1

        # Sourcing decision
        if all([3 < hit_date_total < 50, hit_text_total > 3]):
            self.push_domain(task)
        elif hit_text_total > 5:
            self.push_domain(task)
        else:
            self.push_remove(task)

        # Extract same-origin links from the current page (same-origin mode: mode=1).
        # Keep the excavation depth chain and raise the weight by one so these
        # tasks are processed with higher priority.
        lst = []
        history = []
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        items = self.parser.turls(source, task['url'], mode=1)
        for item in items:
            name, url = item['title'], item['href']
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    depth=sub_depth,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=sub_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=sub_weight)

    def non_origin_strategy(self, source, task: Task):
        """Non-same-origin strategy"""
        lst = []
        history = []
        # Extract non-same-origin links from the current page (non-same-origin mode: mode=2)
        # and add them to the excavation queue; once one site finishes, a new site is processed.
        non_origin_urls = self.parser.urls(source, task['url'], mode=2)
        for url in non_origin_urls:
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=self.url_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=self.url_weight)

    def fetch_page(self, task: Task):
        """Download the page; returns (status_code, page_source), with page_source None on failure."""
        url = task['url']
        response = self.downloader.get(url, timeout=5)
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            msg = f'<{self.thread_name}> - {url} - {status_code} - {reason}'
            logger.error(msg)
            return status_code, None
        return status_code, page_source

    def process(self, task: Task):
        logger.info(f'<{self.thread_name}> - request - {task["url"]}')
        status_code, page_source = self.fetch_page(task)
        task['status_code'] = status_code
        predict_data(page_source, task)
        if page_source is None:
            # Should domains that failed to respond be added to the filter?
            self.push_remove(task)
            return False

        task['domain'] = extract_domain(task['url'])
        task['base_url'] = extract_host(task['url'])
        task['name'] = extract_page_title(page_source)
        self.same_origin_strategy(page_source, task)
        self.non_origin_strategy(page_source, task)
        return True

    def excavate(self):
        logger.info(f'start thread - <{self.thread_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval, enable_debug_log=True)
                continue

            task_key, task = tasks
            self._init_depth(task)  # initialize the site depth
            if not self.interceptor(task):
                try:
                    self.process(task)
                except Exception as e:
                    logger.exception(e)
            # '''excavation records'''
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'dataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
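

# Usage sketch (hypothetical entry point, not part of the original module; any
# keyword arguments beyond workers, excavate_depth and excavate_interval are
# forwarded to BasicService, whose signature is not shown in this file):
#
#   if __name__ == '__main__':
#       DataExcavate(workers=4, excavate_depth=2, excavate_interval=30).start()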