import threading
import time
from typing import List, Mapping

from common.databases import insert_one
from common.log import logger
from common.tools import delay_by
from constants import (
    ORGANIZATION,
    KEYWORD,
    SEED_URL,
    COMPETING_GOODS,
    VISIT_CLASSIFY,
    QUERY_CLASSIFY
)
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.schedule import Scheduler
from crawler.validate import Validator
from settings import (
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
    MGO_DOMAIN,
    MGO_QUERY,
    MGO_RECORDS
)


class BasicSearch:
    """Shared base for search/mining workers.

    Wires up the scheduler, de-duplication/collection validators, the
    downloader and the parser, and provides:

    * ``push_*`` helpers that write results into the MongoDB collections
      (query results, mined domains, de-duplication features, records);
    * ``*_table`` helpers that read seed data (organizations, keywords,
      seed URLs, competing-goods URLs) back out of MongoDB.
    """

    def __init__(
            self,
            keyword_weight=9,
            url_weight=8,
            org_weight=7,
            scheduler=None,
            validator=None,
            downloader=None,
            collector=None,
            parser=None,
            **kwargs
    ):
        """Create the component set; any component may be injected for tests.

        :param keyword_weight: task weight for keyword-driven searches
        :param url_weight: task weight for seed-URL-driven searches
        :param org_weight: task weight for organization-driven searches
        :param scheduler/validator/downloader/collector/parser: optional
            pre-built components; defaults are created when omitted.
        """
        self.scheduler = (scheduler or Scheduler())
        # 'validator' tracks already-pushed de-duplication features,
        # 'collector' tracks already-pushed domains — separate Redis keys.
        self.validator = (validator or Validator(redis_key='RemovalDuplicate_'))
        self.collector = (collector or Validator(redis_key='CollectUrl_'))
        self.downloader = (downloader or Downloader())
        self.parser = (parser or Parser())
        # Mongo query: seeds not yet marked as processed ('enable_added' absent)
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]  # newest first
        # task weights
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        self.retrieve_weight = 0
        # task classifications
        self.visit_classify = VISIT_CLASSIFY
        self.query_classify = QUERY_CLASSIFY
        # task owner groups
        self.org_groups = ORGANIZATION
        self.keyword_groups = KEYWORD
        self.url_groups = SEED_URL
        self.competing_groups = COMPETING_GOODS

    @staticmethod
    def loops_interval(interval):
        """Sleep until the next scheduled run of the calling loop.

        BUG FIX: the original logged a next-run time computed from
        ``interval or 300`` but then slept for the raw ``interval`` —
        ``None`` raised TypeError and ``0`` skipped the sleep while the
        log claimed a 5-minute delay. Normalize once, use consistently.
        """
        interval = interval or 300  # default: 5 minutes
        # current_thread()/.name replace the deprecated
        # currentThread()/getName() camelCase aliases.
        t_name = threading.current_thread().name
        next_run_time = delay_by(interval)
        logger.debug(f'程序运行结束:<{t_name}>,下次运行时间:<{next_run_time}>')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Build and return a :class:`Task` from keyword arguments."""
        return Task(**kwargs)

    @staticmethod
    def make_retrieve_item(task: Task):
        """Project a task onto the fields stored for a retrieved item."""
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_duplicate_removal(task: Task):
        """Project a task onto the de-duplication feature fields.

        NOTE(review): 'create_at' is filled from task['update_at'] —
        presumably "feature created at the task's last update"; confirm
        this is intentional before changing.
        """
        item = {
            'domain': task['domain'],
            'origin': task['origin'],
            'create_at': task['update_at'],
        }
        return item

    def _push_data(self, purpose: str, task: Task, collection):
        """Insert one document shaped according to *purpose*.

        'save' -> retrieve item, 'remove' -> de-dup feature,
        anything else -> the raw task.
        """
        if purpose == 'save':
            insert_one(collection, self.make_retrieve_item(task))
        elif purpose == 'remove':
            insert_one(collection, self.make_duplicate_removal(task))
        else:
            insert_one(collection, task)

    def push_remove(self, task: Task):
        """数据去重的垃圾表 — push a de-duplication feature (once per URL)."""
        logger.info(f"[上传去重特征]【{task['name']} - {task['url']}】")
        if not self.validator.data(task['url']):
            self._push_data('remove', task, MGO_REMOVAL_DUPLICATE)
            self.validator.add_data(task['url'])

    def push_domain(self, task: Task):
        """挖掘网站的查询结果 — push a mined domain (once per domain)."""
        logger.info(f"[数据挖掘 - 推送]【{task['name']} - {task['domain']}】")
        if not self.collector.data(task['domain']):
            self._push_data('save', task, MGO_DOMAIN)
            self.collector.add_data(task['domain'])

    def push_query(self, task: Task):
        """搜索组织单位查询结果 — push an organization query result."""
        logger.info(f"[查询结果 - 推送]【{task['name']} - {task['url']}】")
        self._push_data('save', task, MGO_QUERY)

    def push_records(self, task: Task):
        """挖掘数据的记录 — push a mining record, truncating long names.

        BUG FIX: the original tested ``task['name'] > 20``, comparing a
        str to an int (TypeError on Python 3). The truncating format
        spec ``'{:.20s}'`` shows the intent is a length check.
        """
        if len(task['name']) > 20:
            task['name'] = '{:.20s}'.format(task['name'])
        logger.info(f"[数据记录 - 推送]【{task['name']} - {task['url']}】")
        self._push_data('records', task, MGO_RECORDS)

    def _fetch_sorted(self, collection, query=None) -> List[Mapping]:
        """Return all docs from *collection* matching *query* (None = all),
        projected to ``self.projection`` and sorted by ``self.sort``."""
        cursor = collection.find(query, projection=self.projection)
        return list(cursor.sort(self.sort))

    def orgs_table(self) -> List[Mapping]:
        """组织|单位 — organization / unit seed documents."""
        return self._fetch_sorted(MGO_ORGS, self.query)

    def keywords_table(self):
        """关键词 — keyword names (no enable_added filter, names only)."""
        return [item['name'] for item in self._fetch_sorted(MGO_KEYWORDS)]

    def seed_urls_table(self) -> List[Mapping]:
        """种子urls — seed URL documents."""
        return self._fetch_sorted(MGO_URLS, self.query)

    def competing_goods_table(self):
        """竞品urls — competing-goods URL documents."""
        return self._fetch_sorted(MGO_COMPETING_GOODS, self.query)