dongzhaorui committed 3 years ago · parent commit: 14e5d2062d

+ 4 - 4
find_source/crawler/services/basics.py

@@ -71,7 +71,7 @@ class BasicSearch:
     def loops_interval(interval):
         t_name = threading.currentThread().getName()
         next_run_time = delay_by((interval or 300))
-        logger.debug(f'程序运行结束:<{t_name}>,下次运行时间:{next_run_time}')
+        logger.debug(f'程序运行结束:<{t_name}>,下次运行时间:<{next_run_time}>')
         time.sleep(interval)
 
     @staticmethod
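
Note: the first hunk only adds angle brackets around the next-run timestamp. In the surrounding context, the fallback `(interval or 300)` is applied to `delay_by` but not to `time.sleep(interval)`, so a `None` interval would still raise. A minimal sketch of the loop's intent, with `delay_by_sketch` standing in for the real `delay_by` helper (its exact return format is an assumption):

```python
import datetime
import threading
import time

def delay_by_sketch(seconds: int) -> str:
    # Assumption: delay_by returns a human-readable timestamp `seconds` from now.
    run_at = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
    return run_at.strftime('%Y-%m-%d %H:%M:%S')

def loops_interval_sketch(interval: int) -> None:
    t_name = threading.current_thread().name
    next_run_time = delay_by_sketch(interval or 300)
    print(f'程序运行结束:<{t_name}>,下次运行时间:<{next_run_time}>')
    time.sleep(interval or 300)  # mirror the fallback so sleep never receives None
```
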
@@ -118,21 +118,21 @@ class BasicSearch:
 
     def push_domain(self, task: Task):
         """挖掘网站的查询结果"""
-        logger.info(f"[推送挖掘结果]【{task['name']} - {task['domain']}】")
+        logger.info(f"[数据挖掘 - 推送]【{task['name']} - {task['domain']}】")
         if not self.collector.data(task['domain']):
             self._push_data('save', task, MGO_DOMAIN)
             self.collector.add_data(task['domain'])
 
     def push_query(self, task: Task):
         """搜索组织单位查询结果"""
-        logger.info(f"[推送查询结果]【{task['name']} - {task['url']}】")
+        logger.info(f"[查询结果 - 推送]【{task['name']} - {task['url']}】")
         self._push_data('save', task, MGO_QUERY)
 
     def push_records(self, task: Task):
         """挖掘数据的记录"""
         if task['name'] > 20:
             task['name'] = '{:.20s}'.format(task['name'])
-        logger.info(f"[推送数据记录]【{task['name']} - {task['url']}】")
+        logger.info(f"[数据记录 - 推送]【{task['name']} - {task['url']}】")
         self._push_data('records', task, MGO_RECORDS)
 
     def orgs_table(self) -> List[Mapping]:
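
Note: in the unchanged context of `push_records`, `task['name'] > 20` compares a string to an integer, which raises `TypeError` in Python 3; the guard presumably means to check the name's length before truncating it. A minimal sketch of the corrected check, assuming the task behaves like a dict with a string `'name'`:

```python
def truncate_name(task: dict) -> dict:
    # Compare the length of the name, not the string itself, before truncating.
    if len(task['name']) > 20:
        task['name'] = '{:.20s}'.format(task['name'])  # keep the first 20 characters
    return task

# Usage: a 25-character name is shortened to 20 before it is logged and pushed.
assert len(truncate_name({'name': 'x' * 25})['name']) == 20
```
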

+ 4 - 3
find_source/crawler/services/data_excavate.py

@@ -20,12 +20,13 @@ class DataExcavate(BasicSearch):
         self._workers = workers
 
     def retrieve_site(self, task: Task):
-        logger.info(f'[数据挖掘]开始请求 - {task["url"]}')
+        t_name = threading.currentThread().getName()
+        logger.info(f'[{t_name}]开始请求 - {task["url"]}')
         response = self.downloader.get(task['url'])
         task['status_code'] = response.status_code
         if response.status_code != 200 or response.text in ['', None]:
             task['err_reason'] = response.reason
-            logger.error(f'[数据挖掘]异常网址 - {task["url"]} : {response.reason}')
+            logger.error(f'[{t_name}]异常网址 - {task["url"]} - {response.reason}')
             return False
 
         task['domain'] = extract_domain(task['url'])
@@ -58,7 +59,7 @@ class DataExcavate(BasicSearch):
 
     def excavate(self):
         t_name = threading.currentThread().getName()
-        logger.info(f'[数据挖掘]启动 - {t_name}')
+        logger.info(f'[{t_name}]数据挖掘 - 启动')
         while True:
             tasks = self.scheduler.get_excavate_task()
             if len(tasks) == 0:
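
Note: this change swaps the fixed `[数据挖掘]` prefix for the worker thread's name, so log lines from parallel excavate workers can be told apart. A minimal sketch of the pattern with standard `logging`; `threading.currentThread()` still works but has been a deprecated alias of `threading.current_thread()` since Python 3.10, and the thread names below (`DataExcavate_0`, …) come from the executor's `thread_name_prefix`:

```python
import logging
import threading
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)

def excavate_once(url: str) -> None:
    t_name = threading.current_thread().name  # e.g. 'DataExcavate_0'
    logger.info(f'[{t_name}]开始请求 - {url}')

with ThreadPoolExecutor(2, thread_name_prefix='DataExcavate') as executor:
    for u in ('http://example.com/a', 'http://example.com/b'):
        executor.submit(excavate_once, u)
```
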

+ 6 - 6
find_source/crawler/services/data_query.py

@@ -32,7 +32,7 @@ class QueryKeyWord(BasicSearch):
 
     def query_keyword(self):
         t_name = threading.currentThread().getName()
-        logger.info(f'[关键词查询]启动 - <{t_name} - {self._name}>')
+        logger.info(f'[查询搜索词]启动 - <{t_name} - {self._name}>')
         while True:
             tasks = self.scheduler.get_query_task(self.keyword_groups)
             if len(tasks) == 0:
@@ -40,7 +40,7 @@ class QueryKeyWord(BasicSearch):
                 continue
 
             task_key, task = tasks
-            logger.info(f"<{self._name} - {task['groups']}> >> {task['search']}")
+            logger.info(f"<{t_name} - {self._name}>{task['groups']} >> {task['search']}")
             cur_page = 0
             while cur_page < self._max_pages:
                 cur_page += 1
@@ -65,7 +65,7 @@ class QueryKeyWord(BasicSearch):
             # self.push_records(task)
 
     def start(self):
-        logger.info(f'[关键词查询]初始化加载')
+        logger.info(f'[查询搜索词]初始化加载')
         with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
             futures = []
             for _ in range(1, self._workers + 1):
@@ -92,7 +92,7 @@ class QueryOrganization(BasicSearch):
 
     def query_org(self):
         t_name = threading.currentThread().getName()
-        logger.info(f'[单位组织查询]启动 - <{t_name} - {self._name}>')
+        logger.info(f'[查询单位组织]启动 - <{t_name} - {self._name}>')
         while True:
             tasks = self.scheduler.get_query_task(self.org_groups)
             if len(tasks) == 0:
@@ -101,7 +101,7 @@ class QueryOrganization(BasicSearch):
 
             task_key, task = tasks
             word = task['search']
-            logger.info(f"<{self._name} - {task['groups']}> >> {word}")
+            logger.info(f"<{t_name} - {self._name}> {task['groups']} >> {word}")
             try:
                 url = self.engine.search(word)
                 task['url'] = url
@@ -128,7 +128,7 @@ class QueryOrganization(BasicSearch):
             # self.push_records(task)
 
     def start(self):
-        logger.info(f'[单位组织查询]初始化加载')
+        logger.info(f'[查询单位组织]初始化加载')
         with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
             futures = []
             for _ in range(1, self._workers + 1):
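
Note: both query services follow the same shape: `start()` submits `self._workers` copies of a long-running worker loop into a named `ThreadPoolExecutor`, and each loop now logs its executor-assigned thread name next to the instance name. A minimal sketch of that startup pattern; the worker body and the `'bing'` engine name are placeholders, since the real loop pulls tasks from `self.scheduler` and pages through search results:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

def query_worker(engine_name: str) -> None:
    t_name = threading.current_thread().name  # e.g. 'QueryKeyWord_0'
    print(f'[查询搜索词]启动 - <{t_name} - {engine_name}>')
    # ... fetch tasks from the scheduler and page through search results here ...

workers = 2
with ThreadPoolExecutor(workers, thread_name_prefix='QueryKeyWord') as executor:
    futures = [executor.submit(query_worker, 'bing') for _ in range(workers)]
    wait(futures)  # block until every worker loop returns
```
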

+ 32 - 10
find_source/crawler/services/sync_data.py

@@ -2,20 +2,28 @@ import threading
 
 from common.log import logger
 from crawler.services.basics import BasicSearch
-from crawler.utils import is_url
+from crawler.utils import is_url, extract_domain
 from settings import (
     MGO_URLS,
     MGO_ORGS,
     MGO_COMPETING_GOODS,
-    MGO_REMOVAL_DUPLICATE
+    MGO_REMOVAL_DUPLICATE,
+    MGO_LUA_SPIDERS
 )
 
 
 class SyncData(BasicSearch):
 
-    def __init__(self, init_validator=False, loop_interval=600, **kwargs):
+    def __init__(
+            self,
+            init_validator=False,
+            init_collector=False,
+            loop_interval=600,
+            **kwargs
+    ):
         super(SyncData, self).__init__(**kwargs)
         self._init_validator = init_validator
+        self._init_collector = init_collector
         self._interval = loop_interval
         self._init()
 
@@ -25,7 +33,7 @@ class SyncData(BasicSearch):
     def sync_data_keywords(self):
         """同步关键词数据"""
         logger.info(f'[同步数据]加载关键词')
-        words = self.seed_keywords()
+        words = self.keywords_table()
         # 处理关键词格式并推送到任务队列
         words = [str(word).replace('&nbsp;', '').strip() for word in words]
         lst = []
@@ -43,7 +51,7 @@ class SyncData(BasicSearch):
     def sync_data_orgs(self):
         """同步组织单位数据"""
         logger.info(f'[同步数据]加载单位组织数据')
-        items = self.seed_orgs()
+        items = self.orgs_table()
         # 处理单位组织名称并推送到任务队列
         orgs = []
         for item in items:
@@ -74,7 +82,7 @@ class SyncData(BasicSearch):
     def sync_data_urls(self):
         """同步网址数据"""
         logger.info(f'[同步数据]加载种子url列表')
-        items = self.seed_urls()
+        items = self.seed_urls_table()
         lst = []
         for item in items:
             if not is_url(item['name']):
@@ -102,7 +110,7 @@ class SyncData(BasicSearch):
     def sync_data_competing_goods(self):
         """同步竞品urls"""
         logger.info(f'[同步数据]加载竞品url列表')
-        items = self.seed_competing_goods()
+        items = self.competing_goods_table()
         # 处理竞品urls并推送到任务队列
         lst = []
         for item in items:
@@ -131,9 +139,23 @@ class SyncData(BasicSearch):
 
     def sync_collector(self):
         """同步lua已收录网址,推送url收录器"""
-        logger.info(f'[同步数据]初始化加载收录器')
-        total = self.lua_common_domains()
-        logger.info(f'[同步数据]新收录{total}个网站域名')
+        if self._init_collector:
+            logger.info(f'[同步数据]初始化加载收录器')
+            count = 0
+            projection = {'param_common': 1}
+            cursor = MGO_LUA_SPIDERS.find(projection=projection)
+            for item in cursor.sort(self.sort):
+                try:
+                    url = item['param_common'][11]
+                    if not is_url(url):
+                        continue
+                    domain = extract_domain(url)
+                except IndexError:
+                    continue
+                if not self.collector.data(domain):
+                    self.collector.add_data(domain)
+                    count += 1
+            logger.info(f'[同步数据]新收录{count}个网站域名')
 
     def sync_validator(self):
         """将垃圾表内容加载到过滤器"""