import threading
import time
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    valid_url
)
from settings import (
    SPECIAL_ENCODINGS,
    MGO_REPETITION,
    MGO_DOMAIN,
    MGO_SEED_URLS,
    MGO_SEED_ORGS,
    MGO_SEED_KEYWORDS,
)


class BasicSearch:

    def __init__(self, scheduler=None, **kwargs):
        self._scheduler = (scheduler or Scheduler())
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task weights
        self.org_weight = kwargs.get('org_weight', 9)
        self.url_weight = kwargs.get('url_weight', 8)
        self.keyword_weight = kwargs.get('keyword_weight', 5)
        # task classification
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # task groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self._init()

    def _init(self):
        self.sync_data()

    @staticmethod
    def loops_interval(label, interval):
        interval = interval or 300
        next_run_time = delay_by(interval)
        logger.info(f'Ran <{label}>; next run at {next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Build a Task object."""
        return Task(**kwargs)

    def seed_orgs(self) -> List[Mapping]:
        """Seed organizations / institutions."""
        search_orgs = []
        cursor = MGO_SEED_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self):
        """Seed keywords."""
        search_keywords = []
        cursor = MGO_SEED_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed URLs."""
        search_urls = []
        cursor = MGO_SEED_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def sync_data_urls(self):
        """Sync seed URL data."""
        _interval = 7200
        while True:
            items = self.seed_urls()
            # Keep only well-formed URLs (removing items while iterating would skip entries)
            items = [item for item in items if valid_url(item['name'])]
            lst = []
            for item in items:
                lst.append(self.make_task(
                    url=item['name'],
                    groups=self.url_groups,
                    classify=self.visit_classify,
                    weight=self.url_weight))
            self._scheduler.insert_tasks(lst, level=self.url_weight)
            # Mark queued URLs so they are not pushed again
            for item in items:
                MGO_SEED_URLS.update_many(
                    {'_id': item['_id']},
                    {'$set': {'enable_added': True}}
                )
            logger.info(f'Synced {len(items)} seed URL records')
            self.loops_interval(self.sync_data_urls.__name__, _interval)

    def sync_data_keywords(self):
        """Sync keyword data."""
        _interval = 1800
        while True:
            words = self.seed_keywords()
            # Normalise keyword formatting and push to the task queue
            words = [str(word).replace(' ', '').strip() for word in words]
            lst = []
            for word in words:
                lst.append(self.make_task(
                    search=word,
                    groups=self.keyword_groups,
                    classify=self.query_classify,
                    weight=self.keyword_weight))
            self._scheduler.insert_tasks(lst, level=self.keyword_weight)
            logger.info(f'Synced {len(words)} keyword records')
            self.loops_interval(self.sync_data_keywords.__name__, _interval)

    def sync_data_orgs(self):
        """Sync organization data."""
        _interval = 3600
        while True:
            items = self.seed_orgs()
            # Normalise organization names and push to the task queue
            orgs = []
            for item in items:
                name = item.get('name')
                if name in ['', None]:
                    logger.warning(f'[invalid organization record]{item}')
                    continue
                word = str(name).replace(' ', '').strip()
                orgs.append(word)
            lst = []
            for word in orgs:
                lst.append(self.make_task(
                    search=word,
                    groups=self.org_groups,
                    classify=self.query_classify,
                    weight=self.org_weight))
            self._scheduler.insert_tasks(lst, level=self.org_weight)
            # Mark queued organizations so they are not pushed again
            for item in items:
                MGO_SEED_ORGS.update_one(
                    {'_id': item['_id']},
                    {'$set': {'enable_added': True}}
                )
            logger.info(f'Synced {len(items)} organization records')
            self.loops_interval(self.sync_data_orgs.__name__, _interval)

    def sync_data(self):
        """Start the data-sync worker threads."""
        logger.info('[data sourcing] starting data sync')
        threading.Thread(
            target=self.sync_data_urls,
            name='LoadingSeedUrls'
        ).start()
        threading.Thread(
            target=self.sync_data_keywords,
            name='LoadingSeedKeyWords'
        ).start()
        threading.Thread(
            target=self.sync_data_orgs,
            name='LoadingSeedOrgs'
        ).start()


class SearchEngine(BasicSearch):

    def __init__(self, **kwargs):
        super(SearchEngine, self).__init__(scheduler=kwargs.get('scheduler'))
        self._engines = []

    def set_search_engine(self, engine=None):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'Added search engine <{engine.__class__.__name__}>')
        return self

    def set_search_engines(self, engines):
        for engine in engines:
            self.set_search_engine(engine)
        return self

    def start_search(self, engine):
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                # Nothing queued yet; wait and poll the scheduler again
                self.loops_interval(self.start_search.__name__, 5)
                continue
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] == self.visit_classify:
                # Visit tasks are not handled here; put them back on the queue
                self._scheduler.insert_task(task, level=task['weight'])
            else:
                word = task['search']
                logger.info(f"<{engine.__class__.__name__}> {task['groups']} >>> {word}")
                urls = engine.search(word)
                lst = []
                for url in urls:
                    lst.append(self.make_task(
                        url=url,
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=self.url_weight
                    ))
                self._scheduler.insert_tasks(lst, level=self.url_weight)

    def search_engines(self):
        logger.info('[search engines] initialising')
        for engine in self._engines:
            threading.Thread(
                target=self.start_search,
                name='SearchEngine',
                args=(engine,)
            ).start()


class VisitDomain(BasicSearch):

    def __init__(
            self,
            downloader=None,
            parser=None,
            validator=None,
            allow_load_filter=False,
            **kwargs,
    ):
        super(VisitDomain, self).__init__(scheduler=kwargs.get('scheduler'))
        self._downloader = (downloader or Downloader())
        self._parser = (parser or parse_urls)
        self._validator = (validator or Validator())
        if allow_load_filter:
            self._validator.load_filter()

    def push_new_domain(self, task: Task):
        # Record the newly discovered domain
        insert_one(MGO_DOMAIN, task)
        # Add it to the filter
        self._validator.add_filter_feature(task['domain'])
        # Add it to the de-duplication collection
        remove_duplicate = {'url': task['domain'], 'time': task['update_at']}
        insert_one(MGO_REPETITION, remove_duplicate)
        logger.info(f"[new domain recorded]{task['domain']} - {task['name']}")

    def verify(self, task: Task):
        valid_words = self._validator.words(task['name'], task)
        if valid_words:
            self.push_new_domain(task)
        elif any([task['sensitive'], task['duplication']]):
            raise ValidatorError(f"Feature validation failed: {task['name']}")

    def search_domains(self):
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                logger.info('No tasks left; shutting down the source-discovery crawler')
                break
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] != self.visit_classify:
                # Query tasks belong to the search-engine workers; put them back
                self._scheduler.insert_task(task, level=task['weight'])
            else:
                domain = extract_domain(task['url'])
                allow_visit_domain = self._validator.url(domain)
                if not allow_visit_domain:
                    continue
                logger.info(f'request web site -> {task["url"]}')
                response = self._downloader.get(task['url'])
                if response.status_code != 200 or response.text in ['', None]:
                    continue
                response.encoding = response.apparent_encoding
                if response.encoding in SPECIAL_ENCODINGS:
                    response.encoding = 'utf-8'
                task['domain'] = domain
                base_url = extract_base_url(task['url'])
                task['base_url'] = base_url
                page_source = response.text
                title = extract_page_title(page_source)
                task['name'] = title
                try:
                    self.verify(task)
                    urls = self._parser(page_source, base_url)
                    new_tasks = []
                    for url in urls:
                        new_tasks.append(self.make_task(
                            url=url,
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight']
                        ))
                    self._scheduler.insert_tasks(new_tasks, level=self.url_weight)
                except HostsRetrieveError:
                    pass
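

# Minimal usage sketch, assuming the project supplies concrete JySearchEngine
# subclasses in crawler.search_engine and that the default Scheduler /
# Downloader / Validator are usable as-is; the engine list below is a
# placeholder, not the project's actual wiring.
if __name__ == '__main__':
    searcher = SearchEngine()
    # Assumption: JySearchEngine (or a subclass) can be instantiated directly;
    # swap in the real engine classes used by the project.
    searcher.set_search_engines([JySearchEngine()])
    searcher.search_engines()  # one worker thread per registered engine

    visitor = VisitDomain(allow_load_filter=True)
    visitor.search_domains()  # consume 'visit' tasks until the queue drains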