dongzhaorui 3 years ago
parent
commit
ac2307f046
2 changed files with 60 additions and 26 deletions
  1. +55 -19
      find_source/crawler/services/excavate.py
  2. +5 -7
      find_source/crawler/services/query.py

+ 55 - 19
find_source/crawler/services/excavate.py

@@ -1,5 +1,6 @@
 import threading
 from concurrent.futures import ThreadPoolExecutor, wait
+from typing import List
 
 from common.log import logger
 from crawler.Task import Task
@@ -8,11 +9,11 @@ from crawler.utils import (
     extract_base_url,
     extract_page_title,
     extract_domain,
-    parser_domain,
+    split_domain,
     err_details,
 )
 
-TLDS = ['com', 'cn']
+TLDS = ['com', 'cn', 'net', 'org']
 
 
 class DataExcavate(BasicSearch):
@@ -21,8 +22,29 @@ class DataExcavate(BasicSearch):
         super(DataExcavate, self).__init__(**kwargs)
         self._interval = loop_interval
         self._workers = workers
+        self._max_depth = (kwargs.pop('excavate_depth', None) or 3)
+        self._default_depth = 1
+
+    def _init_depth(self, task: Task):
+        if task.get('depth') is None:
+            task.update({'depth': self._default_depth})
+
+    def check_depth(self, task: Task, sub_tasks: List[Task]):
+        _results = []
+        curr_depth = task['depth']
+        if curr_depth < self._max_depth:
+            for t in sub_tasks:
+                t.setdefault('depth', curr_depth + 1)
+                _results.append(t)
+        return _results
 
     def is_rubbish(self, url: str):
+        """
+        URL filter
+
+        :param url: URL
+        :return: bool
+        """
         if self.validator.data(url):
             return True
 
@@ -33,25 +55,33 @@ class DataExcavate(BasicSearch):
         if domain.startswith('www.'):
             domain = domain.replace('www.', '')
 
-        domain_lst = parser_domain(domain)
+        '''domain processing'''
+        domain_lst = split_domain(domain)
         domain_lst = [d for d in domain_lst if d not in TLDS]
+        text = ".".join(domain_lst)
+        if self.validator.data(text):
+            return True
+
+        '''check the strings contained in the domain'''
         for val in domain_lst:
             if self.validator.data(val):
                 return True
         return False
 
-    def retrieve_site(self, task: Task):
+    def process(self, task: Task):
         t_name = threading.currentThread().getName()
-        logger.info(f'[{t_name}]start request - {task["url"]}')
+        logger.info(f'<{t_name}> - request - {task["url"]}')
         response = self.downloader.get(task['url'])
-        task['status_code'] = response.status_code
-        if response.status_code != 200 or response.text in ['', None]:
-            task['err_reason'] = response.reason
-            logger.error(f'[{t_name}]bad URL - {task["url"]} - {response.reason}')
+        status_code = response.status_code
+        page_source = response.text
+        reason = response.reason
+        task['status_code'] = status_code
+        if status_code != 200 or page_source in ['', None]:
+            task['err_reason'] = reason
+            logger.error(f'<{t_name}> - {reason} - {status_code} - {task["url"]}')
             return False
 
         task['domain'] = extract_domain(task['url'])
-        page_source = response.text
         task['name'] = extract_page_title(page_source)
         task['base_url'] = extract_base_url(task['url'])
 
@@ -72,7 +102,8 @@ class DataExcavate(BasicSearch):
                 _c += 1
 
         if _c > 1:
-            self.push_domain(task)
+            if self.push_domain(task):
+                lst = self.check_depth(task, lst)
         else:
             self.push_remove(task)
         self.scheduler.add_excavate(lst, level=task['weight'])
@@ -80,7 +111,7 @@ class DataExcavate(BasicSearch):
 
     def excavate(self):
         t_name = threading.currentThread().getName()
-        logger.info(f'[{t_name}]data excavation - started')
+        logger.info(f'thread started - <{t_name}>')
         while True:
             tasks = self.scheduler.get_excavate_task()
             if len(tasks) == 0:
@@ -88,19 +119,24 @@ class DataExcavate(BasicSearch):
                 continue
 
             task_key, task = tasks
+            '''initialise the site depth'''
+            self._init_depth(task)
             if self.is_rubbish(task['url']):
-                logger.info(f'[{t_name}]filtered URL - {task["url"]}')
+                logger.debug(f'<{t_name}> - duplicate URL - {task["url"]}')
                 continue
-            '''excavate the site'''
-            success = self.retrieve_site(task)
-            if not success:
-                '''url - add to the filter'''
-                self.validator.add_data(task['url'])
+
+            '''data excavation'''
+            try:
+                success = self.process(task)
+                if not success:
+                    '''url - add to the filter'''
+                    self.validator.add_data(task['url'])
+            except Exception as e:
+                logger.exception(e)
             # '''excavation records'''
             # self.push_records(task)
 
     def start(self):
-        logger.info(f'[data excavation]initial load')
         with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
             futures = []
             for _ in range(1, self._workers + 1):
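
Note on the new depth limit in this commit: _init_depth stamps a task with depth 1 the first time it is dequeued, and check_depth only passes sub-tasks on (each stamped with the parent's depth + 1) while the parent is still below _max_depth (3 by default, overridable through the excavate_depth kwarg). A minimal, self-contained sketch of that bookkeeping, using plain dicts in place of Task objects; the constant names, helper names and example URLs are illustrative stand-ins, not part of the repo:

    MAX_DEPTH = 3          # mirrors _max_depth (excavate_depth kwarg, default 3)
    DEFAULT_DEPTH = 1      # mirrors _default_depth

    def init_depth(task):
        # stamp a task the first time it is seen, as _init_depth does
        if task.get('depth') is None:
            task['depth'] = DEFAULT_DEPTH

    def check_depth(task, sub_tasks):
        # pass sub-tasks on only while the parent is below the cap,
        # stamping each child with the parent's depth + 1
        results = []
        if task['depth'] < MAX_DEPTH:
            for t in sub_tasks:
                t.setdefault('depth', task['depth'] + 1)
                results.append(t)
        return results

    seed = {'url': 'http://example.com'}
    init_depth(seed)                                  # seed['depth'] == 1
    children = check_depth(seed, [{'url': 'http://example.com/a'}])
    assert children[0]['depth'] == 2
    leaf = {'url': 'http://example.com/a/b', 'depth': 3}
    assert check_depth(leaf, children) == []          # at the cap, nothing is re-queued
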

+ 5 - 7
find_source/crawler/services/query.py

@@ -32,7 +32,7 @@ class QueryKeyWord(BasicSearch):
 
     def query_keyword(self):
         t_name = threading.currentThread().getName()
-        logger.info(f'[query keyword]started - <{t_name} - {self._name}>')
+        logger.info(f'thread started - <{t_name}>')
         while True:
             tasks = self.scheduler.get_query_task(self.keyword_groups)
             if len(tasks) == 0:
@@ -40,7 +40,7 @@ class QueryKeyWord(BasicSearch):
                 continue
 
             task_key, task = tasks
-            logger.info(f"<{t_name} - {self._name}>{task['groups']} >> {task['search']}")
+            logger.info(f"<{t_name}> - {self._name} - {task['search']}")
             cur_page = 0
             while cur_page < self._max_pages:
                 cur_page += 1
@@ -60,12 +60,11 @@ class QueryKeyWord(BasicSearch):
                     ))
                 '''push to the data excavation queue'''
                 self.scheduler.add_excavate(lst, level=task['weight'])
-                logger.info(f'<{self._name}> {task["search"]} - page {cur_page} - {len(lst)} items in total')
+                logger.info(f'<{t_name}> - {self._name} - {task["search"]} - page {cur_page}, {len(lst)} items in total')
             # '''query records'''
             # self.push_records(task)
 
     def start(self):
-        logger.info(f'[query keyword]initial load')
         with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
             futures = []
             for _ in range(1, self._workers + 1):
@@ -92,7 +91,7 @@ class QueryOrganization(BasicSearch):
 
     def query_org(self):
         t_name = threading.currentThread().getName()
-        logger.info(f'[query organization]started - <{t_name} - {self._name}>')
+        logger.info(f'thread started - <{t_name}>')
         while True:
             tasks = self.scheduler.get_query_task(self.org_groups)
             if len(tasks) == 0:
@@ -101,7 +100,7 @@ class QueryOrganization(BasicSearch):
 
             task_key, task = tasks
             word = task['search']
-            logger.info(f"<{t_name} - {self._name}> {task['groups']} >> {word}")
+            logger.info(f"<{t_name}> - {self._name} - {word}")
             try:
                 url = self.engine.search(word)
                 task['url'] = url
@@ -128,7 +127,6 @@ class QueryOrganization(BasicSearch):
             # self.push_records(task)
 
     def start(self):
-        logger.info(f'[query organization]initial load')
         with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
             futures = []
             for _ in range(1, self._workers + 1):