dongzhaorui 3 years ago
parent
commit
7165e6c10d
3 changed files with 150 additions and 131 deletions
  1. +32 -25
      find_source/crawler/services/basics.py
  2. +16 -17
      find_source/crawler/services/data_excavate.py
  3. +102 -89
      find_source/crawler/services/data_query.py

+ 32 - 25
find_source/crawler/services/basics.py

@@ -9,10 +9,15 @@ from crawler.Task import Task
 from crawler.analysis import Parser
 from crawler.download import Downloader
 from crawler.schedule import Scheduler
-from crawler.utils import (
-    extract_domain,
-    is_url,
+from constants import (
+    ORGANIZATION,
+    KEYWORD,
+    SEED_URL,
+    COMPETING_GOODS,
+    VISIT_CLASSIFY,
+    QUERY_CLASSIFY
 )
+from crawler.utils import extract_domain, is_url
 from crawler.validate import Validator
 from settings import (
     MGO_LUA_SPIDERS,
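
The hunk above swaps the hard-coded group and classify strings for names imported from a shared constants module. That module is not shown in this diff; the following is only a sketch of what it presumably exports, with the values assumed from the literals removed further down in this file (ORGANIZATION = 'organization', VISIT_CLASSIFY = 'visit', and so on), not confirmed by the commit itself:

    # constants.py -- assumed contents; values inferred from the string
    # literals this commit removes from BasicSearch, not confirmed here.
    VISIT_CLASSIFY = 'visit'              # 分类: route the task to the excavate queue
    QUERY_CLASSIFY = 'query'              # 分类: route the task to the query queue
    ORGANIZATION = 'organization'         # 归属组: organization search tasks
    KEYWORD = 'keyword'                   # 归属组: keyword search tasks
    SEED_URL = 'seed_url'                 # 归属组: seed url tasks
    COMPETING_GOODS = 'competing_goods'   # 归属组: competing-goods tasks
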
@@ -37,36 +42,38 @@ class BasicSearch:
             scheduler=None,
             validator=None,
             downloader=None,
+            collector=None,
             parser=None,
             **kwargs
     ):
         self.scheduler = (scheduler or Scheduler())
         self.validator = (validator or Validator(redis_key='RemovalDuplicate_'))
+        self.collector = (collector or Validator(redis_key='CollectUrl_'))
         self.downloader = (downloader or Downloader())
         self.parser = (parser or Parser())
         # mongo查询
         self.query = {'enable_added': {'$exists': False}}
         self.projection = {'name': 1}
         self.sort = [('_id', -1)]
-        # 分类
-        self.visit_classify = 'visit'
-        self.query_classify = 'query'
         # 权重
         self.org_weight = org_weight
         self.url_weight = url_weight
         self.keyword_weight = keyword_weight
         self.retrieve_weight = 0
+        # 分类
+        self.visit_classify = VISIT_CLASSIFY
+        self.query_classify = QUERY_CLASSIFY
         # 归属组
-        self.org_groups = 'organization'
-        self.keyword_groups = 'keyword'
-        self.url_groups = 'seed_url'
-        self.competing_groups = 'competing_goods'
+        self.org_groups = ORGANIZATION
+        self.keyword_groups = KEYWORD
+        self.url_groups = SEED_URL
+        self.competing_groups = COMPETING_GOODS
 
     @staticmethod
     def loops_interval(interval):
         t_name = threading.currentThread().getName()
         next_run_time = delay_by((interval or 300))
-        logger.debug(f'线程运行结束:<{t_name}>,下次运行时间:{next_run_time}')
+        logger.debug(f'线程运行结束:<{t_name}>,下次运行时间:{next_run_time}')
         time.sleep(interval)
 
     @staticmethod
@@ -107,12 +114,16 @@ class BasicSearch:
     def push_remove(self, task: Task):
         """数据去重的垃圾表"""
         logger.info(f"[上传去重特征]【{task['name']} - {task['url']}】")
-        self._push_data('remove', task, MGO_REMOVAL_DUPLICATE)
+        if not self.validator.data(task['url']):
+            self._push_data('remove', task, MGO_REMOVAL_DUPLICATE)
+            self.validator.add_data(task['url'])
 
     def push_domain(self, task: Task):
         """挖掘网站的查询结果"""
         logger.info(f"[推送挖掘结果]【{task['name']} - {task['domain']}】")
-        self._push_data('save', task, MGO_DOMAIN)
+        if not self.collector.data(task['domain']):
+            self._push_data('save', task, MGO_DOMAIN)
+            self.collector.add_data(task['domain'])
 
     def push_query(self, task: Task):
         """搜索组织单位查询结果"""
@@ -121,6 +132,8 @@ class BasicSearch:
 
     def push_records(self, task: Task):
         """挖掘数据的记录"""
+        if len(task['name']) > 20:
+            task['name'] = '{:.20s}'.format(task['name'])
         logger.info(f"[推送数据记录]【{task['name']} - {task['url']}】")
         self._push_data('records', task, MGO_RECORDS)
 
@@ -157,12 +170,11 @@ class BasicSearch:
         return competing_goods
 
     def lua_common_domains(self):
-        """从lua采集爬虫配置表获取网站名称与对应域名,同步到去重库"""
-        parm_commons = []
+        """从lua采集爬虫配置表获取网址域名,推送到数据源收录器"""
+        _c = 0
         projection = {'param_common': 1}
         cursor = MGO_LUA_SPIDERS.find(projection=projection)
         for item in cursor.sort(self.sort):
-            # name = item['param_common'][1]
             try:
                 url = item['param_common'][11]
                 if not is_url(url):
@@ -170,12 +182,7 @@ class BasicSearch:
                 domain = extract_domain(url)
             except IndexError:
                 continue
-            if not self.validator.data(domain):
-                parm_commons.append({
-                    # 'name': name,
-                    'domain': domain,
-                    'origin': url,
-                    'create_at': int2long(int(time.time()))
-                })
-                self.validator.add_data(domain)
-        return parm_commons
+            if not self.collector.data(domain):
+                self.collector.add_data(domain)
+                _c += 1
+        return _c
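
push_remove, push_domain and lua_common_domains now all consult the new collector, a second Validator instance keyed by 'CollectUrl_', before writing anything. The Validator internals are outside this diff; the sketch below only illustrates the check-then-add contract the calling code relies on, using a hypothetical Redis-set stand-in (SetFilter is not a class from this project):

    import redis

    class SetFilter:
        """Hypothetical stand-in for Validator's data()/add_data() contract."""

        def __init__(self, redis_key, client=None):
            self._key = redis_key
            self._db = client or redis.Redis()

        def data(self, value):
            # True if the value has already been recorded
            return bool(self._db.sismember(self._key, value))

        def add_data(self, value):
            self._db.sadd(self._key, value)

    # Mirrors push_domain: only domains the collector has not seen reach MongoDB.
    collector = SetFilter('CollectUrl_')
    if not collector.data('example.com'):
        # the MGO_DOMAIN insert would happen here
        collector.add_data('example.com')
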

+ 16 - 17
find_source/crawler/services/data_excavate.py

@@ -14,10 +14,10 @@ from crawler.utils import (
 
 class DataExcavate(BasicSearch):
 
-    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
+    def __init__(self, workers=1, loop_interval=60, **kwargs):
         super(DataExcavate, self).__init__(**kwargs)
-        self._interval = loop_excavate_interval
-        self._workers = excavate_workers
+        self._interval = loop_interval
+        self._workers = workers
 
     def retrieve_site(self, task: Task):
         logger.info(f'[数据挖掘]开始请求 - {task["url"]}')
@@ -25,8 +25,8 @@ class DataExcavate(BasicSearch):
         task['status_code'] = response.status_code
         if response.status_code != 200 or response.text in ['', None]:
             task['err_reason'] = response.reason
-            logger.error(f'[数据挖掘]异常网址 - {task["url"]}')
-            return
+            logger.error(f'[数据挖掘]异常网址 - {task["url"]} : {response.reason}')
+            return False
 
         task['domain'] = extract_domain(task['url'])
         page_source = response.text
@@ -35,10 +35,10 @@ class DataExcavate(BasicSearch):
 
         items = self.parser.site_items(page_source, task['base_url'])
         lst = []
-        _c = 0  # 页面包含的关键词计数器
+        _c = 0  # 过滤词计数器
         for item in items:
             name, url = item['name'], item['host']
-            if self.validator.phrase(name):
+            if self.validator.words(name):
                 lst.append(self.make_task(
                     url=url,
                     name=name,
@@ -52,9 +52,9 @@ class DataExcavate(BasicSearch):
         if _c > 1:
             self.push_domain(task)
         else:
-            if not self.validator.data(task['domain']):
-                self.push_remove(task)
+            self.push_remove(task)
         self.scheduler.add_excavate(lst, level=task['weight'])
+        return True
 
     def excavate(self):
         t_name = threading.currentThread().getName()
@@ -68,14 +68,13 @@ class DataExcavate(BasicSearch):
             task_key, task = tasks
             if self.validator.data(task['url']):
                 continue
-
-            self.retrieve_site(task)
-            '''url - 添加过滤器'''
-            self.validator.add_data(task['url'])
-            '''domain - 添加过滤器'''
-            self.validator.add_data(task['domain'])
-            '''访问记录'''
-            self.push_records(task)
+            '''挖掘站点'''
+            success = self.retrieve_site(task)
+            if not success:
+                '''url - 添加过滤器'''
+                self.validator.add_data(task['url'])
+            # '''挖掘记录'''
+            # self.push_records(task)
 
     def start(self):
         logger.info(f'[数据挖掘]初始化加载')
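
retrieve_site now reports success as a boolean, and excavate adds the url to the duplicate filter only when the fetch failed, while successfully mined sites are deduplicated by domain through the collector inside push_domain. A minimal restatement of that branch in isolation (the names here are illustrative, not taken from the crawler):

    def process(task, fetch, seen):
        """Fetch a site; remember the url only when the fetch failed."""
        ok = fetch(task['url'])      # stands in for retrieve_site(task)
        if not ok:
            seen.add(task['url'])    # stands in for validator.add_data(task['url'])
        return ok

    seen = set()
    process({'url': 'http://example.com'}, lambda url: False, seen)
    assert 'http://example.com' in seen   # failed urls are skipped on later passes
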

+ 102 - 89
find_source/crawler/services/data_query.py

@@ -3,7 +3,7 @@ from concurrent.futures import ThreadPoolExecutor, wait
 
 from common.execptions import HostsRetrieveError
 from common.log import logger
-from crawler.engines import JySearchEngine
+from crawler.engines import BingSearchEngine, QccSearchEngine
 from crawler.services.basics import BasicSearch
 from crawler.utils import (
     extract_base_url,
@@ -13,113 +13,126 @@ from crawler.utils import (
 )
 
 
-class DataQuery(BasicSearch):
+class QueryKeyWord(BasicSearch):
 
     def __init__(
             self,
-            engines=None,
+            engine=None,
+            query_workers=1,
             max_query_page=1,
             loop_query_interval=60,
             **kwargs
     ):
-        super(DataQuery, self).__init__(**kwargs)
+        super(QueryKeyWord, self).__init__(**kwargs)
+        self.engine = (engine or BingSearchEngine())
+        self._name = self.engine.__class__.__name__
+        self._workers = query_workers
         self._max_pages = max_query_page
         self._interval = loop_query_interval
-        self._engines = []
-        self.set_engines(engines)
 
-    def _init(self):
-        self.set_engines(self._engines)
-
-    def _set_engine(self, engine):
-        if isinstance(engine, JySearchEngine):
-            self._engines.append(engine)
-            logger.info(f'[数据查询]添加引擎 - <{engine.__class__.__name__}>')
-        return self
-
-    def set_engines(self, engines):
-        if isinstance(engines, list):
-            for engine in engines:
-                self._set_engine(engine)
-        else:
-            self._set_engine(engines)
-        return self
-
-    def search(self, engine):
-        ename = engine.__class__.__name__
-        threading.currentThread().setName(ename)
-        logger.info(f'[数据查询]启动引擎 - <{ename}>')
+    def query_keyword(self):
+        t_name = threading.currentThread().getName()
+        logger.info(f'[关键词查询]启动 - <{t_name} - {self._name}>')
         while True:
-            tasks = self.scheduler.get_query_task()
+            tasks = self.scheduler.get_query_task(self.keyword_groups)
             if len(tasks) == 0:
                 self.loops_interval(self._interval)
                 continue
 
             task_key, task = tasks
-            word = task['search']
-            if task['groups'] == self.org_groups:
-                '''使用企查查服务检索site'''
-                logger.info(f"<QccSearch> {task['groups']} >>> {word}")
-                try:
-                    url = engine.by_org_get_site(word)
-                    task['url'] = url
-                    task['name'] = word
-                    task['domain'] = extract_domain(url)
-                    '''保存数据'''
-                    self.push_query(task)
-                    if not is_url(url):
+            logger.info(f"<{self._name} - {task['groups']}> >> {task['search']}")
+            cur_page = 0
+            while cur_page < self._max_pages:
+                cur_page += 1
+                '''检索页面元素生成数据挖掘任务'''
+                lst = []
+                urls = self.engine.search(task['search'], cur_page)
+                for url in urls:
+                    base_url = extract_base_url(url)
+                    if self.validator.data(base_url):
                         continue
+                    lst.append(self.make_task(
+                        url=base_url,
+                        origin=task['origin'],
+                        groups=task['groups'],
+                        classify=self.visit_classify,
+                        weight=task['weight'],
+                    ))
+                '''推送数据挖掘队列'''
+                self.scheduler.add_excavate(lst, level=task['weight'])
+                logger.info(f'<{self._name}> {task["search"]}-第{cur_page}页-共{len(lst)}条')
+            # '''查询记录'''
+            # self.push_records(task)
 
-                    if self.validator.data(task['domain']):
-                        continue
+    def start(self):
+        logger.info(f'[关键词查询]初始化加载')
+        with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
+            futures = []
+            for _ in range(1, self._workers + 1):
+                f = executor.submit(self.query_keyword)
+                f.add_done_callback(err_details)
+                futures.append(f)
+            wait(futures)
 
-                    '''domain - 添加过滤器'''
-                    self.validator.add_data(task['domain'])
-                    '''推送数据挖掘队列'''
-                    task['classify'] = self.visit_classify
-                    self.scheduler.add_excavate(task, level=task['weight'])
-                except HostsRetrieveError as e:
-                    task['status_code'] = e.code
-                    task['err_reason'] = e.reason
-                    logger.exception(e)
-                    '''重新放回查询队列'''
-                    self.scheduler.add_query(task, level=task['weight'])
-            else:
-                '''使用搜索引擎查询关键词'''
-                logger.info(f"<{ename}> {task['groups']} >>> {word}")
-                cur_page = 0
-                while cur_page < self._max_pages:
-                    cur_page += 1
-                    '''检索文本'''
-                    lst = []
-                    urls = engine.search(word, cur_page)
-                    '''生成数据挖掘任务'''
-                    for url in urls:
-                        domain = extract_domain(url)
-                        if self.validator.data(domain):
-                            continue
-                        lst.append(self.make_task(
-                            url=extract_base_url(url),
-                            origin=task['origin'],
-                            groups=task['groups'],
-                            classify=self.visit_classify,
-                            weight=task['weight'],
-                        ))
-                    '''推送数据挖掘队列'''
-                    self.scheduler.add_excavate(lst, level=task['weight'])
-                    logger.info(f'<{ename}> {word}-第{cur_page}页-共{len(lst)}条')
 
-            '''数据记录'''
-            self.push_records(task)
+class QueryOrganization(BasicSearch):
+
+    def __init__(
+            self,
+            engine=None,
+            query_workers=1,
+            loop_query_interval=60,
+            **kwargs
+    ):
+        super(QueryOrganization, self).__init__(**kwargs)
+        self.engine = (engine or QccSearchEngine())
+        self._name = self.engine.__class__.__name__
+        self._workers = query_workers
+        self._interval = loop_query_interval
+
+    def query_org(self):
+        t_name = threading.currentThread().getName()
+        logger.info(f'[单位组织查询]启动 - <{t_name} - {self._name}>')
+        while True:
+            tasks = self.scheduler.get_query_task(self.org_groups)
+            if len(tasks) == 0:
+                self.loops_interval(self._interval)
+                continue
+
+            task_key, task = tasks
+            word = task['search']
+            logger.info(f"<{self._name} - {task['groups']}> >> {word}")
+            try:
+                url = self.engine.search(word)
+                task['url'] = url
+                task['name'] = word
+                task['domain'] = extract_domain(url)
+                '''保存数据'''
+                self.push_query(task)
+                if not is_url(url):
+                    continue
+                '''此处通过收录器判断是否是已收录网站,再决定是否推送数据挖掘队列'''
+                if self.collector.data(task['domain']):
+                    continue
+                '''设置任务为数据挖掘类型'''
+                task['classify'] = self.visit_classify
+                '''推送数据挖掘队列'''
+                self.scheduler.add_excavate(task, level=task['weight'])
+            except HostsRetrieveError as e:
+                task['status_code'] = e.code
+                task['err_reason'] = e.reason
+                logger.exception(e)
+                '''重新放回查询队列'''
+                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
+            # '''查询记录'''
+            # self.push_records(task)
 
     def start(self):
-        if len(self._engines) > 0:
-            logger.info(f'[数据查询]初始化加载')
-            max_workers = len(self._engines)  # 根据搜索引擎设置最大线程池
-            with ThreadPoolExecutor(max_workers, 'DataQuery') as executor:
-                futures = []
-                for engine in self._engines:
-                    f = executor.submit(self.search, engine)
-                    f.add_done_callback(err_details)
-                    futures.append(f)
-                wait(futures)
+        logger.info(f'[单位组织查询]初始化加载')
+        with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
+            futures = []
+            for _ in range(1, self._workers + 1):
+                f = executor.submit(self.query_org)
+                f.add_done_callback(err_details)
+                futures.append(f)
+            wait(futures)
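
DataQuery, which previously multiplexed all engines through one search loop, is split into QueryKeyWord (bound to BingSearchEngine) and QueryOrganization (bound to QccSearchEngine), each with its own worker pool. The wiring that creates and launches them is not part of this diff; below is a sketch of what it could look like, assuming the constructor signatures shown above, the module path implied by the file layout, and that start() blocks on its thread pool:

    import threading

    from crawler.engines import BingSearchEngine, QccSearchEngine
    from crawler.services.data_query import QueryKeyWord, QueryOrganization

    # Illustrative values; only the parameter names come from the diff above.
    keyword_service = QueryKeyWord(
        engine=BingSearchEngine(),
        query_workers=2,
        max_query_page=3,
        loop_query_interval=60,
    )
    org_service = QueryOrganization(
        engine=QccSearchEngine(),
        query_workers=1,
        loop_query_interval=60,
    )

    # start() waits on an endless polling loop, so each service gets its own thread.
    for service in (keyword_service, org_service):
        threading.Thread(target=service.start, daemon=True).start()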