
Added - scheduled data synchronization mechanism

dongzhaorui, 3 years ago
parent
revision ee9da7ef02
1 changed file with 241 additions and 102 deletions
      find_source/crawler/spiders.py

+ 241 - 102
find_source/crawler/spiders.py
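What the commit adds boils down to one loading pattern per seed collection: read the documents that have not been pushed yet, wrap each one in a weighted Task, hand the batch to the shared scheduler, mark the seeds as added, then sleep until the next pass. A minimal sketch of that loop under stand-in names (`collection`, `scheduler` and `make_task` represent the MGO_* collections, the Scheduler instance and BasicSearch.make_task from the diff; the keyword loop below differs only in that it skips the marking step):

    import time

    def seed_sync_loop(collection, scheduler, make_task, weight, interval=7200):
        """Sketch of the periodic seed-sync loop introduced in this commit."""
        while True:
            # only seeds that were never pushed to the task queue
            query = {'enable_added': {'$exists': False}}
            items = list(collection.find(query, projection={'name': 1}))
            tasks = [make_task(url=item['name'], groups='seed_url',
                               classify='visit', weight=weight) for item in items]
            scheduler.insert_tasks(tasks, level=weight)
            # mark the seeds so the next pass skips them
            for item in items:
                collection.update_one({'_id': item['_id']},
                                      {'$set': {'enable_added': True}})
            time.sleep(interval)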

@@ -1,10 +1,11 @@
+import threading
 import time
-from concurrent.futures import ThreadPoolExecutor, wait
-from typing import List
+from typing import List, Mapping
 
-from common.databases import insert_one
+from common.databases import insert_one, int2long
 from common.execptions import ValidatorError, HostsRetrieveError
 from common.log import logger
+from common.tools import delay_by
 from crawler.Task import Task
 from crawler.analysis import parse_urls
 from crawler.download import Downloader
@@ -14,25 +15,169 @@ from crawler.search_engine import JySearchEngine
 from crawler.utils import (
     extract_base_url,
     extract_page_title,
-    err_details,
-    extract_domain
+    extract_domain,
+    valid_url
 )
 from settings import (
-    MGO_REPETITION,
-    MGO_RECORDS,
     SPECIAL_ENCODINGS,
-    MGO_VISIT_KEYWORDS,
-    MGO_VISIT_ORGANIZATION
+    MGO_REPETITION,
+    MGO_DOMAIN,
+    MGO_SEED_URLS,
+    MGO_SEED_ORGS,
+    MGO_SEED_KEYWORDS,
 )
 
 
-class BasicScheduler:
+class BasicSearch:
 
     def __init__(self, scheduler=None, **kwargs):
         self._scheduler = (scheduler or Scheduler())
+        self.query = {'enable_added': {'$exists': False}}
+        self.projection = {'name': 1}
+        self.sort = [('_id', -1)]
+        # task weights / queue priority levels
+        self.org_weight = kwargs.get('org_weight', 9)
+        self.url_weight = kwargs.get('url_weight', 8)
+        self.keyword_weight = kwargs.get('keyword_weight', 5)
+        # task classification
+        self.visit_classify = 'visit'
+        self.query_classify = 'query'
+        # owning group names
+        self.org_groups = 'organization'
+        self.keyword_groups = 'keyword'
+        self.url_groups = 'seed_url'
+        self._init()
+
+    def _init(self):
+        self.sync_data()
+
+    @staticmethod
+    def loops_interval(label, interval):
+        next_run_time = delay_by(interval or 300)
+        logger.info(f'执行:<{label}>,下次运行时间:{next_run_time}')
+        time.sleep(interval or 300)
+
+    @staticmethod
+    def make_task(**kwargs):
+        """生成Task对象"""
+        return Task(**kwargs)
+
+    def seed_orgs(self) -> List[Mapping]:
+        """组织|单位"""
+        search_orgs = []
+        cursor = MGO_SEED_ORGS.find(self.query, projection=self.projection)
+        for item in cursor.sort(self.sort):
+            search_orgs.append(item)
+        return search_orgs
+
+    def seed_keywords(self) -> List[str]:
+        """Search keywords"""
+        search_keywords = []
+        cursor = MGO_SEED_KEYWORDS.find(projection=self.projection)
+        for item in cursor.sort(self.sort):
+            search_keywords.append(item['name'])
+        return search_keywords
+
+    def seed_urls(self) -> List[Mapping]:
+        """种子urls"""
+        search_urls = []
+        cursor = MGO_SEED_URLS.find(self.query, projection=self.projection)
+        for item in cursor.sort(self.sort):
+            search_urls.append(item)
+        return search_urls
+
+    def sync_data_urls(self):
+        """Periodically sync seed url data"""
+        _interval = 7200
+        while True:
+            # keep only valid urls; do not mutate the list while iterating over it
+            items = [item for item in self.seed_urls() if valid_url(item['name'])]
+            lst = []
+            for item in items:
+                lst.append(self.make_task(
+                    url=item['name'],
+                    groups=self.url_groups,
+                    classify=self.visit_classify,
+                    weight=self.url_weight))
+            self._scheduler.insert_tasks(lst, level=self.url_weight)
+
+            # mark the urls that have been pushed so they are not synced again
+            for item in items:
+                MGO_SEED_URLS.update_one(
+                    {'_id': item['_id']},
+                    {'$set': {'enable_added': True}}
+                )
+            logger.info(f'同步更新{len(items)}条网址数据')
+            self.loops_interval(self.sync_data_urls.__name__, _interval)
+
+    def sync_data_keywords(self):
+        """同步关键词数据"""
+        _interval = 1800
+        while True:
+            words = self.seed_keywords()
+            # normalize keyword text and push to the task queue
+            words = [str(word).replace('&nbsp;', '').strip() for word in words]
+            lst = []
+            for word in words:
+                lst.append(self.make_task(
+                    search=word,
+                    groups=self.keyword_groups,
+                    classify=self.query_classify,
+                    weight=self.keyword_weight))
+            self._scheduler.insert_tasks(lst, level=self.keyword_weight)
+            logger.info(f'同步更新{len(words)}条关键词数据')
+            self.loops_interval(self.sync_data_keywords.__name__, _interval)
 
+    def sync_data_orgs(self):
+        """同步组织单位数据"""
+        _interval = 3600
+        while True:
+            items = self.seed_orgs()
+            # normalize organization names and push to the task queue
+            orgs = []
+            for item in items:
+                name = item.get('name')
+                if name in ['', None]:
+                    logger.warning(f'[异常的单位组织]{item}')
+                    continue
+                word = str(name).replace('&nbsp;', '').strip()
+                orgs.append(word)
+            lst = []
+            for word in orgs:
+                lst.append(self.make_task(
+                    search=word,
+                    groups=self.org_groups,
+                    classify=self.query_classify,
+                    weight=self.org_weight))
+            self._scheduler.insert_tasks(lst, level=self.org_weight)
+            # mark organizations that have been added so they are not pushed to the task queue again
+            for item in items:
+                MGO_SEED_ORGS.update_one(
+                    {'_id': item['_id']},
+                    {'$set': {'enable_added': True}}
+                )
+            logger.info(f'同步更新{len(items)}个单位组织数据')
+            self.loops_interval(self.sync_data_orgs.__name__, _interval)
+
+    def sync_data(self):
+        """同步数据"""
+        logger.info(f'[数据寻源]开启数据同步')
+        threading.Thread(
+            target=self.sync_data_urls,
+            name='LoadingSeedUrls'
+        ).start()
+        threading.Thread(
+            target=self.sync_data_keywords,
+            name='LoadingSeedKeyWords'
+        ).start()
+        threading.Thread(
+            target=self.sync_data_orgs,
+            name='LoadingSeedOrgs'
+        ).start()
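+        # each loader thread runs an endless loop with its own refresh interval:
+        # seed urls every 7200s, keywords every 1800s, organizations every 3600s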
 
-class SearchEngine(BasicScheduler):
+
+class SearchEngine(BasicSearch):
 
     def __init__(self, **kwargs):
         super(SearchEngine, self).__init__(scheduler=kwargs.get('scheduler'))
@@ -41,7 +186,7 @@ class SearchEngine(BasicScheduler):
     def set_search_engine(self, engine=None):
         if isinstance(engine, JySearchEngine):
             self._engines.append(engine)
-            logger.info(f'[搜索引擎 - 添加成功]{engine.__class__.__name__}')
+            logger.info(f'添加搜索引擎<{engine.__class__.__name__}>完成')
         return self
 
     def set_search_engines(self, engines):
@@ -49,57 +194,41 @@ class SearchEngine(BasicScheduler):
             self.set_search_engine(engine)
         return self
 
-    def search_organizations(self, engine, items: List[dict]):
-        logger.info(f'[搜索组织]共{len(items)}个')
-        for item in items:
-            name = item.get('name')
-            if name in ['', None]:
-                logger.warning(f'[组织搜索 - 异常]{item}')
-                continue
-
-            word = str(name).replace('&nbsp;', '').strip()
-            logger.info(f"[搜索 - 组织]{engine.__class__.__name__} >>> {word}")
-            urls = engine.search(word)
-            lst = [Task(url=url, groups='organization') for url in urls]
-            self._scheduler.insert_tasks(lst)
-            MGO_VISIT_ORGANIZATION.update_one(
-                {'_id': item['_id']},
-                {'$set': {'enable_added': True}}
-            )
-
-    def search_words(self, engine, words):
-        logger.info(f'[搜索关键词]共{len(words)}个')
-        for word in words:
-            word = str(word).replace('&nbsp;', '').strip()
-            logger.info(f"[搜索 - 关键词]{engine.__class__.__name__} >>> {word}")
-            urls = engine.search(word)
-            lst = [Task(url=url, groups='keyword') for url in urls]
-            self._scheduler.insert_tasks(lst)
+    def start_search(self, engine):
+        while True:
+            tasks = self._scheduler.get_task()
+            if len(tasks) == 0:
+                self.loops_interval(self.start_search.__name__, 5)
+                continue
+
+            task_key, task = tasks
+            task['update_at'] = int2long(int(time.time()))
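+            # visit tasks belong to the domain workers; push them back and only
+            # run an engine search for query (keyword / organization) tasks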
+            if task['classify'] == self.visit_classify:
+                self._scheduler.insert_task(task, level=task['weight'])
+            else:
+                word = task['search']
+                logger.info(f"<{engine.__class__.__name__}> {task['groups']} >>> {word}")
+                urls = engine.search(word)
+                lst = []
+                for url in urls:
+                    lst.append(self.make_task(
+                        url=url,
+                        groups=task['groups'],
+                        classify=self.visit_classify,
+                        weight=self.url_weight
+                    ))
+                self._scheduler.insert_tasks(lst, level=self.url_weight)
 
     def search_engines(self):
-        with ThreadPoolExecutor(max_workers=2, thread_name_prefix='SearchEngine') as executor:
-            '''组织单位'''
-            search_organizations = []
-            projection = {'name': 1}
-            _q = {'enable_added': {'$exists': False}}
-            for item in MGO_VISIT_ORGANIZATION.find(_q, projection=projection):
-                search_organizations.append(item)
-            '''关键词'''
-            search_words = []
-            for item in MGO_VISIT_KEYWORDS.find(projection=projection):
-                search_words.append(item['name'])
-            futures = []
-            for engine in self._engines:
-                logger.info(f"[搜索引擎 - {engine.__class__.__name__}]启动成功")
-                f = executor.submit(self.search_words, engine, search_words)
-                f.add_done_callback(err_details)
-                f = executor.submit(self.search_organizations, engine, search_organizations)
-                f.add_done_callback(err_details)
-                futures.append(f)
-            wait(futures)
-
-
-class SearchDomain(BasicScheduler):
+        logger.info(f'[搜索引擎]初始化加载')
+        for engine in self._engines:
+            threading.Thread(
+                target=self.start_search,
+                name='SearchEngine',
+                args=(engine,)
+            ).start()
+
+
+class VisitDomain(BasicSearch):
 
     def __init__(
             self,
@@ -109,29 +238,32 @@ class SearchDomain(BasicScheduler):
             allow_load_filter=False,
             **kwargs,
     ):
-        super(SearchDomain, self).__init__(scheduler=kwargs.get('scheduler'))
+        super(VisitDomain, self).__init__(scheduler=kwargs.get('scheduler'))
         self._downloader = (downloader or Downloader())
         self._parser = (parser or parse_urls)
         self._validator = (validator or Validator())
         if allow_load_filter:
             self._validator.load_filter()
 
-    def verify(self, task):
+    def push_new_domain(self, task: Task):
+        # 新源
+        insert_one(MGO_DOMAIN, task)
+        # 加入过滤器
+        self._validator.add_filter_feature(task['domain'])
+        # 加入去重库
+        remove_duplicate = {'url': task['domain'], 'time': task['update_at']}
+        insert_one(MGO_REPETITION, remove_duplicate)
+        logger.info(f"[录入新域]{task['domain']} - {task['name']}")
+
+    def verify(self, task: Task):
         valid_words = self._validator.words(task['name'], task)
         if all([valid_words]):
-            # 需求站点
-            insert_one(MGO_RECORDS, task)
-            # 加入过滤器
-            self._validator.add_filter_feature(task['domain'])
-            # 加入去重库
-            duplicate_site = {'url': task['domain'], 'time': int(time.time())}
-            insert_one(MGO_REPETITION, duplicate_site)
-            logger.info(f"[检索成功]{task['domain']} - {task['name']}")
+            self.push_new_domain(task)
         else:
             if any([task['sensitive'], task['duplication']]):
                 raise ValidatorError(f"特征检验未通过:{task['name']}")
 
-    def crawl_spider(self):
+    def search_domains(self):
         while True:
             tasks = self._scheduler.get_task()
             if len(tasks) == 0:
@@ -139,33 +271,40 @@ class SearchDomain(BasicScheduler):
                 break
 
             task_key, task = tasks
-            groups = task['groups']
-            domain = extract_domain(task['url'])
-            allow_visit_domain = self._validator.url(domain)
-            if not allow_visit_domain:
-                continue
-
-            logger.info(f'request web site -> {task["url"]}')
-            response = self._downloader.get(task['url'])
-            print(response, len(response.text))
-            if response.status_code != 200 or response.text in ['', None]:
-                continue
-
-            response.encoding = response.apparent_encoding
-            if response.encoding in SPECIAL_ENCODINGS:
-                response.encoding = 'utf-8'
-
-            task['domain'] = domain
-            base_url = extract_base_url(task['url'])
-            task['base_url'] = base_url
-            page_source = response.text
-            title = extract_page_title(page_source)
-            print(title)
-            task['name'] = title
-            try:
-                self.verify(task)
-                urls = self._parser(page_source, base_url)
-                new_tasks = [Task(url=url, groups=groups) for url in urls]
-                self._scheduler.insert_tasks(new_tasks)
-            except HostsRetrieveError:
-                pass
+            task['update_at'] = int2long(int(time.time()))
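+            # query tasks belong to the search-engine threads; push them back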
+            if task['classify'] != self.visit_classify:
+                self._scheduler.insert_task(task, level=task['weight'])
+            else:
+                domain = extract_domain(task['url'])
+                allow_visit_domain = self._validator.url(domain)
+                if not allow_visit_domain:
+                    continue
+
+                logger.info(f'request web site -> {task["url"]}')
+                response = self._downloader.get(task['url'])
+                if response.status_code != 200 or response.text in ['', None]:
+                    continue
+
+                response.encoding = response.apparent_encoding
+                if response.encoding in SPECIAL_ENCODINGS:
+                    response.encoding = 'utf-8'
+                task['domain'] = domain
+                base_url = extract_base_url(task['url'])
+                task['base_url'] = base_url
+                page_source = response.text
+                title = extract_page_title(page_source)
+                task['name'] = title
+                try:
+                    self.verify(task)
+                    urls = self._parser(page_source, base_url)
+                    new_tasks = []
+                    for url in urls:
+                        new_tasks.append(self.make_task(
+                            url=url,
+                            groups=task['groups'],
+                            classify=self.visit_classify,
+                            weight=task['weight']
+                        ))
+                    self._scheduler.insert_tasks(new_tasks, level=self.url_weight)
+                except (ValidatorError, HostsRetrieveError):
+                    # skip pages that fail validation or host retrieval
+                    pass
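The entry point is not part of this commit. Presumably the search-engine threads and the domain workers share one Scheduler: the engines consume query tasks and emit visit tasks, while the workers consume visit tasks and record new domains. A hedged wiring sketch (import paths, the concrete engine class and the single worker thread are assumptions, not shown in the diff):

    import threading

    from crawler.schedulers import Scheduler            # assumed import path
    from crawler.search_engine import JySearchEngine    # imported at the top of spiders.py
    from crawler.spiders import SearchEngine, VisitDomain

    scheduler = Scheduler()

    # search side: each registered engine consumes 'query' tasks in its own thread.
    # note: every BasicSearch subclass also starts the seed-sync threads in __init__,
    # so this wiring starts them twice.
    SearchEngine(scheduler=scheduler).set_search_engines([JySearchEngine()]).search_engines()

    # visit side: a worker consumes 'visit' tasks, validates domains and records new ones
    visitor = VisitDomain(scheduler=scheduler, allow_load_filter=True)
    threading.Thread(target=visitor.search_domains, name='VisitDomain').start()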