dongzhaorui %!s(int64=3) %!d(string=hai) anos
pai
achega
c69a43742d
Modificáronse 1 ficheiros con 171 adicións e 0 borrados
  1. 171 0
      find_source/crawler/spiders.py

+ 171 - 0
find_source/crawler/spiders.py

@@ -0,0 +1,171 @@
+import time
+from concurrent.futures import ThreadPoolExecutor, wait
+from typing import List
+
+from common.databases import insert_one
+from common.execptions import ValidatorError, HostsRetrieveError
+from common.log import logger
+from crawler.Task import Task
+from crawler.analysis import parse_urls
+from crawler.download import Downloader
+from crawler.retrieve import Validator
+from crawler.schedule import Scheduler
+from crawler.search_engine import JySearchEngine
+from crawler.utils import (
+    extract_base_url,
+    extract_page_title,
+    err_details,
+    extract_domain
+)
+from settings import (
+    MGO_REPETITION,
+    MGO_RECORDS,
+    SPECIAL_ENCODINGS,
+    MGO_VISIT_KEYWORDS,
+    MGO_VISIT_ORGANIZATION
+)
+
+
+class BasicScheduler:
+
+    def __init__(self, scheduler=None, **kwargs):
+        self._scheduler = (scheduler or Scheduler())
+
+
+class SearchEngine(BasicScheduler):
+
+    def __init__(self, **kwargs):
+        super(SearchEngine, self).__init__(scheduler=kwargs.get('scheduler'))
+        self._engines = []
+
+    def set_search_engine(self, engine=None):
+        if isinstance(engine, JySearchEngine):
+            self._engines.append(engine)
+            logger.info(f'[搜索引擎 - 添加成功]{engine.__class__.__name__}')
+        return self
+
+    def set_search_engines(self, engines):
+        for engine in engines:
+            self.set_search_engine(engine)
+        return self
+
+    def search_organizations(self, engine, items: List[dict]):
+        logger.info(f'[搜索组织]共{len(items)}个')
+        for item in items:
+            name = item.get('name')
+            if name in ['', None]:
+                logger.warning(f'[组织搜索 - 异常]{item}')
+                continue
+
+            word = str(name).replace(' ', '').strip()
+            logger.info(f"[搜索 - 组织]{engine.__class__.__name__} >>> {word}")
+            urls = engine.search(word)
+            lst = [Task(url=url, groups='organization') for url in urls]
+            self._scheduler.insert_tasks(lst)
+            MGO_VISIT_ORGANIZATION.update_one(
+                {'_id': item['_id']},
+                {'$set': {'enable_added': True}}
+            )
+
+    def search_words(self, engine, words):
+        logger.info(f'[搜索关键词]共{len(words)}个')
+        for word in words:
+            word = str(word).replace(' ', '').strip()
+            logger.info(f"[搜索 - 关键词]{engine.__class__.__name__} >>> {word}")
+            urls = engine.search(word)
+            lst = [Task(url=url, groups='keyword') for url in urls]
+            self._scheduler.insert_tasks(lst)
+
+    def search_engines(self):
+        with ThreadPoolExecutor(max_workers=2, thread_name_prefix='SearchEngine') as executor:
+            '''组织单位'''
+            search_organizations = []
+            projection = {'name': 1}
+            _q = {'enable_added': {'$exists': False}}
+            for item in MGO_VISIT_ORGANIZATION.find(_q, projection=projection):
+                search_organizations.append(item)
+            '''关键词'''
+            search_words = []
+            for item in MGO_VISIT_KEYWORDS.find(projection=projection):
+                search_words.append(item['name'])
+            futures = []
+            for engine in self._engines:
+                logger.info(f"[搜索引擎 - {engine.__class__.__name__}]启动成功")
+                f = executor.submit(self.search_words, engine, search_words)
+                f.add_done_callback(err_details)
+                f = executor.submit(self.search_organizations, engine, search_organizations)
+                f.add_done_callback(err_details)
+                futures.append(f)
+            wait(futures)
+
+
+class SearchDomain(BasicScheduler):
+
+    def __init__(
+            self,
+            downloader=None,
+            parser=None,
+            validator=None,
+            allow_load_filter=False,
+            **kwargs,
+    ):
+        super(SearchDomain, self).__init__(scheduler=kwargs.get('scheduler'))
+        self._downloader = (downloader or Downloader())
+        self._parser = (parser or parse_urls)
+        self._validator = (validator or Validator())
+        if allow_load_filter:
+            self._validator.load_filter()
+
+    def verify(self, task):
+        valid_words = self._validator.words(task['name'], task)
+        if all([valid_words]):
+            # 需求站点
+            insert_one(MGO_RECORDS, task)
+            # 加入过滤器
+            self._validator.add_filter_feature(task['domain'])
+            # 加入去重库
+            duplicate_site = {'url': task['domain'], 'time': int(time.time())}
+            insert_one(MGO_REPETITION, duplicate_site)
+            logger.info(f"[检索成功]{task['domain']} - {task['name']}")
+        else:
+            if any([task['sensitive'], task['duplication']]):
+                raise ValidatorError(f"特征检验未通过:{task['name']}")
+
+    def crawl_spider(self):
+        while True:
+            tasks = self._scheduler.get_task()
+            if len(tasks) == 0:
+                logger.info('关闭寻源爬虫')
+                break
+
+            task_key, task = tasks
+            groups = task['groups']
+            domain = extract_domain(task['url'])
+            allow_visit_domain = self._validator.url(domain)
+            if not allow_visit_domain:
+                continue
+
+            logger.info(f'request web site -> {task["url"]}')
+            response = self._downloader.get(task['url'])
+            print(response, len(response.text))
+            if response.status_code != 200 or response.text in ['', None]:
+                continue
+
+            response.encoding = response.apparent_encoding
+            if response.encoding in SPECIAL_ENCODINGS:
+                response.encoding = 'utf-8'
+
+            task['domain'] = domain
+            base_url = extract_base_url(task['url'])
+            task['base_url'] = base_url
+            page_source = response.text
+            title = extract_page_title(page_source)
+            print(title)
+            task['name'] = title
+            try:
+                self.verify(task)
+                urls = self._parser(page_source, base_url)
+                new_tasks = [Task(url=url, groups=groups) for url in urls]
+                self._scheduler.insert_tasks(new_tasks)
+            except HostsRetrieveError:
+                pass