import datetime
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait
from urllib.parse import urlparse

from pymongo.errors import DocumentTooLarge

from common.databases import mongo_table, redis_client
from common.log import logger
from common.tools import sha1
from crawler.analysis import parser_items
from crawler.download import Downloader
from crawler.utils import (
    extract_page_title,
    extract_host,
    err_details,
    extract_text
)

hospital = mongo_table('tmp_crawl', 'hospital_info')
r = redis_client()
r_key = 'hospital_2022'  # Redis hash used for URL de-duplication
down_loader = Downloader()
seed_tasks = [
    ('http://www.hnsrmyy.net', '河南省人民医院'),
    ('https://www.zzsetyy.cn/index.html', '河南省儿童医院'),
    ('https://www.pumch.cn/index.html', '北京协和医院'),
]


def create_task(host: str, title: str, href: str, depth: int, **kwargs):
    """Insert a crawl task unless its URL was seen before; return True on insert."""
    sid = sha1(href)
    if not r.hexists(r_key, sid):
        hospital.insert_one({
            'host': host,
            'href': href,
            'title': title,
            'depth': depth,
            'is_crawl': False,
            'create_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            **kwargs
        })
        r.hset(r_key, sid, '')
        return True
    return False


def push_tasks():
    """Queue the seed URLs as depth-1 tasks."""
    for url, title in seed_tasks:
        item = {
            'host': extract_host(url),
            'href': url,
            'title': title,
            'site': title,
            'depth': 1,
        }
        create_task(**item)


def get_tasks(**kwargs):
    """Load up to 100 pending tasks, shallowest depth first."""
    results = []
    extra_projection = kwargs.pop('projection', {})
    projection = {
        'host': 1, 'site': 1, 'href': 1, 'depth': 1, 'is_crawl': 1,
        **extra_projection
    }
    cursor = hospital.find({'is_crawl': False}, projection=projection)
    for item in cursor.sort([('depth', 1)]).limit(100):
        results.append(item)
    return results


def update_data(mongo_id, source, source_text):
    """Mark a task as crawled; drop the raw HTML if the document exceeds the BSON limit."""
    item = {
        'source': source,
        'source_text': source_text,
        'is_crawl': True,
        'update_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }
    try:
        hospital.update_one({'_id': mongo_id}, {'$set': item})
    except DocumentTooLarge:
        item['source'] = ''
        hospital.update_one({'_id': mongo_id}, {'$set': item})


def crawl_request(url, host):
    """Download a page and extract its title, text, and outgoing links."""
    # Check the extension on the URL path only, so bare host URLs such as the
    # http://www.hnsrmyy.net seed are not rejected for ending in ".net".
    suffix = re.search(r'([.][a-zA-Z]{3,5})$', urlparse(url).path)
    if suffix is not None and suffix.group().find('.htm') == -1:
        raise ValueError(f'URL cannot be crawled: {url}')
    response = down_loader.get(url, timeout=10, max_retries=1, disable_debug_log=False)
    if response.status_code == 200 and response.text not in [None, '']:
        items = parser_items(response.text, url=host, mode=1)
        title = extract_page_title(response.text)
        source_text = '&_&'.join(extract_text(response.text).split())
    else:
        title = f'Request error-{response.status_code}-{response.reason}'
        source_text = ''
        items = []
    return {
        'host': host,
        'href': url,
        'source': response.text,
        'title': title,
        'source_text': source_text,
        'items': items,
    }


def crawl_spider(task):
    _id = task['_id']
    total, created, duplicates = 0, 0, 0
    try:
        dic_data = crawl_request(task['href'], task['host'])
        # Create a follow-up task for every link found on the page
        for item in dic_data['items']:
            sub_item = {
                'host': task['host'],
                'title': item['title'],
                'href': item['href'],
                'depth': task['depth'] + 1,
            }
            if create_task(**sub_item, site=task['site']):
                created += 1
            else:
                duplicates += 1
            total += 1
    except ValueError:
        dic_data = {}
    # Persist the crawl result and mark the task as done
    update_data(
        mongo_id=_id,
        source=dic_data.get('source'),
        source_text=dic_data.get('source_text'))
    logger.info(f'[{_id}] extracted {total} links, created {created} tasks, skipped {duplicates} duplicates')


def start():
    push_tasks()
    while True:
        tasks = get_tasks()
        logger.info(f'Loaded {len(tasks)} crawl tasks')
        with ThreadPoolExecutor(max_workers=4, thread_name_prefix='hospital') as executor:
            futures = []
            for task in tasks:
                future = executor.submit(crawl_spider, task)
                future.add_done_callback(err_details)
                futures.append(future)
            wait(futures)
        logger.info(f'Finished {len(tasks)} crawl tasks, waiting for more')
        time.sleep(10)


if __name__ == '__main__':
    start()
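
# A minimal smoke test (a sketch, not part of the original pipeline): assuming
# the MongoDB and Redis instances wired up above are reachable, it seeds the
# queue and crawls a single pending task synchronously, so the download/parse/
# update path can be checked without entering the infinite loop in start():
#
#     push_tasks()
#     for task in get_tasks():
#         crawl_spider(task)
#         break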