"""Basic crawler service layer.

Wires together the scheduler, validators, downloader and parser, and
provides helpers for building tasks and persisting crawl results to the
project's MongoDB collections.
"""
import threading
import time
from typing import List, Mapping

from common.databases import insert_one, update_one_by_domain
from common.log import logger
from constants import (
    ORGANIZATION, KEYWORD, SEED_URL, COMPETING_GOODS, VISIT_CLASSIFY,
    QUERY_CLASSIFY
)
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.schedule import Scheduler
from crawler.validate import Validator
from settings import (
    MGO_URLS, MGO_ORGS, MGO_KEYWORDS, MGO_COMPETING_GOODS, MGO_GARBAGE,
    MGO_DOMAIN, MGO_QUERY, MGO_RECORDS
)


class BasicService:
    """Shared base for crawler services.

    Holds the crawl pipeline components (scheduler, validators,
    downloader, parser) plus default Mongo query parameters, task
    weights and classification/group constants, and exposes the
    persistence helpers used by the concrete services.
    """

    def __init__(
            self,
            scheduler=None,
            validator=None,
            downloader=None,
            collector=None,
            parser=None,
            **kwargs
    ):
        self.scheduler = scheduler or Scheduler()
        # Redis-backed de-duplication filter for already-processed urls.
        self.validator = validator or Validator(redis_key='RemovalDuplicate_')
        # Redis-backed filter for domains already collected.
        self.collector = collector or Validator(redis_key='CollectUrl_')
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # Default mongo query: only documents not yet flagged as added.
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # Task weights; falsy values (None/0) fall back to the defaults.
        self.org_weight = kwargs.pop('org_weight', None) or 7
        self.url_weight = kwargs.pop('url_weight', None) or 8
        self.keyword_weight = kwargs.pop('keyword_weight', None) or 9
        # Classification labels.
        self.visit_classify = VISIT_CLASSIFY
        self.query_classify = QUERY_CLASSIFY
        # Ownership groups.
        self.org_groups = ORGANIZATION
        self.keyword_groups = KEYWORD
        self.url_groups = SEED_URL
        self.competing_groups = COMPETING_GOODS

    @property
    def thread_name(self) -> str:
        """Name of the calling thread (used as a log prefix)."""
        # current_thread().name replaces the camelCase aliases
        # currentThread()/getName() that were deprecated in Python 3.10.
        return threading.current_thread().name

    def loops_interval(self, interval, enable_debug_log=False):
        """Sleep for ``interval`` seconds between loop iterations.

        :param interval: seconds to sleep
        :param enable_debug_log: emit a debug line before sleeping
        """
        if enable_debug_log:
            logger.debug(f'Thread-<{self.thread_name}> is closed.')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs) -> Task:
        """Build a :class:`Task` from keyword arguments."""
        return Task(**kwargs)

    def _push_data(self, purpose: str, task: Task, collection):
        """Persist ``task`` to ``collection`` according to ``purpose``.

        ``purpose`` selects the serialization and write strategy:
        ``'query'``/``'domain'`` insert the relevant task dict,
        ``'remove'`` upserts a system-tagged filter entry by domain,
        anything else inserts the task as a plain record.
        """
        if purpose == 'query':
            item = task.retrieve_task_to_dict
            insert_one(collection, item)
            logger.info(f'<{self.thread_name}> - 查询结果 - {item["_id"]}')
        elif purpose == 'domain':
            item = task.excavate_task_to_dict
            insert_one(collection, item)
            logger.info(f'<{self.thread_name}> - 挖掘结果 - {task["domain"]}')
        elif purpose == 'remove':
            item = task.validate_task_to_dict
            item['source'] = 'system'
            update_one_by_domain(collection, item)
            logger.info(f'<{self.thread_name}> - 添加过滤 - {task["url"]}')
        else:
            insert_one(collection, task)
            logger.info(f'<{self.thread_name}> - 记录数据 - {task["_id"]}')

    def push_remove(self, task: Task) -> bool:
        """Add ``task`` to the de-duplication (garbage) table.

        :return: True if the url was newly recorded, False if already seen.
        """
        if not self.validator.data(task['url']):
            self._push_data('remove', task, MGO_GARBAGE)
            self.validator.add_data(task['url'])
            return True
        return False

    def push_domain(self, task: Task) -> bool:
        """Save a data-mining (domain excavation) result.

        :return: True if the domain was newly recorded, False otherwise.
        """
        if not self.collector.data(task['domain']):
            self._push_data('domain', task, MGO_DOMAIN)
            self.collector.add_data(task['domain'])
            return True
        return False

    def push_query(self, task: Task):
        """Save an organization/unit search-query result."""
        self._push_data('query', task, MGO_QUERY)

    def push_records(self, task: Task):
        """Save a mining-record entry, truncating the name to 50 chars."""
        if len(task['name']) > 50:
            # Equivalent to '{:.50s}'.format(...) but clearer.
            task['name'] = task['name'][:50]
        self._push_data('records', task, MGO_RECORDS)

    def orgs_table(self) -> List[Mapping]:
        """Organizations/units not yet flagged as added."""
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        return list(cursor.sort(self.sort))

    def keywords_table(self) -> List[str]:
        """All keyword names (no 'enable_added' filter applied)."""
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        return [item['name'] for item in cursor.sort(self.sort)]

    def seed_urls_table(self) -> List[Mapping]:
        """Seed urls not yet flagged as added."""
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        return list(cursor.sort(self.sort))

    def competing_goods_table(self) -> List[Mapping]:
        """Competing goods not yet flagged as added."""
        cursor = MGO_COMPETING_GOODS.find(
            self.query, projection=self.projection
        )
        return list(cursor.sort(self.sort))