hospital.py

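"""Hospital-website crawler: seeds a few hospital portals, follows extracted
links level by level, and stores page text in MongoDB, deduplicating URLs
through a Redis hash."""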
import datetime
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait

from pymongo.errors import DocumentTooLarge

from common.databases import mongo_table, redis_client
from common.log import logger
from common.tools import sha1
from crawler.analysis import parser_items
from crawler.download import Downloader
from crawler.utils import (
    extract_page_title,
    extract_host,
    err_details,
    extract_text
)

hospital = mongo_table('tmp_crawl', 'hospital_info')
r = redis_client()
r_key = 'hospital_2022'
down_loader = Downloader()

seed_tasks = [
    ('http://www.hnsrmyy.net', '河南省人民医院'),
    ('https://www.zzsetyy.cn/index.html', '河南省儿童医院'),
    ('https://www.pumch.cn/index.html', '北京协和医院'),
]
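
# Register a crawl task: skip URLs already recorded in the Redis hash,
# otherwise insert an uncrawled document into MongoDB and mark the URL as seen.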
def create_task(host: str, title: str, href: str, depth: int, **kwargs):
    sid = sha1(href)
    if not r.hexists(r_key, sid):
        hospital.insert_one({
            'host': host,
            'href': href,
            'title': title,
            'depth': depth,
            'is_crawl': False,
            'create_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            **kwargs
        })
        r.hset(r_key, sid, '')
        return True
    return False
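
# Queue the seed portals as depth-1 tasks.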
def push_tasks():
    for url, title in seed_tasks:
        item = {
            'host': extract_host(url),
            'href': url,
            'title': title,
            'site': title,
            'depth': 1,
        }
        create_task(**item)
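
# Load up to 100 pending tasks from MongoDB, shallowest depth first.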
def get_tasks(**kwargs):
    _results = []
    _projection = kwargs.pop('projection', {})
    projection = {
        'host': 1,
        'site': 1,
        'href': 1,
        'depth': 1,
        'is_crawl': 1,
        **_projection
    }
    cursor = hospital.find({'is_crawl': False}, projection=projection)
    for item in cursor.sort([('depth', 1)]).limit(100):
        _results.append(item)
    return _results
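
# Write the crawl result back to the task document; if the raw page exceeds
# MongoDB's document size limit, drop it and keep only the extracted text.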
def update_data(mongo_id, source, source_text):
    item = {
        'source': source,
        'source_text': source_text,
        'is_crawl': True,
        'update_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }
    try:
        hospital.update_one({'_id': mongo_id}, {'$set': item})
    except DocumentTooLarge:
        item['source'] = ''
        hospital.update_one({'_id': mongo_id}, {'$set': item})
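
# Fetch a single page. URLs whose extension does not contain ".htm"
# (e.g. .pdf, .aspx) are rejected; non-200 responses yield no links and
# record the status code in the title.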
def crawl_request(url, host):
    suffix = re.search('([.][a-zA-Z]{3,5})$', url)
    if suffix is not None and suffix.group().find('.htm') == -1:
        raise ValueError(f'URL cannot be crawled: {url}')
    response = down_loader.get(url, timeout=10, max_retries=1, disable_debug_log=False)
    if response.status_code == 200 and response.text not in [None, '']:
        items = parser_items(response.text, url=host, mode=1)
        title = extract_page_title(response.text)
        source_text = "&_&".join(extract_text(response.text).split())
    else:
        title = f'Request error-{response.status_code}-{response.reason}'
        source_text = ''
        items = []
    results = {
        'host': host,
        'href': url,
        'source': response.text,
        'title': title,
        'source_text': source_text,
        'items': items,
    }
    return results
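
# Process a single task: crawl the page, create follow-up tasks for every
# extracted link (one level deeper), then persist the result and log the counts.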
def crawl_spider(task):
    _id = task['_id']
    _total, _success, _err = 0, 0, 0
    try:
        dic_data = crawl_request(task['href'], task['host'])
        # Create follow-up crawl tasks for the links extracted from this page
        for item in dic_data['items']:
            href = item['href']
            title = item['title']
            sub_item = {
                'host': task['host'],
                'title': title,
                'href': href,
                'depth': task['depth'] + 1,
            }
            success = create_task(**sub_item, site=task['site'])
            if success:
                _success += 1
            else:
                _err += 1
            _total += 1
    except ValueError:
        dic_data = {}
    # Update the task with the crawl result
    update_data(
        mongo_id=_id,
        source=dic_data.get('source'),
        source_text=dic_data.get('source_text'))
    logger.info(f"[{str(_id)}] collected {_total} links, created {_success} new tasks, skipped {_err} duplicates")
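
# Main loop: push the seeds, then repeatedly load pending tasks and crawl them
# with a small thread pool, pausing 10 seconds between batches.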
def start():
    push_tasks()
    while True:
        tasks = get_tasks()
        logger.info(f"Loaded {len(tasks)} crawl tasks")
        with ThreadPoolExecutor(max_workers=4, thread_name_prefix='hospital') as executor:
            futures = []
            for task in tasks:
                future = executor.submit(crawl_spider, task)
                future.add_done_callback(err_details)
                futures.append(future)
            wait(futures)
        logger.info(f"Finished {len(tasks)} crawl tasks, waiting for the next batch")
        time.sleep(10)


if __name__ == '__main__':
    start()