Browse Source

添加 - 同源策略和非同源策略处理流程

dongzhaorui 3 years ago
parent
commit
14ae52652b
1 changed files with 116 additions and 97 deletions
  1. 116 97
      find_source/crawler/services/excavate.py

+ 116 - 97
find_source/crawler/services/excavate.py

@@ -1,18 +1,19 @@
 import re
-import threading
 from concurrent.futures import ThreadPoolExecutor, wait
-from typing import List
 
 from common.log import logger
 from crawler.Task import Task
+from crawler.analysis import TimeExtractor
 from crawler.services.basics import BasicService
 from crawler.utils import (
     extract_host,
     extract_page_title,
     extract_domain,
-    split_domain,
     err_details,
     is_url,
+    html2element,
+    iter_node,
+    check_page_by_words
 )
 
 TLDS = ['com', 'cn', 'net', 'org']
@@ -34,105 +35,137 @@ class DataExcavate(BasicService):
         if task.get('depth') is None:
             task.update({'depth': self._default_depth})
 
-    def check_depth(self, task: Task, sub_tasks: List[Task]):
-        _results = []
-        curr_depth = task['depth']
-        if curr_depth < self._max_depth:
-            for t in sub_tasks:
-                t.setdefault('depth', curr_depth + 1)
-                _results.append(t)
-        return _results
-
-    def is_rubbish(self, url: str):
+    def interceptor(self, task: Task):
         """
-        网址过滤器
+        拦截器:拦截需要丢弃的任务
 
-        :param url: 网址
-        :return: False = 不存在, True = 存在
+        :param task: 请求任务
+        :return:
         """
+        url = task['url']
         if not is_url(url):
+            logger.debug(f'<{self.thread_name}> - 垃圾网址 - {url}')
             return True
 
-        if self.validator.data(url):
-            return True
-
+        curr_depth = task['depth']
         domain = extract_domain(url)
-        if self.validator.data(domain):
+        is_attachment_url = re.match(URL_SUFFIX_PATTERN, url) is not None
+        if any([
+            curr_depth > self._max_depth,  # 检查任务层级
+            is_attachment_url,  # 检查网址是否附件下载地址
+            self.validator.data(url),  # 垃圾池 - 判重任务请求网址
+            self.validator.data(domain),  # 垃圾池 - 过滤域名
+            self.collector.data(domain),  # 收录池 - 判重域名
+        ]):
+            logger.debug(f'<{self.thread_name}> - 无效任务 - {curr_depth} - {url}')
             return True
 
-        if domain.startswith('www.'):
-            domain = domain.replace('www.', '')
+        return False
 
-        '''域名处理'''
-        domain_lst = split_domain(domain)
-        domain_lst = [d for d in domain_lst if d not in TLDS]
-        text = ".".join(domain_lst)
-        if self.validator.data(text):
-            return True
+    def filter_data(self, lst):
+        """通过垃圾过滤器过滤数据"""
+        results = []
+        for val in lst:
+            if not self.validator.data(val):
+                results.append(val)
+        return results
+
+    def same_origin_strategy(self, source, task: Task):
+        """同源策略"""
+        # 排查时间文本
+        hit_date_total = 0
+        date_nodes = []
+        element = html2element(source)
+        for node, _ in iter_node(element):
+            pt = TimeExtractor().extractor(node)
+            if pt and not node.getchildren() and node not in date_nodes:
+                date_nodes.append(node)
+                hit_date_total += 1
+        # 全文排查检索词
+        hit_text_total = 0
+        all_text = ["".join(text.split()) for text in element.itertext()]
+        all_text = [text for text in all_text if len(text) > 0]
+        for text in all_text:
+            # print(text)
+            if check_page_by_words(text):
+                hit_text_total += 1
+        # 寻源结果
+        if all([3 < hit_date_total < 50, hit_text_total > 3]):
+            self.push_domain(task)
+        elif hit_text_total > 5:
+            self.push_domain(task)
+        else:
+            self.push_remove(task)
 
-        '''检查域名中所包含的字符串'''
-        for val in domain_lst:
-            if self.validator.data(val):
-                return True
-        return False
+        # 抽取当前页面同源链接(使用同源模式:mode=1),不遗漏挖掘层级,同时权重优先级+1,优先处理
+        lst = []
+        history = []
+        sub_depth = task['depth'] + 1
+        sub_weight = task['weight'] + 1
+        items = self.parser.turls(source, task['url'], mode=1)
+        for item in items:
+            name, url = item['title'], item['href']
+            if not self.validator.data(url) and url not in history:
+                lst.append(self.make_task(
+                    url=url,
+                    name=name,
+                    depth=sub_depth,
+                    origin=task['origin'],
+                    groups=task['groups'],
+                    classify=self.visit_classify,
+                    weight=sub_weight
+                ))
+                history.append(url)
+        self.scheduler.add_excavate(lst, level=sub_weight)
 
