import threading
import time
from typing import List, Mapping

from common.databases import insert_one, update_one_by_domain
from common.log import logger
from common.tools import delay_by
from constants import (
    ORGANIZATION,
    KEYWORD,
    SEED_URL,
    COMPETING_GOODS,
    VISIT_CLASSIFY,
    QUERY_CLASSIFY
)
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.schedule import Scheduler
from crawler.validate import Validator
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_COMPETING_GOODS,
    MGO_DATA_GARBAGE,
    MGO_DOMAIN,
    MGO_QUERY,
    MGO_RECORDS
)


class BasicService:
    """Shared base for crawler services.

    Wires together the scheduler / validator / downloader / parser
    components, provides push helpers that persist Task results to the
    various mongo collections, and exposes readers for the seed tables
    (organizations, keywords, seed URLs, competing goods).
    """

    def __init__(
            self,
            scheduler=None,
            validator=None,
            downloader=None,
            collector=None,
            parser=None,
            **kwargs
    ):
        # Components: any that are not injected are built with defaults.
        self.scheduler = scheduler or Scheduler()
        self.validator = validator or Validator(redis_key='RemovalDuplicate_')
        self.collector = collector or Validator(redis_key='CollectUrl_')
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # Mongo query defaults: rows not yet marked enable_added,
        # project only the name field, newest first.
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # Task weights (falsy overrides fall back to the defaults).
        self.org_weight = kwargs.pop('org_weight', None) or 7
        self.url_weight = kwargs.pop('url_weight', None) or 8
        self.keyword_weight = kwargs.pop('keyword_weight', None) or 9
        # Classification labels.
        self.visit_classify = VISIT_CLASSIFY
        self.query_classify = QUERY_CLASSIFY
        # Ownership groups.
        self.org_groups = ORGANIZATION
        self.keyword_groups = KEYWORD
        self.url_groups = SEED_URL
        self.competing_groups = COMPETING_GOODS

    @staticmethod
    def loops_interval(interval, enable_debug_log=False):
        """Sleep between loop iterations, defaulting to 300 seconds.

        Bug fix: the fallback now also applies to ``time.sleep`` — the
        original slept the raw ``interval``, raising TypeError for None
        and not sleeping at all for 0, while logging a next-run time
        computed from the 300-second fallback.
        """
        interval = interval or 300
        t_name = threading.current_thread().name  # currentThread() removed in 3.12
        next_run_time = delay_by(interval)
        if enable_debug_log:
            logger.debug(f'运行结束:<{t_name}>,下次运行时间:{next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Build a Task object from keyword arguments."""
        return Task(**kwargs)

    @staticmethod
    def make_retrieve_item(task: Task):
        """Build the document stored for a search-query result."""
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_domain_item(task: Task):
        """Build the document stored for a mined domain (adds depth)."""
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'depth': task['depth'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_duplicate_removal(task: Task):
        """Build the de-duplication feature document for a task.

        NOTE(review): 'create_at' is filled from task['update_at'] in the
        original — preserved as-is; confirm whether that is intentional.
        """
        item = {
            'domain': task['domain'],
            'origin': task['origin'],
            'create_at': task['update_at'],
        }
        return item

    def _push_data(self, purpose: str, task: Task, collection):
        """Persist *task* to *collection* according to *purpose*.

        purpose: 'query' | 'domain' | 'remove'; anything else is treated
        as a raw records insert.
        """
        t_name = threading.current_thread().name  # currentThread() removed in 3.12
        if purpose == 'query':
            item = self.make_retrieve_item(task)
            insert_one(collection, item)
            logger.info(f'<{t_name}> - 上传查询结果 - {item["_id"]}')
        elif purpose == 'domain':
            item = self.make_domain_item(task)
            insert_one(collection, item)
            logger.info(f'<{t_name}> - 上传挖掘结果 - {item["_id"]}')
        elif purpose == 'remove':
            item = self.make_duplicate_removal(task)
            update_one_by_domain(collection, item)
            logger.info(f'<{t_name}> - 上传去重特征 - {item["domain"]}')
        else:
            insert_one(collection, task)
            logger.info(f'<{t_name}> - 上传记录数据 - {task["_id"]}')

    def push_remove(self, task: Task):
        """Record the task's URL in the de-duplication table.

        Returns True if the URL was new and has been recorded, False if
        it was already known.
        """
        if self.validator.data(task['url']):
            return False
        self._push_data('remove', task, MGO_DATA_GARBAGE)
        self.validator.add_data(task['url'])
        return True

    def push_domain(self, task: Task):
        """Save a mined domain, skipping duplicates.

        For seed-URL group tasks whose origin already contains the
        domain, nothing is stored. Returns True only when a new domain
        was persisted.
        """
        if task['groups'] == self.url_groups:
            # Domain already present in the originating URL -> duplicate.
            if task['domain'] in str(task['origin']):
                return False
        if self.collector.data(task['domain']):
            return False
        self._push_data('domain', task, MGO_DOMAIN)
        self.collector.add_data(task['domain'])
        return True

    def push_query(self, task: Task):
        """Save an organization search-query result."""
        self._push_data('query', task, MGO_QUERY)

    def push_records(self, task: Task):
        """Save a mining record, truncating the name to 50 characters."""
        if len(task['name']) > 50:
            task['name'] = task['name'][:50]  # same as '{:.50s}'.format
        self._push_data('records', task, MGO_RECORDS)

    def _query_table(self, collection) -> List[Mapping]:
        """Fetch not-yet-enabled rows from *collection*, newest first."""
        cursor = collection.find(self.query, projection=self.projection)
        return [item for item in cursor.sort(self.sort)]

    def orgs_table(self) -> List[Mapping]:
        """Organization / unit table."""
        return self._query_table(MGO_ORGS)

    def keywords_table(self):
        """Keyword table — returns the names only, no enable filter."""
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        return [item['name'] for item in cursor.sort(self.sort)]

    def seed_urls_table(self) -> List[Mapping]:
        """Seed URL table."""
        return self._query_table(MGO_URLS)

    def competing_goods_table(self):
        """Competing goods table."""
        return self._query_table(MGO_COMPETING_GOODS)