Change multi-threaded data synchronization to single-threaded data synchronization

dongzhaorui, 3 years ago
commit 75446e7dfa
1 changed file, 205 additions and 135 deletions

find_source/crawler/spiders.py  (+205, -135)
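
In short, the commit drops the per-job sync threads (sync_data_urls, sync_data_keywords, sync_data_orgs, each spinning in its own `while True` loop with its own interval of 7200s, 1800s and 3600s respectively) and instead drives them, plus the new sync_lua_commons, from a single SyncData.sync_data loop that runs every job in sequence on one thread every 600s. A minimal standalone sketch of the new pattern follows; it is illustrative only, using a plain time.sleep where the project calls its loops_interval helper, and a generic jobs list standing in for the bound methods:

    import time

    def sync_data(jobs, interval=600):
        """Single-threaded driver: run every sync job in turn, then wait once."""
        while True:
            for job in jobs:   # e.g. sync_lua_commons, sync_data_orgs, sync_data_urls, sync_data_keywords
                job()
            time.sleep(interval)   # one shared pause replaces the per-job timers of the old threads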

@@ -1,5 +1,6 @@
 import threading
 import time
+from concurrent.futures import ThreadPoolExecutor, wait
 from typing import List, Mapping
 
 from common.databases import insert_one, int2long
@@ -16,40 +17,54 @@ from crawler.utils import (
     extract_base_url,
     extract_page_title,
     extract_domain,
-    valid_url
+    is_url,
+    err_details
 )
 from settings import (
+    REDIS_KEY,
     SPECIAL_ENCODINGS,
-    MGO_REPETITION,
+    MGO_LUA_SPIDERS,
     MGO_DOMAIN,
-    MGO_SEED_URLS,
-    MGO_SEED_ORGS,
-    MGO_SEED_KEYWORDS,
+    MGO_URLS,
+    MGO_ORGS,
+    MGO_KEYWORDS,
+    MGO_REMOVAL_DUPLICATE
 )
 
 
 class BasicSearch:
 
-    def __init__(self, scheduler=None, **kwargs):
-        self._scheduler = (scheduler or Scheduler())
+    def __init__(
+            self,
+            scheduler=None,
+            validator=None,
+            downloader=None,
+            parser=None,
+            org_weight=9,
+            url_weight=8,
+            keyword_weight=7,
+            **kwargs
+    ):
+        self.scheduler = (scheduler or Scheduler())
+        self.validator = (validator or Validator())
+        self.downloader = (downloader or Downloader())
+        self.parser = (parser or parse_urls)
+        # Mongo query
         self.query = {'enable_added': {'$exists': False}}
         self.projection = {'name': 1}
         self.sort = [('_id', -1)]
-        # weights
-        self.org_weight = kwargs.get('org_weight', 9)
-        self.url_weight = kwargs.get('url_weight', 8)
-        self.keyword_weight = kwargs.get('keyword_weight', 5)
         # classifications
         self.visit_classify = 'visit'
         self.query_classify = 'query'
+        # weights
+        self.org_weight = org_weight
+        self.url_weight = url_weight
+        self.keyword_weight = keyword_weight
         # task groups
         self.org_groups = 'organization'
         self.keyword_groups = 'keyword'
         self.url_groups = 'seed_url'
-        self._init()
-
-    def _init(self):
-        self.sync_data()
+        self.competing_groups = 'competing_goods'
 
     @staticmethod
     def loops_interval(label, interval):
@@ -65,7 +80,7 @@ class BasicSearch:
     def seed_orgs(self) -> List[Mapping]:
         """组织|单位"""
         search_orgs = []
-        cursor = MGO_SEED_ORGS.find(self.query, projection=self.projection)
+        cursor = MGO_ORGS.find(self.query, projection=self.projection)
         for item in cursor.sort(self.sort):
             search_orgs.append(item)
         return search_orgs
@@ -73,7 +88,7 @@ class BasicSearch:
     def seed_keywords(self):
         """关键词"""
         search_keywords = []
-        cursor = MGO_SEED_KEYWORDS.find(projection=self.projection)
+        cursor = MGO_KEYWORDS.find(projection=self.projection)
         for item in cursor.sort(self.sort):
             search_keywords.append(item['name'])
         return search_keywords
@@ -81,106 +96,164 @@ class BasicSearch:
     def seed_urls(self) -> List[Mapping]:
         """种子urls"""
         search_urls = []
-        cursor = MGO_SEED_URLS.find(self.query, projection=self.projection)
+        cursor = MGO_URLS.find(self.query, projection=self.projection)
         for item in cursor.sort(self.sort):
             search_urls.append(item)
         return search_urls
 
+    def lua_common_domains(self):
+        """从lua采集爬虫配置表获取网站名称与对应域名,同步到去重库"""
+        parm_commons = []
+        projection = {'param_common': 1}
+        cursor = MGO_LUA_SPIDERS.find(projection=projection)
+        for item in cursor.sort(self.sort):
+            name = item['param_common'][1]
+            try:
+                url = item['param_common'][11]
+                if not is_url(url):
+                    continue
+                domain = extract_domain(url)
+            except IndexError:
+                continue
+            exists_domain = self.validator.url(domain)
+            if not exists_domain:
+                parm_commons.append({'name': name, 'domain': domain})
+                self.validator.add_url(domain)
+        return parm_commons
+
+
+class SyncData(BasicSearch):
+
+    def __init__(self, allow_load_filter=False, **kwargs):
+        super(SyncData, self).__init__(**kwargs)
+        self._init(allow_load_filter)
+
+    def _init(self, allow_load_filter=False):
+        threading.Thread(
+            target=self.sync_data,
+            name='SyncData'
+        ).start()
+        if allow_load_filter:
+            self.validator.load_filter()
+
     def sync_data_urls(self):
         """同步网址数据"""
-        _interval = 7200
-        while True:
-            items = self.seed_urls()
-            lst = []
-            for item in items:
-                if not valid_url(item['name']):
-                    items.remove(item)
-                    continue
-                lst.append(self.make_task(
-                    url=item['name'],
-                    groups=self.url_groups,
-                    classify=self.visit_classify,
-                    weight=self.url_weight))
-            self._scheduler.insert_tasks(lst, level=self.url_weight)
-
-            for item in items:
-                MGO_SEED_URLS.update_many(
-                    {'_id': item['_id']},
-                    {'$set': {'enable_added': True}}
-                )
-            logger.info(f'同步更新{len(items)}条网址数据')
-            self.loops_interval(self.sync_data_urls.__name__, _interval)
+        # _interval = 7200
+        # while True:
+        items = self.seed_urls()
+        lst = []
+        for item in items:
+            if not is_url(item['name']):
+                items.remove(item)
+                continue
+            lst.append(self.make_task(
+                url=item['name'],
+                groups=self.url_groups,
+                classify=self.visit_classify,
+                weight=self.url_weight))
+        self.scheduler.insert_tasks(lst, level=self.url_weight)
+
+        for item in items:
+            MGO_URLS.update_many(
+                {'_id': item['_id']},
+                {'$set': {'enable_added': True}}
+            )
+        logger.info(f'[同步数据]更新{len(items)}条网址数据')
+            # self.loops_interval(self.sync_data_urls.__name__, _interval)
 
     def sync_data_keywords(self):
         """同步关键词数据"""
-        _interval = 1800
-        while True:
-            words = self.seed_keywords()
-            # normalize keyword format and push to the task queue
-            words = [str(word).replace(' ', '').strip() for word in words]
-            lst = []
-            for word in words:
-                lst.append(self.make_task(
-                    search=word,
-                    groups=self.keyword_groups,
-                    classify=self.query_classify,
-                    weight=self.keyword_weight))
-            self._scheduler.insert_tasks(lst, level=self.keyword_weight)
-            logger.info(f'同步更新{len(words)}条关键词数据')
-            self.loops_interval(self.sync_data_keywords.__name__, _interval)
+        # _interval = 1800
+        # while True:
+        words = self.seed_keywords()
+        # normalize keyword format and push to the task queue
+        words = [str(word).replace(' ', '').strip() for word in words]
+        lst = []
+        for word in words:
+            lst.append(self.make_task(
+                search=word,
+                groups=self.keyword_groups,
+                classify=self.query_classify,
+                weight=self.keyword_weight))
+        self.scheduler.insert_tasks(lst, level=self.keyword_weight)
+        logger.info(f'[同步数据]更新{len(words)}条关键词数据')
+        # self.loops_interval(self.sync_data_keywords.__name__, _interval)
 
     def sync_data_orgs(self):
         """同步组织单位数据"""
+        # _interval = 3600
+        # while True:
+        items = self.seed_orgs()
+        # clean organization names and push to the task queue
+        orgs = []
+        for item in items:
+            name = item.get('name')
+            if name in ['', None]:
+                logger.warning(f'[异常的单位组织]{item}')
+                continue
+            word = str(name).replace(' ', '').strip()
+            orgs.append(word)
+        lst = []
+        for word in orgs:
+            lst.append(self.make_task(
+                search=word,
+                groups=self.org_groups,
+                classify=self.query_classify,
+                weight=self.org_weight))
+        self.scheduler.insert_tasks(lst, level=self.org_weight)
+        # mark organizations already added so they are not pushed to the task queue again
+        for item in items:
+            MGO_ORGS.update_one(
+                {'_id': item['_id']},
+                {'$set': {'enable_added': True}}
+            )
+        logger.info(f'[同步数据]更新{len(items)}个单位组织数据')
+            # self.loops_interval(self.sync_data_orgs.__name__, _interval)
+
+    def sync_lua_commons(self):
+        """同步lua采集爬虫中网址与网址名称"""
         _interval = 3600
-        while True:
-            items = self.seed_orgs()
-            # clean organization names and push to the task queue
-            orgs = []
-            for item in items:
-                name = item.get('name')
-                if name in ['', None]:
-                    logger.warning(f'[异常的单位组织]{item}')
-                    continue
-                word = str(name).replace(' ', '').strip()
-                orgs.append(word)
-            lst = []
-            for word in orgs:
-                lst.append(self.make_task(
-                    search=word,
-                    groups=self.org_groups,
-                    classify=self.query_classify,
-                    weight=self.org_weight))
-            self._scheduler.insert_tasks(lst, level=self.org_weight)
-            # mark organizations already added so they are not pushed to the task queue again
-            for item in items:
-                MGO_SEED_ORGS.update_one(
-                    {'_id': item['_id']},
-                    {'$set': {'enable_added': True}}
-                )
-            logger.info(f'同步更新{len(items)}个单位组织数据')
-            self.loops_interval(self.sync_data_orgs.__name__, _interval)
+        # while True:
+        items = self.lua_common_domains()
+        for item in items:
+            item['create_at'] = int2long(int(time.time()))
+            MGO_REMOVAL_DUPLICATE.insert_one(item)
+        logger.info(f'[同步数据]更新{len(items)}个网站域名数据')
+            # self.loops_interval(self.sync_lua_commons.__name__, _interval)
 
     def sync_data(self):
         """同步数据"""
-        logger.info(f'[数据寻源]开启数据同步')
-        threading.Thread(
-            target=self.sync_data_urls,
-            name='LoadingSeedUrls'
-        ).start()
-        threading.Thread(
-            target=self.sync_data_keywords,
-            name='LoadingSeedKeyWords'
-        ).start()
-        threading.Thread(
-            target=self.sync_data_orgs,
-            name='LoadingSeedOrgs'
-        ).start()
+        # threading.Thread(
+        #     target=self.sync_data_urls,
+        #     name='LoadingSeedUrls'
+        # ).start()
+        # threading.Thread(
+        #     target=self.sync_data_keywords,
+        #     name='LoadingSeedKeyWords'
+        # ).start()
+        # threading.Thread(
+        #     target=self.sync_data_orgs,
+        #     name='LoadingSeedOrgs'
+        # ).start()
+        # threading.Thread(
+        #     target=self.sync_lua_commons,
+        #     name='LoadingLuaCommons'
+        # ).start()
+        logger.info(f'[同步数据]初始化加载')
+        _interval = 600
+        while True:
+            self.sync_lua_commons()
+            self.sync_data_orgs()
+            self.sync_data_urls()
+            self.sync_data_keywords()
+            self.loops_interval(self.sync_data.__name__, _interval)
 
 
 class SearchEngine(BasicSearch):
 
-    def __init__(self, **kwargs):
-        super(SearchEngine, self).__init__(scheduler=kwargs.get('scheduler'))
+    def __init__(self, wait_task_interval=20, **kwargs):
+        super(SearchEngine, self).__init__(**kwargs)
+        self._wait_task_interval = wait_task_interval
         self._engines = []
 
     def set_search_engine(self, engine=None):
@@ -194,19 +267,22 @@ class SearchEngine(BasicSearch):
             self.set_search_engine(engine)
         return self
 
-    def start_search(self, engine):
+    def enable_engine(self, engine):
+        fname = self.enable_engine.__name__
+        ename = engine.__class__.__name__
+        logger.info(f'[搜索引擎]成功启动 - <{ename}>')
         while True:
-            tasks = self._scheduler.get_task()
+            tasks = self.scheduler.get_task()
             if len(tasks) == 0:
-                self.loops_interval(self.start_search.__name__, 5)
+                self.loops_interval(fname, self._wait_task_interval)
 
             task_key, task = tasks
             task['update_at'] = int2long(int(time.time()))
             if task['classify'] == self.visit_classify:
-                self._scheduler.insert_task(task, level=task['weight'])
+                self.scheduler.insert_task(task, level=task['weight'])
             else:
                 word = task['search']
-                logger.info(f"<{engine.__class__.__name__}> {task['groups']} >>> {word}")
+                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                 urls = engine.search(word)
                 lst = []
                 for url in urls:
@@ -216,47 +292,40 @@ class SearchEngine(BasicSearch):
                         classify=self.visit_classify,
                         weight=self.url_weight
                     ))
-                self._scheduler.insert_tasks(lst, level=self.url_weight)
+                self.scheduler.insert_tasks(lst, level=self.url_weight)
 
-    def search_engines(self):
+    def load_engines(self):
         logger.info(f'[搜索引擎]初始化加载')
-        for engine in self._engines:
-            threading.Thread(
-                target=self.start_search,
-                name='SearchEngine',
-                args=(engine,)
-            ).start()
+        max_workers = len(self._engines)  # size the thread pool to the number of search engines
+        with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='SearchEngine') as executor:
+            futures = []
+            for engine in self._engines:
+                f = executor.submit(self.enable_engine, engine)
+                f.add_done_callback(err_details)
+                futures.append(f)
+            wait(futures)
 
 
 class VisitDomain(BasicSearch):
 
-    def __init__(
-            self,
-            downloader=None,
-            parser=None,
-            validator=None,
-            allow_load_filter=False,
-            **kwargs,
-    ):
-        super(VisitDomain, self).__init__(scheduler=kwargs.get('scheduler'))
-        self._downloader = (downloader or Downloader())
-        self._parser = (parser or parse_urls)
-        self._validator = (validator or Validator())
-        if allow_load_filter:
-            self._validator.load_filter()
+    def __init__(self, **kwargs):
+        super(VisitDomain, self).__init__(**kwargs)
 
     def push_new_domain(self, task: Task):
         # new source
         insert_one(MGO_DOMAIN, task)
         # add to the filter
-        self._validator.add_filter_feature(task['domain'])
+        self.validator.add_url(task['domain'])
         # add to the dedup store
-        remove_duplicate = {'url': task['domain'], 'time': task['update_at']}
-        insert_one(MGO_REPETITION, remove_duplicate)
+        item = dict(
+            domain=task['domain'],
+            create_at=task['update_at']
+        )
+        insert_one(MGO_REMOVAL_DUPLICATE, item)
         logger.info(f"[录入新域]{task['domain']} - {task['name']}")
 
     def verify(self, task: Task):
-        valid_words = self._validator.words(task['name'], task)
+        valid_words = self.validator.words(task['name'], task)
         if all([valid_words]):
             self.push_new_domain(task)
         else:
@@ -265,7 +334,8 @@ class VisitDomain(BasicSearch):
 
     def search_domains(self):
         while True:
-            tasks = self._scheduler.get_task()
+            _redis_key = REDIS_KEY + '-' + str(self.url_weight)
+            tasks = self.scheduler.get_task(_redis_key)
             if len(tasks) == 0:
                 logger.info('关闭寻源爬虫')
                 break
@@ -273,15 +343,15 @@ class VisitDomain(BasicSearch):
             task_key, task = tasks
             task['update_at'] = int2long(int(time.time()))
             if task['classify'] != self.visit_classify:
-                self._scheduler.insert_task(task, level=task['weight'])
+                self.scheduler.insert_task(task, level=task['weight'])
             else:
                 domain = extract_domain(task['url'])
-                allow_visit_domain = self._validator.url(domain)
-                if not allow_visit_domain:
+                exists_domain = self.validator.url(domain)
+                if exists_domain:
                     continue
 
                 logger.info(f'request web site -> {task["url"]}')
-                response = self._downloader.get(task['url'])
+                response = self.downloader.get(task['url'])
                 if response.status_code != 200 or response.text in ['', None]:
                     continue
 
@@ -296,7 +366,7 @@ class VisitDomain(BasicSearch):
                 task['name'] = title
                 try:
                     self.verify(task)
-                    urls = self._parser(page_source, base_url)
+                    urls = self.parser(page_source, base_url)
                     new_tasks = []
                     for url in urls:
                         new_tasks.append(self.make_task(
@@ -305,6 +375,6 @@ class VisitDomain(BasicSearch):
                             classify=self.visit_classify,
                             weight=task['weight']
                         ))
-                    self._scheduler.insert_tasks(new_tasks, level=self.url_weight)
+                    self.scheduler.insert_tasks(new_tasks, level=self.url_weight)
                 except HostsRetrieveError:
                     pass
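
A related change further down the diff: SearchEngine.load_engines (renamed from search_engines) now runs each engine in a worker of a fixed-size ThreadPoolExecutor and blocks on wait(), rather than starting bare threading.Thread objects, and registers err_details as a done-callback on each future. A minimal standalone sketch of that pattern, with a placeholder run_engine function and dummy engine names standing in for the project's enable_engine and engine classes:

    from concurrent.futures import ThreadPoolExecutor, wait

    def run_engine(engine):
        # stand-in for SearchEngine.enable_engine: pull tasks, query the engine, re-queue result URLs
        print(f'searching with {engine}')

    def load_engines(engines):
        # one worker per engine; wait() blocks until every engine loop returns (or raises)
        with ThreadPoolExecutor(max_workers=len(engines), thread_name_prefix='SearchEngine') as pool:
            futures = [pool.submit(run_engine, e) for e in engines]
            wait(futures)

    load_engines(['engine_a', 'engine_b'])   # dummy names, purely illustrative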