dongzhaorui 3 years ago
parent
commit
0c06f81d44
1 changed file with 27 additions and 36 deletions

find_source/crawler/services/query.py (+27, -36)

@@ -1,10 +1,9 @@
 import threading
 from concurrent.futures import ThreadPoolExecutor, wait
 
-from common.execptions import HostsRetrieveError
+from common.execptions import ExploreDataError
 from common.log import logger
-from crawler.engines import BingSearchEngine, QccSearchEngine
-from crawler.services.basics import BasicSearch
+from crawler.services.basics import BasicService
 from crawler.utils import (
     extract_base_url,
     extract_domain,
@@ -13,19 +12,30 @@ from crawler.utils import (
 )
 
 
-class QueryKeyWord(BasicSearch):
+class DataQuery(BasicService):
+    """数据查询服务"""
 
-    def __init__(self, engine=None, **kwargs):
+    def __init__(self, engine, **kwargs):
         self._workers = (kwargs.pop('query_workers', None) or 1)
-        self._max_pages = (kwargs.pop('max_pages', None) or 1)
         self._interval = (kwargs.pop('query_interval', None) or 60)
-        super(QueryKeyWord, self).__init__(**kwargs)
-        self.engine = (engine or BingSearchEngine())
-        self._name = self.engine.__class__.__name__
+        super(DataQuery, self).__init__(**kwargs)
+        self._init(engine)
+        self.kwargs = kwargs
 
-    def query_keyword(self):
+    def _init(self, engine):
+        _app_items = {
+            self.keyword_groups: self._keywords,
+            self.org_groups: self._organization
+        }
+        self._engine = engine
+        self._name = engine.__class__.__name__
+        self._app = _app_items[engine.usage]
+        self._app_name = f'DataQuery_{engine.usage}'
+
+    def _keywords(self):
         t_name = threading.currentThread().getName()
         logger.info(f'Starting thread - <{t_name}>')
+        _max_pages = (self.kwargs.pop('max_pages', None) or 1)
         while True:
             tasks = self.scheduler.get_query_task(self.keyword_groups)
             if len(tasks) == 0:
@@ -35,11 +45,11 @@ class QueryKeyWord(BasicSearch):
             task_key, task = tasks
             logger.info(f"<{t_name}> - {self._name} - {task['search']}")
             cur_page = 0
-            while cur_page < self._max_pages:
+            while cur_page < _max_pages:
                 cur_page += 1
                 '''Parse page elements and generate data-mining tasks'''
                 lst = []
-                urls = self.engine.search(task['search'], cur_page)
+                urls = self._engine.search(task['search'], cur_page)
                 for url in urls:
                     base_url = extract_base_url(url)
                     if not self.validator.data(base_url):
@@ -56,26 +66,7 @@ class QueryKeyWord(BasicSearch):
             # '''Query records'''
             # self.push_records(task)
 
-    def start(self):
-        with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
-            futures = []
-            for _ in range(1, self._workers + 1):
-                f = executor.submit(self.query_keyword)
-                f.add_done_callback(err_details)
-                futures.append(f)
-            wait(futures)
-
-
-class QueryOrganization(BasicSearch):
-
-    def __init__(self, engine=None, **kwargs):
-        self._workers = (kwargs.pop('query_workers', None) or 1)
-        self._interval = (kwargs.pop('query_interval', None) or 60)
-        super(QueryOrganization, self).__init__(**kwargs)
-        self.engine = (engine or QccSearchEngine())
-        self._name = self.engine.__class__.__name__
-
-    def query_org(self):
+    def _organization(self):
         t_name = threading.currentThread().getName()
         logger.info(f'Starting thread - <{t_name}>')
         while True:
@@ -88,7 +79,7 @@ class QueryOrganization(BasicSearch):
             word = task['search']
             logger.info(f"<{t_name}> - {self._name} - {word}")
             try:
-                url = self.engine.search(word)
+                url = self._engine.search(word)
                 task['url'] = url
                 task['name'] = word
                 task['domain'] = extract_domain(url)
@@ -103,7 +94,7 @@ class QueryOrganization(BasicSearch):
                 task['classify'] = self.visit_classify
             '''Push to the data-mining queue'''
                 self.scheduler.add_excavate(task, level=task['weight'])
-            except HostsRetrieveError as e:
+            except ExploreDataError as e:
                 task['status_code'] = e.code
                 task['err_reason'] = e.reason
                 logger.exception(e)
@@ -113,10 +104,10 @@ class QueryOrganization(BasicSearch):
             # self.push_records(task)
 
     def start(self):
-        with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
+        with ThreadPoolExecutor(self._workers, self._app_name) as executor:
             futures = []
             for _ in range(1, self._workers + 1):
-                f = executor.submit(self.query_org)
+                f = executor.submit(self._app)
                 f.add_done_callback(err_details)
                 futures.append(f)
             wait(futures)
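
The commit collapses the former QueryKeyWord and QueryOrganization services into a single DataQuery that picks its worker from the engine's usage marker. Below is a minimal launch sketch, not part of the commit: it assumes the BingSearchEngine and QccSearchEngine classes from the removed import carry usage values matching the service's keyword_groups and org_groups (as the _app_items mapping implies), and that the remaining keyword arguments are accepted by BasicService.

import threading

from crawler.engines import BingSearchEngine, QccSearchEngine
from crawler.services.query import DataQuery


def launch_queries():
    # One DataQuery per engine; _init() selects the matching worker
    # (_keywords or _organization) via engine.usage.
    drivers = []
    for engine in (BingSearchEngine(), QccSearchEngine()):
        service = DataQuery(
            engine,
            query_workers=2,    # threads in the service's pool (default 1)
            query_interval=60,  # idle wait between empty polls, in seconds
            max_pages=3,        # popped lazily from kwargs by the keyword worker
        )
        # start() blocks in wait(futures), so each service runs on
        # its own driver thread.
        t = threading.Thread(target=service.start, daemon=True)
        t.start()
        drivers.append(t)
    for t in drivers:
        t.join()

One design consequence worth noting: _keywords pops max_pages from the retained kwargs at thread start, so with query_workers > 1 only the first keyword thread sees the configured value; the remaining threads fall back to the default of 1.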