dongzhaorui 3 years ago
parent
commit
d06cce616c
1 changed file with 7 additions and 118 deletions
  1. find_source/crawler/__init__.py  +7 -118

+ 7 - 118
find_source/crawler/__init__.py

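Note: the crawler.spiders module that this commit starts importing is not part of the diff. Below is a minimal, hypothetical sketch of the mixin interface that BreadthCrawler appears to rely on; only the constructor signatures and the search_engines entry point are inferred from the call sites in the diff, while everything else (parameter names, the search_domain method, docstrings) is an assumption, not the actual module.

# Hypothetical sketch of the crawler.spiders mixins, assuming only what the
# call sites in this diff require. Not the actual module.

class SearchEngine:
    """Seeds the scheduler by querying the configured search engines."""

    def __init__(self, engines=None, scheduler=None, **kwargs):
        # Accepts arbitrary keyword arguments because BreadthCrawler forwards
        # the same **kwargs to both mixins.
        self._engines = engines or []
        self._scheduler = scheduler

    def search_engines(self):
        """Run every engine and push the resulting URLs as crawl tasks."""
        raise NotImplementedError


class SearchDomain:
    """Consumes scheduled URLs breadth-first and validates new domains."""

    def __init__(self, downloader=None, validator=None, **kwargs):
        self._downloader = downloader
        self._validator = validator

    def search_domain(self):
        # Hypothetical worker entry point; the worker loop in start() that
        # would call it is cut off in this excerpt.
        raise NotImplementedError
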
@@ -1,131 +1,20 @@
-import time
 from concurrent.futures import ThreadPoolExecutor, wait
 
-from common.databases import insert_one
-from common.execptions import ValidatorError, HostsRetrieveError
-from common.log import logger
-from crawler.Task import Task
-from crawler.analysis import parse_urls
-from crawler.download import Downloader
-from crawler.retrieve import Validator
-from crawler.schedule import Scheduler
-from crawler.search_engine import JySearchEngine
-from crawler.utils import (
-    extract_base_url,
-    extract_page_title,
-    err_details,
-    extract_domain
-)
-from settings import MGO_REPETITION, MGO_RECORDS, MGO_VISIT
+from crawler.spiders import SearchEngine, SearchDomain
+from crawler.utils import err_details
 
 
-class BreadthCrawler:
+class BreadthCrawler(SearchEngine, SearchDomain):
 
-    def __init__(
-            self,
-            downloader=None,
-            parser=None,
-            scheduler=None,
-            validator=None,
-            workers=1,
-            allow_load_filter=False
-    ):
+    def __init__(self, workers=1, **kwargs):
+        SearchEngine.__init__(self, **kwargs)
+        SearchDomain.__init__(self, **kwargs)
         self._workers = workers
-        self._scheduler = (scheduler or Scheduler())
-        self._downloader = (downloader or Downloader())
-        self._parser = (parser or parse_urls)
-        self._validator = (validator or Validator())
-        self._engines = []
-        if allow_load_filter:
-            self._validator.load_filter()
-
-    def verify(self, task):
-        valid_words = self._validator.words(task['name'], task)
-        if all([valid_words]):
-            # a site that matches the requirements
-            insert_one(MGO_RECORDS, task)
-            # add to the filter
-            self._validator.add_filter_feature(task['domain'])
-            # add to the deduplication store
-            duplicate_site = {'url': task['domain'], 'time': int(time.time())}
-            insert_one(MGO_REPETITION, duplicate_site)
-            logger.info(f"[获取新源]{task['domain']} - {task['name']}")
-        else:
-            if any([task['sensitive'], task['duplication']]):
-                raise ValidatorError(f"feature validation failed: {task['name']}")
-
-    def crawl_spider(self):
-        while True:
-            tasks = self._scheduler.get_task()
-            if len(tasks) == 0:
-                print("关闭寻源爬虫")
-                break
-
-            task_key, task = tasks
-            groups = task['groups']
-            domain = extract_domain(task['url'])
-            allow_visit_domain = self._validator.url(domain)
-            if not allow_visit_domain:
-                continue
-
-            logger.info(f'request web site -> {task["url"]}')
-            response = self._downloader.get(task['url'])
-            print(response, len(response.text))
-            if response.status_code != 200 or response.text in ['']:
-                continue
-
-            task['domain'] = domain
-            base_url = extract_base_url(task['url'])
-            task['base_url'] = base_url
-            page_source = response.text
-            title = extract_page_title(page_source)
-            print(title)
-            task['name'] = title
-            try:
-                self.verify(task)
-                urls = self._parser(page_source, base_url)
-                new_tasks = [Task(url=url, groups=groups) for url in urls]
-                self._scheduler.insert_tasks(new_tasks)
-            except HostsRetrieveError:
-                pass
-
-    def set_search_engine(self, engine=None):
-        if isinstance(engine, JySearchEngine):
-            self._engines.append(engine)
-            logger.info(f'[search engine - {engine.__class__.__name__}] added successfully')
-        return self
-
-    def set_search_engines(self, engines):
-        for engine in engines:
-            self.set_search_engine(engine)
-        return self
-
-    def search_words(self, engine, words):
-        for word in words:
-            word = str(word).replace(' ', '').strip()
-            logger.info(f"[{engine.__class__.__name__} - 搜索]{word}")
-            urls = engine.search(word)
-            lst = [Task(url=url, groups='keywords') for url in urls]
-            self._scheduler.insert_tasks(lst)
-
-    def enable_search_engines(self):
-        with ThreadPoolExecutor(max_workers=2, thread_name_prefix='SearchEngine') as executor:
-            search_words = []
-            for item in MGO_VISIT.find():
-                search_words.append(item['name'])
-
-            futures = []
-            for engine in self._engines:
-                logger.info(f"[搜索引擎 - {engine.__class__.__name__}]启动成功")
-                f = executor.submit(self.search_words, engine, search_words)
-                f.add_done_callback(err_details)
-                futures.append(f)
-            wait(futures)
 
     def start(self):
         with ThreadPoolExecutor(max_workers=self._workers) as executor:
             futures = []
-            f = executor.submit(self.enable_search_engines)
+            f = executor.submit(self.search_engines)
             f.add_done_callback(err_details)
             futures.append(f)
             for _ in range(1, self._workers + 1):