-    def process(self, t_name: str, task: Task):
-        logger.info(f'<{t_name}> - 请求 - {task["url"]}')
-        response = self.downloader.get(task['url'], timeout=5)
+    def non_origin_strategy(self, source, task: Task):
+        """非同源策略"""
+        lst = []
+        history = []
+        # 抽取当前页面非同源链接(使用非同源模式:mode=2),添加挖掘队列,一个站点挖掘结束,开始处理新站点
+        non_origin_urls = self.parser.urls(source, task['url'], mode=2)
+        for url in non_origin_urls:
+            if not self.validator.data(url) and url not in history:
+                lst.append(self.make_task(
+                    url=url,
+                    origin=task['origin'],
+                    groups=task['groups'],
+                    classify=self.visit_classify,
+                    weight=self.url_weight
+                ))
+                history.append(url)
+        self.scheduler.add_excavate(lst, level=self.url_weight)
+
+    def fetch_page(self, task: Task):
+        url = task['url']
+        response = self.downloader.get(url, timeout=5)
         status_code = response.status_code
         page_source = response.text
         reason = response.reason
-        task['status_code'] = status_code
         if status_code != 200 or page_source in ['', None]:
             task['err_reason'] = reason
-            logger.error(f'<{t_name}> - {reason} - {status_code} - {task["url"]}')
+            msg = f'<{self.thread_name}> - {url} - {status_code} - {reason}'
+            logger.error(msg)
+            return status_code, None
+        return status_code, page_source
+
+    def process(self, task: Task):
+        logger.info(f'<{self.thread_name}> - 请求 - {task["url"]}')
+        status_code, page_source = self.fetch_page(task)
+        task['status_code'] = status_code
+        if page_source is None:
+            # 访问失败的域名是否添加过滤器?
+            self.push_remove(task)
             return False
-
         task['domain'] = extract_domain(task['url'])
-        task['name'] = extract_page_title(page_source)
         task['base_url'] = extract_host(task['url'])
-
-        lst = []
-        _history = []
-        _c = 0  # 过滤词计数器
-        sub_depth = task['depth'] + 1
-        sub_weight = task['weight'] + 1
-        items = self.parser.non_origin(page_source, task['url'])
-        for item in items:
-            name, url = item['title'], item['href']
-            if self.validator.words(name):
-                if url not in _history:
-                    lst.append(self.make_task(
-                        url=url,
-                        name=name,
-                        depth=sub_depth,
-                        origin=task['origin'],
-                        groups=task['groups'],
-                        classify=self.visit_classify,
-                        weight=sub_weight
-                    ))
-                    _history.append(url)
-                _c += 1
-
-        if _c > 1:
-            save = self.push_domain(task)
-            msg = f'<{t_name}> - 收录成功 - {task["url"]}'
-            if not save:
-                msg = f'<{t_name}> - 重复收录 - {task["url"]}'
-        else:
-            remove = self.push_remove(task)
-            msg = f'<{t_name}> - 过滤丢弃 - {task["url"]}'
-            if not remove:
-                msg = f'<{t_name}> - 重复收录 - {task["url"]}'
-
-        logger.debug(msg)
-        '''层级深的,优先采集'''
-        self.scheduler.add_excavate(lst, level=sub_weight)
+        task['name'] = extract_page_title(page_source)
+        self.same_origin_strategy(page_source, task)
+        self.non_origin_strategy(page_source, task)
         return True
 
     def excavate(self):
-        t_name = threading.currentThread().getName()
-        logger.info(f'开启线程 - <{t_name}>')
+        logger.info(f'开启线程 - <{self.thread_name}>')
         while True:
             tasks = self.scheduler.get_excavate_task()
             if len(tasks) == 0:
@@ -140,31 +173,17 @@ class DataExcavate(BasicService):
                 continue
 
             task_key, task = tasks
-            # 初始化网站层级
-            self._init_depth(task)
-            if self.is_rubbish(task['url']):
-                logger.debug(f'<{t_name}> - 垃圾数据 - {task["url"]}')
-                continue
-
-            # 层级控制
-            if task['depth'] > self._max_depth:
-                logger.debug(f'<{t_name}> - 层级超限 - {task["url"]}')
-                # self.push_records(task)
-                continue
-
-            dont_visit = re.match(URL_SUFFIX_PATTERN, task['url']) is not None
-            if not dont_visit:
+            self._init_depth(task)  # 初始化网站层级
+            if not self.interceptor(task):
                 try:
-                    success = self.process(t_name, task)
-                    if not success:
-                        self.validator.add_data(task['url'])
+                    self.process(task)
                 except Exception as e:
                     logger.exception(e)
             # '''挖掘记录'''
             # self.push_records(task)
 
     def start(self):
-        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
+        with ThreadPoolExecutor(self._workers, 'dataExcavate') as executor:
             futures = []
             for _ in range(1, self._workers + 1):
                 f = executor.submit(self.excavate)