dongzhaorui 3 years ago
Parent
Commit
71b974532a

+ 8 - 4
find_source/build_spider.py

@@ -1,6 +1,5 @@
-
 from crawler import BreadthCrawler
-from crawler.search_engine import BingSearchEngine
+from crawler.search import BingSearchEngine
 
 
 def main():
@@ -8,8 +7,13 @@ def main():
     BreadthCrawler(
         allow_load_filter=True,
         engines=engines_lst,
-        max_search_page=1,
-        visit_workers=1,
+        url_weight=20,
+        org_weight=5,
+        keyword_weight=15,
+        max_search_page=3,
+        excavate_workers=1,
+        loop_search_interval=30,
+        loop_excavate_interval=10
     ).start()
 
 

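For reference, a consolidated sketch of the updated entry point after this hunk. The construction of engines_lst, the inline comments, and the __main__ guard are editorial assumptions (only the BreadthCrawler(...) call and the imports appear in the diff); the comments describe how each keyword is consumed elsewhere in this commit.

from crawler import BreadthCrawler
from crawler.search import BingSearchEngine


def main():
    # Assumption: BingSearchEngine can be constructed without arguments.
    engines_lst = [BingSearchEngine()]
    BreadthCrawler(
        allow_load_filter=True,
        engines=engines_lst,
        url_weight=20,              # queue level for seed/discovered URLs
        org_weight=5,               # queue level for organization-name searches
        keyword_weight=15,          # queue level for keyword searches
        max_search_page=3,          # result pages fetched per search term
        excavate_workers=1,         # DataExcavate worker thread count
        loop_search_interval=30,    # seconds to sleep when the search queue is empty
        loop_excavate_interval=10,  # seconds to sleep when the excavate queue is empty
    ).start()


if __name__ == '__main__':
    main()
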
+ 4 - 5
find_source/crawler/__init__.py

@@ -1,15 +1,14 @@
 import threading
 
-from crawler.spiders import SyncData, SearchEngine, VisitDomain
+from crawler.spiders import SyncData, SearchEngine, DataExcavate
 
 
 class BreadthCrawler:
 
-    def __init__(self, engines=None, visit_workers=1, **kwargs):
+    def __init__(self, engines=None, **kwargs):
         SyncData(**kwargs)
-        self._engines = SearchEngine(**kwargs)
-        self._engines.set_engines(engines)
-        self._excavator = VisitDomain(visit_workers=visit_workers, **kwargs)
+        self._engines = SearchEngine(engines, **kwargs)
+        self._excavator = DataExcavate(**kwargs)
 
     def start(self):
         threading.Thread(

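For orientation, a hedged sketch of how the reworked BreadthCrawler wires the three spiders together. The start() body and thread names below are assumptions, since this hunk truncates right after threading.Thread(.

import threading

from crawler.spiders import SyncData, SearchEngine, DataExcavate


class BreadthCrawler:

    def __init__(self, engines=None, **kwargs):
        # SyncData runs the seeding loop; how it is launched is not shown in this hunk.
        SyncData(**kwargs)
        self._engines = SearchEngine(engines, **kwargs)
        self._excavator = DataExcavate(**kwargs)

    def start(self):
        # Assumed wiring: each stage has a blocking start(), so each gets its own thread.
        threading.Thread(target=self._engines.start, name='SearchEngine').start()
        threading.Thread(target=self._excavator.start, name='DataExcavate').start()
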
+ 5 - 5
find_source/crawler/schedule.py

@@ -13,16 +13,16 @@ class Scheduler:
         else:
             return self.mrq.pop_task([REDIS_KEY])
 
-    def insert_task(self, task, level=1, **kwargs):
-        """Insert a single task"""
-        self.mrq.push_task(REDIS_KEY, [task], level=level, **kwargs)
-
     def insert_tasks(self, tasks, level=1, **kwargs):
         """Insert tasks in batch"""
         allow_output_log = kwargs.get('allow_output_log', True)
+        if isinstance(tasks, list):
+            _tasks = tasks
+        else:
+            _tasks = [tasks]
         self.mrq.push_task(
             REDIS_KEY,
-            tasks,
+            _tasks,
             level=level,
             allow_output_log=allow_output_log
         )

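With the single-task insert_task helper removed, call sites (for example SearchEngine.search further down) now hand a bare task dict to insert_tasks, which wraps it in a list before pushing. A minimal sketch of that normalization, using a hypothetical _as_task_list helper in place of the inline isinstance check:

def _as_task_list(tasks):
    """Mirror of the normalization inside insert_tasks(): accept one task or many."""
    return tasks if isinstance(tasks, list) else [tasks]


assert _as_task_list({'url': 'https://example.com'}) == [{'url': 'https://example.com'}]
assert _as_task_list([{'search': 'a'}, {'search': 'b'}]) == [{'search': 'a'}, {'search': 'b'}]
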
+ 261 - 150
find_source/crawler/spiders.py

@@ -4,30 +4,31 @@ from concurrent.futures import ThreadPoolExecutor, wait
 from typing import List, Mapping
 
 from common.databases import insert_one, int2long
-from common.execptions import ValidatorError, HostsRetrieveError
+from common.execptions import HostsRetrieveError
 from common.log import logger
 from common.tools import delay_by
 from crawler.Task import Task
-from crawler.analysis import parse_urls
+from crawler.analysis import Parser
 from crawler.download import Downloader
 from crawler.retrieve import Validator
 from crawler.schedule import Scheduler
-from crawler.search_engine import JySearchEngine
+from crawler.search import JySearchEngine
 from crawler.utils import (
     extract_base_url,
     extract_page_title,
     extract_domain,
     is_url,
-    err_details
+    err_details,
 )
 from settings import (
     REDIS_KEY,
-    SPECIAL_ENCODINGS,
     MGO_LUA_SPIDERS,
+    MGO_SEARCH,
     MGO_DOMAIN,
     MGO_URLS,
     MGO_ORGS,
     MGO_KEYWORDS,
+    MGO_COMPETING_GOODS,
     MGO_REMOVAL_DUPLICATE
 )
 
@@ -36,19 +37,19 @@ class BasicSearch:
 
     def __init__(
             self,
+            keyword_weight=9,
+            url_weight=8,
+            org_weight=7,
             scheduler=None,
             validator=None,
             downloader=None,
             parser=None,
-            org_weight=9,
-            url_weight=8,
-            keyword_weight=7,
             **kwargs
     ):
         self.scheduler = (scheduler or Scheduler())
         self.validator = (validator or Validator())
         self.downloader = (downloader or Downloader())
-        self.parser = (parser or parse_urls)
+        self.parser = (parser or Parser())
         # MongoDB query
         self.query = {'enable_added': {'$exists': False}}
         self.projection = {'name': 1}
@@ -60,6 +61,7 @@ class BasicSearch:
         self.org_weight = org_weight
         self.url_weight = url_weight
         self.keyword_weight = keyword_weight
+        self.retrieve_weight = 0
         # ownership groups
         self.org_groups = 'organization'
         self.keyword_groups = 'keyword'
@@ -67,9 +69,10 @@ class BasicSearch:
         self.competing_groups = 'competing_goods'
 
     @staticmethod
-    def loops_interval(label, interval):
+    def loops_interval(interval):
+        t_name = threading.currentThread().getName()
         next_run_time = delay_by((interval or 300))
-        logger.info(f'Running: <{label}>, next run at: {next_run_time}')
+        logger.info(f'Thread finished: <{t_name}>, next run at: {next_run_time}')
         time.sleep(interval)
 
     @staticmethod
@@ -101,6 +104,14 @@ class BasicSearch:
             search_urls.append(item)
         return search_urls
 
+    def seed_competing_goods(self):
+        """Competitor URLs"""
+        competing_goods = []
+        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
+        for item in cursor.sort(self.sort):
+            competing_goods.append(item)
+        return competing_goods
+
     def lua_common_domains(self):
         """Fetch site names and domains from the lua spider config table and sync them to the dedup store"""
         parm_commons = []
@@ -115,12 +126,39 @@ class BasicSearch:
                 domain = extract_domain(url)
             except IndexError:
                 continue
-            exists_domain = self.validator.url(domain)
-            if not exists_domain:
+            if not self.validator.url(domain):
                 parm_commons.append({'name': name, 'domain': domain})
                 self.validator.add_url(domain)
         return parm_commons
 
+    def push_data(self, purpose: str, task: Task, collection):
+        if purpose == 'save':
+            insert_one(collection, self.make_retrieve_item(task))
+        else:
+            insert_one(collection, self.make_duplicate_removal(task))
+
+    @staticmethod
+    def make_retrieve_item(task: Task):
+        item = {
+            'name': task['name'],
+            'url': task['url'],
+            'domain': task['domain'],
+            'origin': task['origin'],
+            'groups': task['groups'],
+            'create_at': task['create_at'],
+            'update_at': task['update_at'],
+        }
+        return item
+
+    @staticmethod
+    def make_duplicate_removal(task: Task):
+        item = {
+            'domain': task['domain'],
+            'url': task['url'],
+            'create_at': task['update_at'],
+        }
+        return item
+
 
 class SyncData(BasicSearch):
 
@@ -138,33 +176,35 @@ class SyncData(BasicSearch):
 
     def sync_data_urls(self):
         """Sync seed URL data"""
-        # _interval = 7200
-        # while True:
+        logger.info(f'[SyncData] Loading seed URLs')
         items = self.seed_urls()
         lst = []
         for item in items:
             if not is_url(item['name']):
                 items.remove(item)
                 continue
+            exists_url = self.validator.url(item['name'])
+            if exists_url:
+                items.remove(item)
+                continue
             lst.append(self.make_task(
                 url=item['name'],
+                origin=item['name'],
                 groups=self.url_groups,
                 classify=self.visit_classify,
-                weight=self.url_weight))
+                weight=self.url_weight
+            ))
         self.scheduler.insert_tasks(lst, level=self.url_weight)
-
         for item in items:
             MGO_URLS.update_many(
                 {'_id': item['_id']},
                 {'$set': {'enable_added': True}}
             )
         logger.info(f'[SyncData] Updated {len(items)} URL records')
-            # self.loops_interval(self.sync_data_urls.__name__, _interval)
 
     def sync_data_keywords(self):
         """Sync keyword data"""
-        # _interval = 1800
-        # while True:
+        logger.info(f'[SyncData] Loading keywords')
         words = self.seed_keywords()
         # Normalize keyword format and push to the task queue
         words = [str(word).replace('&nbsp;', '').strip() for word in words]
@@ -172,17 +212,17 @@ class SyncData(BasicSearch):
         for word in words:
             lst.append(self.make_task(
                 search=word,
+                origin=word,
                 groups=self.keyword_groups,
                 classify=self.query_classify,
-                weight=self.keyword_weight))
+                weight=self.keyword_weight
+            ))
         self.scheduler.insert_tasks(lst, level=self.keyword_weight)
-        logger.info(f'[SyncData] Updated {len(words)} keyword records')
-        # self.loops_interval(self.sync_data_keywords.__name__, _interval)
+        logger.info(f'[SyncData] Updated {len(words)} keywords')
 
     def sync_data_orgs(self):
         """Sync organization data"""
-        # _interval = 3600
-        # while True:
+        logger.info(f'[SyncData] Loading organization data')
         items = self.seed_orgs()
         # Clean organization names and push them to the task queue
         orgs = []
@@ -197,9 +237,11 @@ class SyncData(BasicSearch):
         for word in orgs:
             lst.append(self.make_task(
                 search=word,
+                origin=word,
                 groups=self.org_groups,
                 classify=self.query_classify,
-                weight=self.org_weight))
+                weight=self.org_weight
+            ))
         self.scheduler.insert_tasks(lst, level=self.org_weight)
         # Mark organizations already added so they are not pushed to the task queue again
         for item in items:
@@ -207,174 +249,243 @@ class SyncData(BasicSearch):
                 {'_id': item['_id']},
                 {'$set': {'enable_added': True}}
             )
-        logger.info(f'[SyncData] Updated {len(items)} organization records')
-            # self.loops_interval(self.sync_data_orgs.__name__, _interval)
+        logger.info(f'[SyncData] Updated {len(items)} organizations')
+
+    def sync_data_competing_goods(self):
+        """Sync competitor URLs"""
+        logger.info(f'[SyncData] Loading competitor list data')
+        items = self.seed_competing_goods()
+        # Process competitor URLs and push them to the task queue
+        lst = []
+        for item in items:
+            if not is_url(item['name']):
+                items.remove(item)
+                continue
+            exists_url = self.validator.url(item['name'])
+            if exists_url:
+                items.remove(item)
+                continue
+            lst.append(self.make_task(
+                url=item['name'],
+                origin=item['name'],
+                groups=self.competing_groups,
+                classify=self.visit_classify,
+                weight=self.url_weight))
+        self.scheduler.insert_tasks(lst, level=self.url_weight)
+        # Update the status of competitor URLs that have been pushed
+        for item in items:
+            MGO_COMPETING_GOODS.update_one(
+                {'_id': item['_id']},
+                {'$set': {'enable_added': True}}
+            )
+        logger.info(f'[SyncData] Updated {len(items)} competitor source URLs')
 
     def sync_lua_commons(self):
         """Sync site URLs and names used by the lua spiders"""
-        _interval = 3600
-        # while True:
+        logger.info(f'[SyncData] Loading lua_commons data')
         items = self.lua_common_domains()
         for item in items:
             item['create_at'] = int2long(int(time.time()))
             MGO_REMOVAL_DUPLICATE.insert_one(item)
         logger.info(f'[SyncData] Updated {len(items)} site domain records')
-            # self.loops_interval(self.sync_lua_commons.__name__, _interval)
 
     def sync_data(self):
         """Sync data"""
-        # threading.Thread(
-        #     target=self.sync_data_urls,
-        #     name='LoadingSeedUrls'
-        # ).start()
-        # threading.Thread(
-        #     target=self.sync_data_keywords,
-        #     name='LoadingSeedKeyWords'
-        # ).start()
-        # threading.Thread(
-        #     target=self.sync_data_orgs,
-        #     name='LoadingSeedOrgs'
-        # ).start()
-        # threading.Thread(
-        #     target=self.sync_lua_commons,
-        #     name='LoadingLuaCommons'
-        # ).start()
         logger.info(f'[SyncData] Initial load')
         _interval = 600
         while True:
-            self.sync_lua_commons()
-            self.sync_data_orgs()
-            self.sync_data_urls()
-            self.sync_data_keywords()
-            self.loops_interval(self.sync_data.__name__, _interval)
+            try:
+                self.sync_lua_commons()
+                self.sync_data_keywords()
+                self.sync_data_orgs()
+                self.sync_data_competing_goods()
+                self.sync_data_urls()
+            except Exception as e:
+                logger.exception(e)
+            self.loops_interval(_interval)
 
 
 class SearchEngine(BasicSearch):
 
-    def __init__(self, wait_task_interval=20, **kwargs):
+    def __init__(
+            self,
+            engines=None,
+            max_search_page=1,
+            loop_search_interval=60,
+            **kwargs
+    ):
         super(SearchEngine, self).__init__(**kwargs)
-        self._wait_task_interval = wait_task_interval
+        self._max_pages = max_search_page
+        self._interval = loop_search_interval
         self._engines = []
+        self.set_engines(engines)
+
+    def _init(self):
+        self.set_engines(self._engines)
 
-    def set_search_engine(self, engine=None):
+    def _set_engine(self, engine):
         if isinstance(engine, JySearchEngine):
             self._engines.append(engine)
-            logger.info(f'Added search engine <{engine.__class__.__name__}>')
+            logger.info(f'[SearchEngine] Added <{engine.__class__.__name__}>')
         return self
 
-    def set_search_engines(self, engines):
-        for engine in engines:
-            self.set_search_engine(engine)
+    def set_engines(self, engines):
+        if isinstance(engines, list):
+            for engine in engines:
+                self._set_engine(engine)
+        else:
+            self._set_engine(engines)
         return self
 
-    def enable_engine(self, engine):
-        fname = self.enable_engine.__name__
+    def search(self, engine):
         ename = engine.__class__.__name__
-        logger.info(f'[SearchEngine] Started successfully - <{ename}>')
+        threading.currentThread().setName(ename)
+        logger.info(f'[SearchEngine] Starting - <{ename}>')
         while True:
             tasks = self.scheduler.get_task()
             if len(tasks) == 0:
-                self.loops_interval(fname, self._wait_task_interval)
+                self.loops_interval(self._interval)
+                continue
 
             task_key, task = tasks
+            word = task['search']
             task['update_at'] = int2long(int(time.time()))
             if task['classify'] == self.visit_classify:
-                self.scheduler.insert_task(task, level=task['weight'])
+                self.scheduler.insert_tasks(task, level=task['weight'])
+            elif task['groups'] == self.org_groups:
+                '''Use the Qichacha (企查查) service to look up the site'''
+                logger.info(f"<QccSearch> {task['groups']} >>> {word}")
+                try:
+                    url = engine.by_org_get_site(task['search'])
+                    task['url'] = url
+                    task['name'] = word
+                    task['domain'] = extract_domain(task['url'])
+                    '''Save the record'''
+                    self.push_data('save', task, MGO_SEARCH)
+                    if not is_url(url):
+                        continue
+                    if self.validator.url(task['domain']):
+                        continue
+                    '''domain - add to the filter'''
+                    self.validator.add_url(task['domain'])
+                    '''Push to the data-excavation queue'''
+                    task['classify'] = self.visit_classify
+                    task['weight'] = self.url_weight
+                    self.scheduler.insert_tasks(task, level=self.url_weight)
+                except HostsRetrieveError as e:
+                    logger.exception(e)
             else:
-                word = task['search']
+                '''Query the keyword with the search engine'''
                 logger.info(f"<{ename}> {task['groups']} >>> {word}")
-                urls = engine.search(word)
-                lst = []
-                for url in urls:
-                    lst.append(self.make_task(
-                        url=url,
-                        groups=task['groups'],
-                        classify=self.visit_classify,
-                        weight=self.url_weight
-                    ))
-                self.scheduler.insert_tasks(lst, level=self.url_weight)
-
-    def load_engines(self):
-        logger.info(f'[SearchEngine] Initial load')
-        max_workers = len(self._engines)  # size the thread pool to the number of search engines
-        with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='SearchEngine') as executor:
-            futures = []
-            for engine in self._engines:
-                f = executor.submit(self.enable_engine, engine)
-                f.add_done_callback(err_details)
-                futures.append(f)
-            wait(futures)
-
-
-class VisitDomain(BasicSearch):
-
-    def __init__(self, **kwargs):
-        super(VisitDomain, self).__init__(**kwargs)
-
-    def push_new_domain(self, task: Task):
-        # new source
-        insert_one(MGO_DOMAIN, task)
-        # add to the filter
+                cur_page = 0
+                while cur_page < self._max_pages:
+                    cur_page += 1
+                    '''Run the text search'''
+                    lst = []
+                    urls = engine.search(word, cur_page)
+                    '''Build data-excavation tasks'''
+                    for url in urls:
+                        domain = extract_domain(url)
+                        if self.validator.url(domain):
+                            continue
+                        lst.append(self.make_task(
+                            url=extract_base_url(url),
+                            origin=task['origin'],
+                            groups=task['groups'],
+                            classify=self.visit_classify,
+                            weight=self.url_weight,
+                        ))
+                    '''Push to the data-excavation queue'''
+                    self.scheduler.insert_tasks(lst, level=self.url_weight)
+                    logger.info(f'<{ename}> {word} - page {cur_page} - {len(lst)} results')
+
+    def start(self):
+        if len(self._engines) > 0:
+            logger.info(f'[SearchEngine] Initial load')
+            # size the thread pool to the number of search engines
+            max_workers = len(self._engines)
+            with ThreadPoolExecutor(max_workers, 'SearchEngine') as executor:
+                futures = []
+                for engine in self._engines:
+                    f = executor.submit(self.search, engine)
+                    f.add_done_callback(err_details)
+                    futures.append(f)
+                wait(futures)
+
+
+class DataExcavate(BasicSearch):
+
+    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
+        super(DataExcavate, self).__init__(**kwargs)
+        self._interval = loop_excavate_interval
+        self._workers = excavate_workers
+
+    def save(self, task: Task):
+        """Save sites that pass the excavation checks"""
+        self.push_data('save', task, MGO_DOMAIN)
+        '''add the url to the dedup store'''
+        self.push_data('remove', task, MGO_REMOVAL_DUPLICATE)
+        logger.info(f"[Upload] {task['name']} - {task['domain']}")
+
+    def retrieve_site(self, task: Task):
+        if self.validator.url(task['url']):
+            return
+
+        logger.info(f'request host -> {task["url"]}')
+        response = self.downloader.get(task['url'])
+        if response.status_code != 200 or response.text in ['', None]:
+            return
+
+        task['domain'] = extract_domain(task['url'])
+        page_source = response.text
+        title = extract_page_title(page_source)
+        task['name'] = title
+        base_url = extract_base_url(task['url'])
+        task['base_url'] = base_url
+
+        items = self.parser.site_items(page_source, base_url)
+        lst = []
+        _c = 0  # counter of requirement keywords found on the page
+        for item in items:
+            name, url = item['name'], item['host']
+            if self.validator.requirement_word(name):
+                lst.append(self.make_task(
+                    url=url,
+                    name=name,
+                    groups=task['groups'],
+                    classify=self.visit_classify,
+                    weight=task['weight']
+                ))
+                _c += 1
+
+        if _c > 1:
+            self.save(task)
+        self.scheduler.insert_tasks(lst, level=self.url_weight)
+        '''domain - add to the filter'''
         self.validator.add_url(task['domain'])
-        # add to the dedup store
-        item = dict(
-            domain=task['domain'],
-            create_at=task['update_at']
-        )
-        insert_one(MGO_REMOVAL_DUPLICATE, item)
-        logger.info(f"[New domain] {task['domain']} - {task['name']}")
-
-    def verify(self, task: Task):
-        valid_words = self.validator.words(task['name'], task)
-        if all([valid_words]):
-            self.push_new_domain(task)
-        else:
-            if any([task['sensitive'], task['duplication']]):
-                raise ValidatorError(f"Feature check failed: {task['name']}")
+        '''url - add to the filter'''
+        self.validator.add_url(task['url'])
 
-    def search_domains(self):
+    def excavate(self):
+        t_name = threading.currentThread().getName()
+        logger.info(f'[DataExcavate] Started - {t_name}')
         while True:
             _redis_key = REDIS_KEY + '-' + str(self.url_weight)
             tasks = self.scheduler.get_task(_redis_key)
             if len(tasks) == 0:
-                logger.info('Shutting down the source-finding spider')
-                break
+                self.loops_interval(self._interval)
+                continue
 
             task_key, task = tasks
             task['update_at'] = int2long(int(time.time()))
-            if task['classify'] != self.visit_classify:
-                self.scheduler.insert_task(task, level=task['weight'])
-            else:
-                domain = extract_domain(task['url'])
-                exists_domain = self.validator.url(domain)
-                if exists_domain:
-                    continue
-
-                logger.info(f'request web site -> {task["url"]}')
-                response = self.downloader.get(task['url'])
-                if response.status_code != 200 or response.text in ['', None]:
-                    continue
+            self.retrieve_site(task)
 
-                response.encoding = response.apparent_encoding
-                if response.encoding in SPECIAL_ENCODINGS:
-                    response.encoding = 'utf-8'
-                task['domain'] = domain
-                base_url = extract_base_url(task['url'])
-                task['base_url'] = base_url
-                page_source = response.text
-                title = extract_page_title(page_source)
-                task['name'] = title
-                try:
-                    self.verify(task)
-                    urls = self.parser(page_source, base_url)
-                    new_tasks = []
-                    for url in urls:
-                        new_tasks.append(self.make_task(
-                            url=url,
-                            groups=task['groups'],
-                            classify=self.visit_classify,
-                            weight=task['weight']
-                        ))
-                    self.scheduler.insert_tasks(new_tasks, level=self.url_weight)
-                except HostsRetrieveError:
-                    pass
+    def start(self):
+        logger.info(f'[DataExcavate] Initial load')
+        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
+            futures = []
+            for _ in range(1, self._workers + 1):
+                f = executor.submit(self.excavate)
+                f.add_done_callback(err_details)
+                futures.append(f)
+            wait(futures)

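A hedged sketch of driving the excavation stage on its own, based on the DataExcavate constructor above and the weight-suffixed Redis key used in excavate(); running it standalone like this is illustrative only and is not part of this commit.

from crawler.spiders import DataExcavate

# Tasks are pulled from REDIS_KEY + '-' + str(url_weight), so the weight here
# must match the level used when the search stage pushed the tasks.
excavator = DataExcavate(
    excavate_workers=2,          # threads in the 'DataExcavate' pool
    loop_excavate_interval=10,   # sleep (seconds) when the weighted queue is empty
    url_weight=20,
)
excavator.start()                # blocks in wait() on the worker futures
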
+ 8 - 0
find_source/settings.py

@@ -33,6 +33,14 @@ REQUIREMENT_PHRASE = [
 SENSITIVE_WORDS = [
     '通知', '邮箱', '登录', '注册', '亿元', '地址', '招聘', '试驾', '信用卡',
 ]
+'''Search-engine filter patterns'''
+ENGINE_FEATURE_RETRIEVES = [
+    'microsoft.com',
+    'cn.bing.com',
+    'beian.miit.gov.cn',
+    'beian.gov.cn/portal/registerSystemInfo',
+    'baike.baidu.com'
+]
 '''Special encodings'''
 SPECIAL_ENCODINGS = [
     'Windows-1254'
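
The commit introduces ENGINE_FEATURE_RETRIEVES but no consumer appears in this diff; below is a plausible, purely illustrative filter that drops search-engine and ICP-registry noise by substring match (the is_engine_noise helper is hypothetical, not part of the project).

from settings import ENGINE_FEATURE_RETRIEVES


def is_engine_noise(url: str) -> bool:
    """Hypothetical helper: True if the URL matches a known engine/registry pattern."""
    return any(feature in url for feature in ENGINE_FEATURE_RETRIEVES)


assert is_engine_noise('https://cn.bing.com/search?q=test')
assert not is_engine_noise('https://example.org/notice.html')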