import datetime
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait

from pymongo.errors import DocumentTooLarge

from common.databases import mongo_table, redis_client
from common.log import logger
from common.tools import sha1
from crawler.analysis import parser_items
from crawler.download import Downloader
from crawler.utils import (
    extract_page_title,
    extract_host,
    err_details,
    extract_text
)

hospital = mongo_table('tmp_crawl', 'hospital_info')
r = redis_client()
r_key = 'hospital_2022'
down_loader = Downloader()

# Seed tasks: (site URL, hospital name)
seed_tasks = [
    ('http://www.hnsrmyy.net', '河南省人民医院'),
    ('https://www.zzsetyy.cn/index.html', '河南省儿童医院'),
    ('https://www.pumch.cn/index.html', '北京协和医院'),
]


def create_task(host: str, title: str, href: str, depth: int, **kwargs):
    """Insert a new crawl task unless the URL was seen before (deduplicated via a Redis hash)."""
    sid = sha1(href)
    if not r.hexists(r_key, sid):
        hospital.insert_one({
            'host': host,
            'href': href,
            'title': title,
            'depth': depth,
            'is_crawl': False,
            'create_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            **kwargs
        })
        r.hset(r_key, sid, '')
        return True
    return False


def push_tasks():
    """Register the seed URLs as depth-1 crawl tasks."""
    for url, title in seed_tasks:
        item = {
            'host': extract_host(url),
            'href': url,
            'title': title,
            'site': title,
            'depth': 1,
        }
        create_task(**item)


def get_tasks(**kwargs):
    """Load up to 100 pending tasks, shallowest depth first."""
    _results = []
    _projection = kwargs.pop('projection', {})
    projection = {
        'host': 1,
        'site': 1,
        'href': 1,
        'depth': 1,
        'is_crawl': 1,
        **_projection
    }
    cursor = hospital.find({'is_crawl': False}, projection=projection)
    for item in cursor.sort([('depth', 1)]).limit(100):
        _results.append(item)
    return _results


def update_data(mongo_id, source, source_text):
    """Write the crawl result back and mark the task as done."""
    item = {
        'source': source,
        'source_text': source_text,
        'is_crawl': True,
        'update_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }
    try:
        hospital.update_one({'_id': mongo_id}, {'$set': item})
    except DocumentTooLarge:
        # The page source pushes the document past MongoDB's 16 MB limit;
        # drop the raw HTML and keep only the extracted text.
        item['source'] = ''
        hospital.update_one({'_id': mongo_id}, {'$set': item})


def crawl_request(url, host):
    # Skip URLs that end in a non-HTML extension (.pdf, .jpg, ...); only .htm/.html pages are crawled.
    suffix = re.search('([.][a-zA-Z]{3,5})$', url)
    if suffix is not None and suffix.group().find('.htm') == -1:
        raise ValueError(f'URL cannot be crawled: {url}')
    response = down_loader.get(url, timeout=10, max_retries=1, disable_debug_log=False)
    if response.status_code == 200 and response.text not in [None, '']:
        items = parser_items(response.text, url=host, mode=1)
        title = extract_page_title(response.text)
        source_text = "&_&".join(extract_text(response.text).split())
    else:
        title = f'Request error-{response.status_code}-{response.reason}'
        source_text = ''
        items = []
    results = {
        'host': host,
        'href': url,
        'source': response.text,
        'title': title,
        'source_text': source_text,
        'items': items,
    }
    return results


def crawl_spider(task):
    _id = task['_id']
    _total, _success, _err = 0, 0, 0
    try:
        dic_data = crawl_request(task['href'], task['host'])
        # Create follow-up crawl tasks for every link extracted from the page
        for item in dic_data['items']:
            href = item['href']
            title = item['title']
            sub_item = {
                'host': task['host'],
                'title': title,
                'href': href,
                'depth': task['depth'] + 1,
            }
            success = create_task(**sub_item, site=task['site'])
            if success:
                _success += 1
            else:
                _err += 1
            _total += 1
    except ValueError:
        dic_data = {}
    # Save the crawl result and mark this task as finished
    update_data(
        mongo_id=_id,
        source=dic_data.get('source', None),
        source_text=dic_data.get('source_text', None))
    logger.info(f"[{str(_id)}] found {_total} links, created {_success} new tasks, skipped {_err} duplicates")


def start():
    push_tasks()
    while True:
        tasks = get_tasks()
        logger.info(f"Loaded {len(tasks)} crawl tasks")
        with ThreadPoolExecutor(max_workers=4, thread_name_prefix='hospital') as executor:
            futures = []
            for task in tasks:
                future = executor.submit(crawl_spider, task)
                future.add_done_callback(err_details)
                futures.append(future)
            wait(futures)
        logger.info(f"Finished {len(tasks)} crawl tasks, waiting for the next batch")
        time.sleep(10)


if __name__ == '__main__':
    start()