dongzhaorui committed 3 years ago
Commit 3d559ac64a

+ 10 - 15
find_source/crawler/services/basics.py

@@ -4,7 +4,6 @@ from typing import List, Mapping
 
 from common.databases import insert_one, update_one_by_domain
 from common.log import logger
-from common.tools import delay_by
 from constants import (
     ORGANIZATION,
     KEYWORD,
@@ -63,12 +62,13 @@ class BasicService:
         self.url_groups = SEED_URL
         self.competing_groups = COMPETING_GOODS
 
-    @staticmethod
-    def loops_interval(interval, enable_debug_log=False):
-        t_name = threading.currentThread().getName()
-        next_run_time = delay_by((interval or 300))
+    @property
+    def thread_name(self):
+        return threading.currentThread().getName()
+
+    def loops_interval(self, interval, enable_debug_log=False):
         if enable_debug_log:
-            logger.debug(f'Run finished: <{t_name}>, next run time: {next_run_time}')
+            logger.debug(f'Thread-<{self.thread_name}> is closed.')
         time.sleep(interval)
 
     @staticmethod
@@ -113,22 +113,21 @@ class BasicService:
         return item
 
     def _push_data(self, purpose: str, task: Task, collection):
-        t_name = threading.currentThread().getName()
         if purpose == 'query':
             item = self.make_retrieve_item(task)
             insert_one(collection, item)
-            logger.info(f'<{t_name}> - Uploaded query result - {item["_id"]}')
+            logger.info(f'<{self.thread_name}> - Query result - {item["_id"]}')
         elif purpose == 'domain':
             item = self.make_domain_item(task)
             insert_one(collection, item)
-            logger.info(f'<{t_name}> - Uploaded mining result - {item["_id"]}')
+            logger.info(f'<{self.thread_name}> - Sourcing result - {task["domain"]}')
         elif purpose == 'remove':
             item = self.make_duplicate_removal(task)
             update_one_by_domain(collection, item)
-            logger.info(f'<{t_name}> - Uploaded dedup feature - {item["domain"]}')
+            logger.info(f'<{self.thread_name}> - Added filter - {task["url"]}')
         else:
             insert_one(collection, task)
-            logger.info(f'<{t_name}> - Uploaded record data - {task["_id"]}')
+            logger.info(f'<{self.thread_name}> - Record data - {task["_id"]}')
 
     def push_remove(self, task: Task):
         """数据去重表"""
@@ -140,10 +139,6 @@ class BasicService:
 
     def push_domain(self, task: Task):
         """数据挖掘结果,推送保存"""
-        if task['groups'] == self.url_groups:
-            duplicate = str(task['origin']).count(task['domain']) > 0
-            if duplicate:
-                return False
         if not self.collector.data(task['domain']):
             self._push_data('domain', task, MGO_DOMAIN)
             self.collector.add_data(task['domain'])
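
For context, a minimal runnable sketch of the helper pair this commit introduces in basics.py; the module-level logger and the class name BasicServiceSketch are illustrative stand-ins for common.log.logger and BasicService, not the project's actual code:

import logging
import threading
import time

logger = logging.getLogger(__name__)  # stand-in for common.log.logger


class BasicServiceSketch:
    """Minimal sketch of the refactored thread-name / interval helpers."""

    @property
    def thread_name(self):
        # Name of the thread currently executing this service method.
        # current_thread() is the non-deprecated spelling of currentThread().
        return threading.current_thread().getName()

    def loops_interval(self, interval, enable_debug_log=False):
        # The delay_by()-based "next run time" message is gone; only the
        # worker thread about to sleep is reported.
        if enable_debug_log:
            logger.debug(f'Thread-<{self.thread_name}> is closed.')
        time.sleep(interval)


BasicServiceSketch().loops_interval(0.1, enable_debug_log=True)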

+ 13 - 9
find_source/crawler/services/query.py

@@ -1,4 +1,3 @@
-import threading
 from concurrent.futures import ThreadPoolExecutor, wait
 
 from common.execptions import ExploreDataError
@@ -30,11 +29,10 @@ class DataQuery(BasicService):
         self._engine = engine
         self._name = engine.__class__.__name__
         self._app = _app_items[engine.usage]
-        self._app_name = f'DataQuery_{engine.usage}'
+        self._app_name = f'dataQuery_{engine.usage}'
 
     def _keywords(self):
-        t_name = threading.currentThread().getName()
-        logger.info(f'Starting thread - <{t_name}>')
+        logger.info(f'Starting thread - <{self.thread_name}>')
         _max_pages = (self.kwargs.pop('max_pages', None) or 1)
         while True:
             tasks = self.scheduler.get_query_task(self.keyword_groups)
@@ -43,7 +41,7 @@ class DataQuery(BasicService):
                 continue
 
             task_key, task = tasks
-            logger.info(f"<{t_name}> - {self._name} - {task['search']}")
+            logger.info(f"<{self.thread_name}> - {self._name} - {task['search']}")
             cur_page = 0
             while cur_page < _max_pages:
                 cur_page += 1
@@ -62,13 +60,19 @@ class DataQuery(BasicService):
                         ))
                 '''push to the data-mining queue'''
                 self.scheduler.add_excavate(lst, level=task['weight'])
-                logger.info(f'<{t_name}> - {self._name} - {task["search"]} - page {cur_page}, {len(lst)} items')
+                msg = "<{}> - {} - {} - page {}, {} items".format(
+                    self.thread_name,
+                    self._name,
+                    task["search"],
+                    cur_page,
+                    len(lst)
+                )
+                logger.info(msg)
             # '''query records'''
             # self.push_records(task)
 
     def _organization(self):
-        t_name = threading.currentThread().getName()
-        logger.info(f'Starting thread - <{t_name}>')
+        logger.info(f'Starting thread - <{self.thread_name}>')
         while True:
             tasks = self.scheduler.get_query_task(self.org_groups)
             if len(tasks) == 0:
@@ -77,7 +81,7 @@ class DataQuery(BasicService):
 
             task_key, task = tasks
             word = task['search']
-            logger.info(f"<{t_name}> - {self._name} - {word}")
+            logger.info(f"<{self.thread_name}> - {self._name} - {word}")
             try:
                 url = self._engine.search(word)
                 task['url'] = url
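
The per-page log line in _keywords is now built with str.format rather than one long f-string. A standalone sketch of just that message construction, using hypothetical placeholder values for the thread name, engine name, and task:

# All values below are hypothetical placeholders; only the message layout
# mirrors the commit.
thread_name = 'Thread-1'
engine_name = 'ExampleSearchEngine'
task = {'search': 'example keyword'}
cur_page = 1
lst = ['https://a.example', 'https://b.example']

msg = "<{}> - {} - {} - page {}, {} items".format(
    thread_name,
    engine_name,
    task["search"],
    cur_page,
    len(lst),
)
print(msg)  # <Thread-1> - ExampleSearchEngine - example keyword - page 1, 2 items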

+ 3 - 13
find_source/crawler/services/sync_data.py

@@ -15,12 +15,7 @@ from settings import (
 class SyncData(BasicService):
     """数据同步服务"""
 
-    def __init__(
-            self,
-            init_validator=False,
-            init_collector=False,
-            **kwargs
-    ):
+    def __init__(self, init_validator=False, init_collector=False, **kwargs):
         self._sync_validate_data = (kwargs.pop('validate_interval', None) or 7200)
         self._keywords_interval = (kwargs.pop('keywords_interval', None) or 3600)
         self._competing_goods_interval = (kwargs.pop('competing_goods_interval', None) or 3600)
@@ -89,10 +84,6 @@ class SyncData(BasicService):
             if not is_url(item['name']):
                 items.remove(item)
                 continue
-            # exists_url = self.validator.data(item['name'])
-            # if exists_url:
-            #     items.remove(item)
-            #     continue
             lst.append(self.make_task(
                 url=item['name'],
                 origin=item['name'],
@@ -139,7 +130,7 @@ class SyncData(BasicService):
         logger.info(f'[Data Sync] Read {len(items)} competitor URLs from the task queue')
 
     def sync_collector(self):
-        """同步lua已收录网址,推送url收录器"""
+        """收录器,存放新发现和已拥有的网址域名"""
         if self._init_collector:
             logger.info(f'[Data Sync] Start loading - collector')
             count = 0
@@ -157,7 +148,7 @@ class SyncData(BasicService):
             logger.info(f'[Data Sync] Collector loaded {count} URL domains')
 
     def sync_validator(self):
-        """将垃圾表内容加载到过滤器"""
+        """垃圾池:存放寻源过程中垃圾网址和没有相关信息的网址"""
         if self._init_validator:
             logger.info(f'[Data Sync] Start loading - filter')
             count = 0
@@ -238,7 +229,6 @@ class SyncData(BasicService):
                 else:
                     self.loops_interval(2)
 
-        logger.info(f'[Data Sync] Initial loading')
         threading.Thread(target=_validate, name='SyncValidateData').start()
         threading.Thread(target=_keywords, name='SyncKeywords').start()
         threading.Thread(target=_competing_goods, name='SyncCompetingGoods').start()
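
SyncData still launches one named worker thread per data source; only the redundant start-up log line is dropped. A rough, self-contained sketch of that start-up pattern, with placeholder worker bodies and shortened intervals (the real loops poll forever on 3600/7200-second intervals):

import threading
import time


def make_worker(label, interval):
    # Placeholder body; the real _validate/_keywords/_competing_goods closures
    # pull data and then wait via BasicService.loops_interval.
    def _loop():
        for _ in range(3):  # the real workers loop forever
            print(f'<{threading.current_thread().name}> syncing {label}')
            time.sleep(interval)
    return _loop


threading.Thread(target=make_worker('validate data', 0.1), name='SyncValidateData').start()
threading.Thread(target=make_worker('keywords', 0.1), name='SyncKeywords').start()
threading.Thread(target=make_worker('competing goods', 0.1), name='SyncCompetingGoods').start()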