import threading
import time
from typing import List, Mapping

from common.databases import insert_one, update_one_by_domain
from common.log import logger
from common.tools import delay_by
from constants import (
    COMPETING_GOODS,
    KEYWORD,
    ORGANIZATION,
    QUERY_CLASSIFY,
    SEED_URL,
    VISIT_CLASSIFY,
)
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.schedule import Scheduler
from crawler.validate import Validator
from settings import (
    MGO_COMPETING_GOODS,
    MGO_DATA_GARBAGE,
    MGO_DOMAIN,
    MGO_KEYWORDS,
    MGO_ORGS,
    MGO_QUERY,
    MGO_RECORDS,
    MGO_URLS,
)
class BasicService:
    """Base service for the crawler.

    Wires together the scheduler, duplicate/collect validators, downloader
    and parser, provides helpers that persist crawl results and dedup
    features to MongoDB, and reads the seed tables (orgs, keywords, seed
    URLs, competing goods).
    """

    def __init__(
            self,
            scheduler=None,
            validator=None,
            downloader=None,
            collector=None,
            parser=None,
            **kwargs
    ):
        self.scheduler = scheduler or Scheduler()
        self.validator = validator or Validator(redis_key='RemovalDuplicate_')
        self.collector = collector or Validator(redis_key='CollectUrl_')
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # Mongo query / projection / sort shared by the *_table readers.
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # Task weights (``or`` also replaces an explicit 0 with the default).
        self.org_weight = kwargs.pop('org_weight', None) or 7
        self.url_weight = kwargs.pop('url_weight', None) or 8
        self.keyword_weight = kwargs.pop('keyword_weight', None) or 9
        # Classification labels.
        self.visit_classify = VISIT_CLASSIFY
        self.query_classify = QUERY_CLASSIFY
        # Owning groups.
        self.org_groups = ORGANIZATION
        self.keyword_groups = KEYWORD
        self.url_groups = SEED_URL
        self.competing_groups = COMPETING_GOODS

    @staticmethod
    def loops_interval(interval, enable_debug_log=False):
        """Sleep until the next loop iteration.

        Bug fix: the original computed the *logged* next-run time from
        ``interval or 300`` but slept on the raw ``interval`` — a falsy
        interval made the log and the sleep disagree, and
        ``time.sleep(None)`` raises TypeError. Normalize once up front.
        """
        interval = interval or 300
        # currentThread()/getName() are deprecated aliases.
        t_name = threading.current_thread().name
        next_run_time = delay_by(interval)
        if enable_debug_log:
            logger.debug(f'运行结束:<{t_name}>,下次运行时间:{next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Build a Task object from keyword arguments."""
        return Task(**kwargs)

    @staticmethod
    def make_retrieve_item(task: Task):
        """Project ``task`` onto the fields stored for a query result."""
        keys = ('name', 'url', 'domain', 'origin', 'groups',
                'create_at', 'update_at')
        return {key: task[key] for key in keys}

    @staticmethod
    def make_domain_item(task: Task):
        """Project ``task`` onto the fields stored for a mined domain
        (a retrieve item plus the crawl ``depth``)."""
        keys = ('name', 'url', 'domain', 'depth', 'origin', 'groups',
                'create_at', 'update_at')
        return {key: task[key] for key in keys}

    @staticmethod
    def make_duplicate_removal(task: Task):
        """Build the per-domain dedup feature.

        NOTE(review): ``create_at`` is filled from ``task['update_at']``
        — looks intentional (refreshed on re-sight), but confirm.
        """
        return {
            'domain': task['domain'],
            'origin': task['origin'],
            'create_at': task['update_at'],
        }

    def _push_data(self, purpose: str, task: Task, collection):
        """Persist ``task`` to ``collection`` according to ``purpose``.

        ``purpose`` is one of ``'query'`` / ``'domain'`` / ``'remove'``;
        any other value inserts the raw task as a record.
        """
        t_name = threading.current_thread().name
        if purpose == 'query':
            item = self.make_retrieve_item(task)
            insert_one(collection, item)
            logger.info(f'<{t_name}> - 上传查询结果 - {item["_id"]}')
        elif purpose == 'domain':
            item = self.make_domain_item(task)
            insert_one(collection, item)
            logger.info(f'<{t_name}> - 上传挖掘结果 - {item["_id"]}')
        elif purpose == 'remove':
            item = self.make_duplicate_removal(task)
            update_one_by_domain(collection, item)
            logger.info(f'<{t_name}> - 上传去重特征 - {item["domain"]}')
        else:
            insert_one(collection, task)
            logger.info(f'<{t_name}> - 上传记录数据 - {task["_id"]}')

    def push_remove(self, task: Task):
        """Push the dedup feature for ``task['url']`` if not yet seen.

        Returns True when pushed, False when the URL was already known.
        """
        if not self.validator.data(task['url']):
            self._push_data('remove', task, MGO_DATA_GARBAGE)
            self.validator.add_data(task['url'])
            return True
        return False

    def push_domain(self, task: Task):
        """Save a mined domain; skip already-collected or seed-origin hits.

        Returns True when pushed, False otherwise.
        """
        if task['groups'] == self.url_groups:
            # A domain contained in its own seed origin is not a new find.
            if task['domain'] in str(task['origin']):
                return False
        if not self.collector.data(task['domain']):
            self._push_data('domain', task, MGO_DOMAIN)
            self.collector.add_data(task['domain'])
            return True
        return False

    def push_query(self, task: Task):
        """Save an organization-search (query) result."""
        self._push_data('query', task, MGO_QUERY)

    def push_records(self, task: Task):
        """Save a mining record; names are capped at 50 characters."""
        task['name'] = task['name'][:50]
        self._push_data('records', task, MGO_RECORDS)

    def orgs_table(self) -> List[Mapping]:
        """Organization/unit rows without the ``enable_added`` flag."""
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        return list(cursor.sort(self.sort))

    def keywords_table(self):
        """Keyword names.

        NOTE(review): unlike the other tables this one does not filter
        by ``self.query`` — confirm that is intentional.
        """
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        return [item['name'] for item in cursor.sort(self.sort)]

    def seed_urls_table(self) -> List[Mapping]:
        """Seed-URL rows without the ``enable_added`` flag."""
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        return list(cursor.sort(self.sort))

    def competing_goods_table(self):
        """Competing-goods rows without the ``enable_added`` flag."""
        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
        return list(cursor.sort(self.sort))