import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.execptions import HostsRetrieveError
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    is_url,
    err_details,
)
from settings import (
    REDIS_KEY,
    MGO_LUA_SPIDERS,
    MGO_SEARCH,
    MGO_DOMAIN,
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
)


class BasicSearch:

    def __init__(
            self,
            keyword_weight=9,
            url_weight=8,
            org_weight=7,
            scheduler=None,
            validator=None,
            downloader=None,
            parser=None,
            **kwargs
    ):
        self.scheduler = scheduler or Scheduler()
        self.validator = validator or Validator()
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # Mongo query defaults
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task classifications
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # weights
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        self.retrieve_weight = 0
        # task groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self.competing_groups = 'competing_goods'

    @staticmethod
    def loops_interval(interval):
        # default to 300 seconds so the log message and the sleep use the same value
        interval = interval or 300
        t_name = threading.current_thread().name
        next_run_time = delay_by(interval)
        logger.info(f'线程运行结束:<{t_name}>,下次运行时间:{next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Create a Task object."""
        return Task(**kwargs)

    def seed_orgs(self) -> List[Mapping]:
        """Organizations / institutions."""
        search_orgs = []
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self):
        """Keywords."""
        search_keywords = []
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed URLs."""
        search_urls = []
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def seed_competing_goods(self):
        """Competitor URLs."""
        competing_goods = []
        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            competing_goods.append(item)
        return competing_goods

    def lua_common_domains(self):
        """Fetch site names and domains from the lua spider config collection
        and sync them to the duplicate-removal store."""
        parm_commons = []
        projection = {'param_common': 1}
        cursor = MGO_LUA_SPIDERS.find(projection=projection)
        for item in cursor.sort(self.sort):
            name = item['param_common'][1]
            try:
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
            except IndexError:
                continue
            if not self.validator.url(domain):
                parm_commons.append({'name': name, 'domain': domain})
                self.validator.add_url(domain)
        return parm_commons

    def push_data(self, purpose: str, task: Task, collection):
        if purpose == 'save':
            insert_one(collection, self.make_retrieve_item(task))
        else:
            insert_one(collection, self.make_duplicate_removal(task))

    @staticmethod
    def make_retrieve_item(task: Task):
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_duplicate_removal(task: Task):
        item = {
            'domain': task['domain'],
            'url': task['url'],
            'create_at': task['update_at'],
        }
        return item


class SyncData(BasicSearch):

    def __init__(self, allow_load_filter=False, **kwargs):
        super(SyncData, self).__init__(**kwargs)
        self._init(allow_load_filter)

    def _init(self, allow_load_filter=False):
        threading.Thread(
            target=self.sync_data,
            name='SyncData'
        ).start()
        if allow_load_filter:
            self.validator.load_filter()

    def sync_data_urls(self):
        """Sync seed URL data."""
        logger.info('[同步数据]加载种子网址')
        items = self.seed_urls()
        lst = []
        # iterate over a copy so that removing items does not skip elements
        for item in list(items):
            if not is_url(item['name']):
                items.remove(item)
                continue
            if self.validator.url(item['name']):
                items.remove(item)
                continue
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        for item in items:
            MGO_URLS.update_many(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[同步数据]更新{len(items)}条网址数据')

    def sync_data_keywords(self):
        """Sync keyword data."""
        logger.info('[同步数据]加载关键词')
        words = self.seed_keywords()
        # normalise keywords and push them to the task queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.keyword_weight)
        logger.info(f'[同步数据]更新{len(words)}条关键词')

    def sync_data_orgs(self):
        """Sync organization data."""
        logger.info('[同步数据]加载单位组织数据')
        items = self.seed_orgs()
        # normalise organization names and push them to the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[异常的单位组织]{item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.org_weight)
        # mark organizations that have been added so they are not pushed again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[同步数据]更新{len(items)}个单位组织')

    def sync_data_competing_goods(self):
        """Sync competitor URLs."""
        logger.info('[同步数据]加载竞品列表数据')
        items = self.seed_competing_goods()
        # process competitor URLs and push them to the task queue
        lst = []
        # iterate over a copy so that removing items does not skip elements
        for item in list(items):
            if not is_url(item['name']):
                items.remove(item)
                continue
            if self.validator.url(item['name']):
                items.remove(item)
                continue
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        # mark competitor URLs that have been pushed
        for item in items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[同步数据]更新{len(items)}条竞品源网址')

    def sync_lua_commons(self):
        """Sync site names and domains used by the lua spiders."""
        logger.info('[同步数据]加载lua_commons数据')
        items = self.lua_common_domains()
        for item in items:
            item['create_at'] = int2long(int(time.time()))
            MGO_REMOVAL_DUPLICATE.insert_one(item)
        logger.info(f'[同步数据]更新{len(items)}个网站域名数据')

    def sync_data(self):
        """Run all sync jobs in a loop."""
        logger.info('[同步数据]初始化加载')
        _interval = 600
        while True:
            try:
                self.sync_lua_commons()
                self.sync_data_keywords()
                self.sync_data_orgs()
                self.sync_data_competing_goods()
                self.sync_data_urls()
            except Exception as e:
                logger.exception(e)
            self.loops_interval(_interval)


class SearchEngine(BasicSearch):

    def __init__(
            self,
            engines=None,
            max_search_page=1,
            loop_search_interval=60,
            **kwargs
    ):
        super(SearchEngine, self).__init__(**kwargs)
        self._max_pages = max_search_page
        self._interval = loop_search_interval
        self._engines = []
        self.set_engines(engines)

    def _init(self):
        self.set_engines(self._engines)

    def _set_engine(self, engine):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[搜索引擎]添加<{engine.__class__.__name__}>完成')
        return self

    def set_engines(self, engines):
        if isinstance(engines, list):
            for engine in engines:
                self._set_engine(engine)
        else:
            self._set_engine(engines)
        return self

    def search(self, engine):
        ename = engine.__class__.__name__
        threading.current_thread().name = ename
        logger.info(f'[搜索引擎]启动 - <{ename}>')
        while True:
            tasks = self.scheduler.get_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] == self.visit_classify:
                # visit tasks are passed straight back to the scheduler
                self.scheduler.insert_tasks(task, level=task['weight'])
            elif task['groups'] == self.org_groups:
                # look up the organization's site via the Qichacha service
                logger.info(f" {task['groups']} >>> {word}")
                try:
                    url = engine.by_org_get_site(task['search'])
                    task['url'] = url
                    task['name'] = word
                    task['domain'] = extract_domain(task['url'])
                    # save the search result
                    self.push_data('save', task, MGO_SEARCH)
                    if not is_url(url):
                        continue
                    if self.validator.url(task['domain']):
                        continue
                    # domain - add to the filter
                    self.validator.add_url(task['domain'])
                    # push to the data-mining queue
                    task['classify'] = self.visit_classify
                    task['weight'] = self.url_weight
                    self.scheduler.insert_tasks(task, level=self.url_weight)
                except HostsRetrieveError as e:
                    logger.exception(e)
            else:
                # query the keyword through the search engine
                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                cur_page = 0
                while cur_page < self._max_pages:
                    cur_page += 1
                    # retrieve one result page
                    lst = []
                    urls = engine.search(word, cur_page)
                    # build data-mining tasks
                    for url in urls:
                        domain = extract_domain(url)
                        if self.validator.url(domain):
                            continue
                        lst.append(self.make_task(
                            url=extract_base_url(url),
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=self.url_weight,
                        ))
                    # push to the data-mining queue
                    self.scheduler.insert_tasks(lst, level=self.url_weight)
                    logger.info(f'<{ename}> {word}-第{cur_page}页-共{len(lst)}条')

    def start(self):
        if len(self._engines) > 0:
            logger.info('[搜索引擎]初始化加载')
            # size the thread pool to the number of search engines
            max_workers = len(self._engines)
            with ThreadPoolExecutor(max_workers, 'SearchEngine') as executor:
                futures = []
                for engine in self._engines:
                    f = executor.submit(self.search, engine)
                    f.add_done_callback(err_details)
                    futures.append(f)
                wait(futures)


class DataExcavate(BasicSearch):

    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_excavate_interval
        self._workers = excavate_workers

    def save(self, task: Task):
        """Save sites that meet the data-mining requirements."""
        self.push_data('save', task, MGO_DOMAIN)
        # add the url to the duplicate-removal collection
        self.push_data('remove', task, MGO_REMOVAL_DUPLICATE)
        logger.info(f"[上传数据]{task['name']} - {task['domain']}")

    def retrieve_site(self, task: Task):
        if self.validator.url(task['url']):
            return
        logger.info(f'request host -> {task["url"]}')
        response = self.downloader.get(task['url'])
        if response.status_code != 200 or response.text in ['', None]:
            return
        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        title = extract_page_title(page_source)
        task['name'] = title
        base_url = extract_base_url(task['url'])
        task['base_url'] = base_url
        items = self.parser.site_items(page_source, base_url)
        lst = []
        _c = 0  # counts links whose anchor text matches a requirement word
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.requirement_word(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1
        if _c > 1:
            self.save(task)
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        # domain - add to the filter
        self.validator.add_url(task['domain'])
        # url - add to the filter
        self.validator.add_url(task['url'])

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'[数据挖掘]启动 - {t_name}')
        while True:
            _redis_key = REDIS_KEY + '-' + str(self.url_weight)
            tasks = self.scheduler.get_task(_redis_key)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            self.retrieve_site(task)

    def start(self):
        logger.info('[数据挖掘]初始化加载')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
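

# Minimal wiring sketch (an assumption, not part of the original module): the
# three workers are independent - SyncData feeds the scheduler from Mongo,
# SearchEngine expands keyword/organization tasks into visit tasks, and
# DataExcavate fetches and parses the visit tasks. Concrete JySearchEngine
# implementations live elsewhere in the crawler package, so the engine list
# below is a hypothetical placeholder.
if __name__ == '__main__':
    search_engines = []  # hypothetical, e.g. [SomeJySearchEngine()]
    # background thread syncing Mongo seeds into the scheduler
    SyncData(allow_load_filter=True)
    # SearchEngine.start() blocks on its thread pool, so run it in a thread
    threading.Thread(
        target=SearchEngine(engines=search_engines, max_search_page=1).start,
        name='SearchEngineMain',
    ).start()
    # data excavation blocks the main thread on its worker pool
    DataExcavate(excavate_workers=2, loop_excavate_interval=60).start()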