# excavate.py
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait

from common.databases import int2long
from common.log import logger
from crawler.Task import Task
from crawler.analysis import TimeExtractor
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_page_title,
    extract_domain,
    err_details,
    is_url,
    html2element,
    iter_node,
    check_page_by_words,
    predict_bidding_model,
    is_contains
)
from settings import Dzr

TLDS = ['com', 'cn', 'net', 'org']
URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
URL_SUFFIX_PATTERN = '.*(' + '|'.join(URL_SUFFIX) + ')$'
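# URL_SUFFIX_PATTERN matches any URL that ends with one of the attachment
# extensions above; interceptor() uses it to drop direct file-download links.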


def predict_data(html, task: Task):
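    """
    Run the bidding-page classifier on the page and persist the prediction.

    The return value of predict_bidding_model is assumed to be a dict that
    contains at least a 'predict' flag (0 = not a bidding page); the flag is
    written to the Dzr collection together with the task's origin and URL.
    """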
    data_json = {'html': html, 'predict': 0}
    if html is None:
        return data_json

    data = {'contenthtml': html}
    data_json = predict_bidding_model(data)
    Dzr.insert_one({
        'site': task['origin'],
        # 'html': compress_str(html),
        'url': task['url'],
        'predict': data_json['predict'],
        'comeintime': int2long(int(time.time()))
    })
    return data_json


class DataExcavate(BasicService):
    """Data excavation service"""

    def __init__(self, **kwargs):
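        # Tunable via kwargs (defaults in parentheses):
        #   workers (1), excavate_depth (1), excavate_interval (60 seconds)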
        self._workers = (kwargs.pop('workers', None) or 1)
        self._max_depth = (kwargs.pop('excavate_depth', None) or 1)
        self._interval = (kwargs.pop('excavate_interval', None) or 60)
        super(DataExcavate, self).__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
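        """Give tasks without a depth the default depth of 1 (seed level)."""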
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def interceptor(self, task: Task):
        """
        Interceptor: intercepts tasks that should be discarded.
        :param task: request task
        :return: True if the task should be dropped, otherwise False
        """
        url = task['url']
        if not is_url(url):
            logger.debug(f'<{self.thread_name}> - junk URL - {url}')
            return True

        curr_depth = task['depth']
        domain = extract_domain(url)
        is_attachment_url = re.match(URL_SUFFIX_PATTERN, url) is not None
        is_login_url = is_contains(url, 'login')
        if any([
            curr_depth > self._max_depth,  # task depth exceeds the limit
            is_attachment_url,  # attachment download address
            is_login_url,  # login address
            self.validator.data(url),  # junk pool - deduplicate the request URL
            self.validator.data(domain),  # junk pool - filter the domain
            curr_depth > 1 and self.collector.data(domain),  # collected pool - deduplicate the domain; seeds (depth 1) are not checked against collected domains
        ]):
            logger.debug(f'<{self.thread_name}> - invalid task - {curr_depth} - {url}')
            return True

        return False

    def same_origin_strategy(self, source, task: Task):
        """Same-origin strategy"""
        # Count date-like text nodes
        hit_date_total = 0
        date_nodes = []
        element = html2element(source)
        for node, _ in iter_node(element):
            pt = TimeExtractor().extractor(node)
            if pt and not node.getchildren() and node not in date_nodes:
                date_nodes.append(node)
                hit_date_total += 1

        # Scan the full text for search keywords
        hit_text_total = 0
        all_text = ["".join(text.split()) for text in element.itertext()]
        all_text = [text for text in all_text if len(text) > 0]
        for text in all_text:
            if check_page_by_words(text):
                hit_text_total += 1

        # Sourcing result
        if all([3 < hit_date_total < 50, hit_text_total > 3]):
            if hit_text_total < 5:
                # keyword count between 3 and 5: mark the site as 2
                task["hit5"] = 2
                self.push_domain(task)
            else:
                # keyword count of 5 or more: mark the site as 1
                task["hit5"] = 1
                self.push_domain(task)
        else:
            self.push_remove(task)
        # Extract same-origin links from the current page (same-origin mode: mode=1);
        # carry the excavation depth forward and raise the weight by 1 so these
        # links are processed with higher priority.
        lst = []
        history = []
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        items = self.parser.turls(source, task['url'], mode=1)
        for item in items:
            name, url = item['title'], item['href']
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    depth=sub_depth,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=sub_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=sub_weight)

    def non_origin_strategy(self, source, task: Task):
        """Non-same-origin strategy"""
        lst = []
        history = []
        # Extract non-same-origin links from the current page (non-same-origin mode: mode=2)
        # and add them to the excavation queue; a new site is processed once the
        # current site's excavation has finished.
        non_origin_urls = self.parser.urls(source, task['url'], mode=2)
        for url in non_origin_urls:
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=self.url_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=self.url_weight)

    def fetch_page(self, task: Task):
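        """Download the task URL; return the HTML text, or None on failure.

        A response is treated as a failure when the status code is not 200 or
        the body is empty; the failure reason is stored on the task as err_reason.
        """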
        url = task['url']
        response = self.downloader.get(url, timeout=5)
        task['status_code'] = status_code = response.status_code
        html = response.text
        reason = response.reason
        if status_code != 200 or html in ['', None]:
            task['err_reason'] = reason
            msg = '<{thread_name}> - {url} - {status_code} - {reason}'.format(
                thread_name=self.thread_name,
                url=url,
                status_code=status_code,
                reason=reason
            )
            logger.error(msg)
            return None
        return html

    def process(self, task: Task):
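        """Handle one excavation task end to end.

        Fetch the page, run the bidding prediction and, when the page is
        classified as a bidding page, harvest its same-origin and cross-origin
        links. Returns True only when the page was classified positively.
        """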
        logger.info(f'<{self.thread_name}> - request - {task["url"]}')
        status = False
        # Pre-compute the domain [domain] and base address [base_url]
        task['domain'] = extract_domain(task['url'])
        task['base_url'] = extract_host(task['url'])
        html = self.fetch_page(task)  # send the request
        if html is not None:
            predict_res = predict_data(html, task)  # bidding prediction result
            if predict_res['predict']:
                task['name'] = extract_page_title(html)
                self.same_origin_strategy(html, task)
                self.non_origin_strategy(html, task)
                status = True
        # Add processed tasks to the de-duplication filter to prevent reprocessing
        self.push_remove(task)
        return status

    def excavate(self):
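        """Worker loop: pull tasks from the excavation scheduler and process them.

        Sleeps for the configured interval whenever the queue is empty.
        """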
        logger.info(f'start thread - <{self.thread_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval, enable_debug_log=True)
                continue

            task_key, task = tasks
            self._init_depth(task)  # initialise the site depth
            if not self.interceptor(task):
                try:
                    self.process(task)
                except Exception as e:
                    logger.exception(f'<{self.thread_name}> {e}')
            # '''excavation records'''
            # self.push_records(task)

    def start(self):
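        """Spawn the worker threads and block until they all finish."""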
        with ThreadPoolExecutor(self._workers, 'dataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
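

# Usage sketch (assumption: BasicService requires no additional arguments beyond
# the kwargs consumed in __init__; the values below are illustrative only).
if __name__ == '__main__':
    DataExcavate(workers=4, excavate_depth=3, excavate_interval=30).start()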