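"""
Seed-search crawler components.

- BasicSearch: shared helpers (loading seeds from MongoDB, building tasks,
  persisting retrieve / duplicate-removal items).
- SyncData: background thread that periodically syncs seed organizations,
  keywords, seed URLs, competitor URLs and lua spider domains into the
  scheduler queue.
- SearchEngine: consumes query tasks and turns search results into
  data-excavation (visit) tasks.
- DataExcavate: fetches queued sites and records the domains whose pages
  match the required keywords.
"""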
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.execptions import HostsRetrieveError
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    is_url,
    err_details,
)
from settings import (
    REDIS_KEY,
    MGO_LUA_SPIDERS,
    MGO_SEARCH,
    MGO_DOMAIN,
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
)


class BasicSearch:

    def __init__(
        self,
        keyword_weight=9,
        url_weight=8,
        org_weight=7,
        scheduler=None,
        validator=None,
        downloader=None,
        parser=None,
        **kwargs
    ):
        self.scheduler = scheduler or Scheduler()
        self.validator = validator or Validator()
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # MongoDB query: only documents that have not been pushed yet
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task classifications
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # weights (also used as scheduler queue levels)
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        self.retrieve_weight = 0
        # task groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self.competing_groups = 'competing_goods'

    @staticmethod
    def loops_interval(interval):
        t_name = threading.current_thread().name
        interval = interval or 300
        next_run_time = delay_by(interval)
        logger.info(f'Thread finished: <{t_name}>, next run at: {next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Create a Task object."""
        return Task(**kwargs)
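
    # NOTE: Task (from crawler.Task) is used throughout this module like a dict.
    # The fields read or written here are: search, url, name, domain, base_url,
    # origin, groups, classify, weight, create_at and update_at. Which of these
    # Task pre-populates (e.g. create_at) is an assumption inferred from usage
    # in this file, not something this module defines.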

    def seed_orgs(self) -> List[Mapping]:
        """Organizations / institutions."""
        search_orgs = []
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self):
        """Keywords."""
        search_keywords = []
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed URLs."""
        search_urls = []
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def seed_competing_goods(self):
        """Competitor URLs."""
        competing_goods = []
        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            competing_goods.append(item)
        return competing_goods

    def lua_common_domains(self):
        """Read site names and domains from the lua spider config collection, for syncing into the de-duplication collection."""
        param_commons = []
        projection = {'param_common': 1}
        cursor = MGO_LUA_SPIDERS.find(projection=projection)
        for item in cursor.sort(self.sort):
            name = item['param_common'][1]
            try:
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
            except IndexError:
                continue
            if not self.validator.url(domain):
                param_commons.append({'name': name, 'domain': domain})
                self.validator.add_url(domain)
        return param_commons
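
    # push_data() writes one of two shapes: 'save' stores the full retrieve item
    # (used with MGO_SEARCH and MGO_DOMAIN below), any other purpose stores the
    # slimmer duplicate-removal item written to MGO_REMOVAL_DUPLICATE.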
    def push_data(self, purpose: str, task: Task, collection):
        if purpose == 'save':
            insert_one(collection, self.make_retrieve_item(task))
        else:
            insert_one(collection, self.make_duplicate_removal(task))

    @staticmethod
    def make_retrieve_item(task: Task):
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_duplicate_removal(task: Task):
        item = {
            'domain': task['domain'],
            'url': task['url'],
            'create_at': task['update_at'],
        }
        return item


class SyncData(BasicSearch):

    def __init__(self, allow_load_filter=False, **kwargs):
        super(SyncData, self).__init__(**kwargs)
        self._init(allow_load_filter)

    def _init(self, allow_load_filter=False):
        threading.Thread(
            target=self.sync_data,
            name='SyncData'
        ).start()
        if allow_load_filter:
            self.validator.load_filter()

    def sync_data_urls(self):
        """Sync seed URL data."""
        logger.info('[sync data] loading seed URLs')
        items = self.seed_urls()
        lst = []
        valid_items = []
        # skip invalid URLs and URLs already known to the filter
        for item in items:
            if not is_url(item['name']):
                continue
            if self.validator.url(item['name']):
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        for item in valid_items:
            MGO_URLS.update_many(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[sync data] updated {len(valid_items)} seed URLs')

    def sync_data_keywords(self):
        """Sync keyword data."""
        logger.info('[sync data] loading keywords')
        words = self.seed_keywords()
        # normalise keywords and push them to the task queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.keyword_weight)
        logger.info(f'[sync data] pushed {len(words)} keywords')

    def sync_data_orgs(self):
        """Sync organization data."""
        logger.info('[sync data] loading organizations')
        items = self.seed_orgs()
        # normalise organization names and push them to the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[invalid organization] {item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.org_weight)
        # mark pushed organizations so they are not queued again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[sync data] updated {len(items)} organizations')

    def sync_data_competing_goods(self):
        """Sync competitor URLs."""
        logger.info('[sync data] loading competitor list')
        items = self.seed_competing_goods()
        # filter competitor URLs and push them to the task queue
        lst = []
        valid_items = []
        for item in items:
            if not is_url(item['name']):
                continue
            if self.validator.url(item['name']):
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight))
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        # mark pushed competitor URLs
        for item in valid_items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[sync data] updated {len(valid_items)} competitor URLs')

    def sync_lua_commons(self):
        """Sync site names and domains collected by the lua spiders."""
        logger.info('[sync data] loading lua_commons data')
        items = self.lua_common_domains()
        for item in items:
            item['create_at'] = int2long(int(time.time()))
            MGO_REMOVAL_DUPLICATE.insert_one(item)
        logger.info(f'[sync data] updated {len(items)} site domains')

    def sync_data(self):
        """Sync all seed data in an endless loop."""
        logger.info('[sync data] initial load')
        _interval = 600
        while True:
            try:
                self.sync_lua_commons()
                self.sync_data_keywords()
                self.sync_data_orgs()
                self.sync_data_competing_goods()
                self.sync_data_urls()
            except Exception as e:
                logger.exception(e)
            self.loops_interval(_interval)


class SearchEngine(BasicSearch):

    def __init__(
        self,
        engines=None,
        max_search_page=1,
        loop_search_interval=60,
        **kwargs
    ):
        super(SearchEngine, self).__init__(**kwargs)
        self._max_pages = max_search_page
        self._interval = loop_search_interval
        self._engines = []
        self.set_engines(engines)

    def _init(self):
        self.set_engines(self._engines)

    def _set_engine(self, engine):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[search engine] added <{engine.__class__.__name__}>')
        return self

    def set_engines(self, engines):
        if isinstance(engines, list):
            for engine in engines:
                self._set_engine(engine)
        else:
            self._set_engine(engines)
        return self

    def search(self, engine):
        ename = engine.__class__.__name__
        threading.current_thread().name = ename
        logger.info(f'[search engine] started - <{ename}>')
        while True:
            tasks = self.scheduler.get_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] == self.visit_classify:
                # visit tasks are not handled here; put them back on the queue
                self.scheduler.insert_tasks(task, level=task['weight'])
            elif task['groups'] == self.org_groups:
                # look up the organization's site via the Qcc service
                logger.info(f"<QccSearch> {task['groups']} >>> {word}")
                try:
                    url = engine.by_org_get_site(task['search'])
                    task['url'] = url
                    task['name'] = word
                    task['domain'] = extract_domain(task['url'])
                    # save the search result
                    self.push_data('save', task, MGO_SEARCH)
                    if not is_url(url):
                        continue
                    if self.validator.url(task['domain']):
                        continue
                    # domain - add to the filter
                    self.validator.add_url(task['domain'])
                    # push to the data-excavation queue
                    task['classify'] = self.visit_classify
                    task['weight'] = self.url_weight
                    self.scheduler.insert_tasks(task, level=self.url_weight)
                except HostsRetrieveError as e:
                    logger.exception(e)
            else:
                # query the keyword against the search engine
                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                cur_page = 0
                while cur_page < self._max_pages:
                    cur_page += 1
                    # retrieve result URLs for the current page
                    lst = []
                    urls = engine.search(word, cur_page)
                    # build data-excavation tasks
                    for url in urls:
                        domain = extract_domain(url)
                        if self.validator.url(domain):
                            continue
                        lst.append(self.make_task(
                            url=extract_base_url(url),
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=self.url_weight,
                        ))
                    # push to the data-excavation queue
                    self.scheduler.insert_tasks(lst, level=self.url_weight)
                    logger.info(f'<{ename}> {word} - page {cur_page} - {len(lst)} items')

    def start(self):
        if len(self._engines) > 0:
            logger.info('[search engine] initial load')
            # one worker thread per configured search engine
            max_workers = len(self._engines)
            with ThreadPoolExecutor(max_workers, 'SearchEngine') as executor:
                futures = []
                for engine in self._engines:
                    f = executor.submit(self.search, engine)
                    f.add_done_callback(err_details)
                    futures.append(f)
                wait(futures)


class DataExcavate(BasicSearch):

    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_excavate_interval
        self._workers = excavate_workers

    def save(self, task: Task):
        """Save a site that passed the data-excavation checks."""
        self.push_data('save', task, MGO_DOMAIN)
        # add the URL to the de-duplication collection
        self.push_data('remove', task, MGO_REMOVAL_DUPLICATE)
        logger.info(f"[upload data] {task['name']} - {task['domain']}")

    def retrieve_site(self, task: Task):
        if self.validator.url(task['url']):
            return
        logger.info(f'request host -> {task["url"]}')
        response = self.downloader.get(task['url'])
        if response.status_code != 200 or response.text in ['', None]:
            return
        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        title = extract_page_title(page_source)
        task['name'] = title
        base_url = extract_base_url(task['url'])
        task['base_url'] = base_url
        items = self.parser.site_items(page_source, base_url)
        lst = []
        _c = 0  # counts links whose text contains a required keyword
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.requirement_word(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1
        if _c > 1:
            self.save(task)
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        # domain - add to the filter
        self.validator.add_url(task['domain'])
        # url - add to the filter
        self.validator.add_url(task['url'])

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'[data excavation] started - {t_name}')
        while True:
            _redis_key = REDIS_KEY + '-' + str(self.url_weight)
            tasks = self.scheduler.get_task(_redis_key)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            self.retrieve_site(task)

    def start(self):
        logger.info('[data excavation] initial load')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
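

# A minimal usage sketch (an assumption, not part of the original module), wiring
# the three components together the way their constructors and start() methods
# suggest. `MyEngine` is a hypothetical concrete JySearchEngine implementation;
# this file does not provide one, so the sketch is left commented out.
#
# if __name__ == '__main__':
#     from crawler.search import MyEngine  # hypothetical JySearchEngine subclass
#
#     SyncData(allow_load_filter=True)  # starts its own background sync thread
#     engines = SearchEngine(engines=[MyEngine()], max_search_page=2)
#     excavator = DataExcavate(excavate_workers=4)
#
#     threading.Thread(target=engines.start, name='SearchEngineMain').start()
#     excavator.start()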