spiders.py

import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.databases import insert_one
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    err_details,
    extract_domain
)
from settings import (
    MGO_REPETITION,
    MGO_RECORDS,
    SPECIAL_ENCODINGS,
    MGO_VISIT_KEYWORDS,
    MGO_VISIT_ORGANIZATION
)
class BasicScheduler:

    def __init__(self, scheduler=None, **kwargs):
        self._scheduler = (scheduler or Scheduler())
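

# Drives the configured search engines: feeds organization names and keywords
# into them and pushes every result URL into the scheduler as a crawl task.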
class SearchEngine(BasicScheduler):

    def __init__(self, **kwargs):
        super(SearchEngine, self).__init__(scheduler=kwargs.get('scheduler'))
        self._engines = []

    def set_search_engine(self, engine=None):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[search engine - added]{engine.__class__.__name__}')
        return self

    def set_search_engines(self, engines):
        for engine in engines:
            self.set_search_engine(engine)
        return self
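
    # Searches each organization name, queues the result URLs as 'organization'
    # tasks, and marks the organization as processed so it is not picked up again.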
    def search_organizations(self, engine, items: List[dict]):
        logger.info(f'[organization search] {len(items)} items in total')
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[organization search - invalid item]{item}')
                continue
            word = str(name).replace(' ', '').strip()
            logger.info(f"[search - organization]{engine.__class__.__name__} >>> {word}")
            urls = engine.search(word)
            lst = [Task(url=url, groups='organization') for url in urls]
            self._scheduler.insert_tasks(lst)
            MGO_VISIT_ORGANIZATION.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
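
    # Searches each keyword and queues the result URLs as 'keyword' tasks.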
    def search_words(self, engine, words):
        logger.info(f'[keyword search] {len(words)} words in total')
        for word in words:
            word = str(word).replace(' ', '').strip()
            logger.info(f"[search - keyword]{engine.__class__.__name__} >>> {word}")
            urls = engine.search(word)
            lst = [Task(url=url, groups='keyword') for url in urls]
            self._scheduler.insert_tasks(lst)
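
    # Loads pending organizations and keywords from MongoDB and runs both
    # searches concurrently for every registered engine.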
    def search_engines(self):
        with ThreadPoolExecutor(max_workers=2, thread_name_prefix='SearchEngine') as executor:
            # organizations not yet submitted to the engines
            search_organizations = []
            projection = {'name': 1}
            _q = {'enable_added': {'$exists': False}}
            for item in MGO_VISIT_ORGANIZATION.find(_q, projection=projection):
                search_organizations.append(item)
            # keywords
            search_words = []
            for item in MGO_VISIT_KEYWORDS.find(projection=projection):
                search_words.append(item['name'])
            futures = []
            for engine in self._engines:
                logger.info(f"[search engine - {engine.__class__.__name__}] started")
                f = executor.submit(self.search_words, engine, search_words)
                f.add_done_callback(err_details)
                # collect both futures so wait() blocks until the keyword and
                # organization searches have finished
                futures.append(f)
                f = executor.submit(self.search_organizations, engine, search_organizations)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
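

# Consumes queued tasks: downloads each page, validates it against the word
# and domain filters, records matching sites, and feeds newly discovered URLs
# back into the scheduler.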
class SearchDomain(BasicScheduler):

    def __init__(
            self,
            downloader=None,
            parser=None,
            validator=None,
            allow_load_filter=False,
            **kwargs,
    ):
        super(SearchDomain, self).__init__(scheduler=kwargs.get('scheduler'))
        self._downloader = (downloader or Downloader())
        self._parser = (parser or parse_urls)
        self._validator = (validator or Validator())
        if allow_load_filter:
            self._validator.load_filter()
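
    # Checks the page title against the word validator; a hit is stored as a
    # matching site and added to the filter and de-duplication collections.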
    def verify(self, task):
        valid_words = self._validator.words(task['name'], task)
        if all([valid_words]):
            # site that matches the requirements
            insert_one(MGO_RECORDS, task)
            # add the domain to the filter
            self._validator.add_filter_feature(task['domain'])
            # add the domain to the de-duplication collection
            duplicate_site = {'url': task['domain'], 'time': int(time.time())}
            insert_one(MGO_REPETITION, duplicate_site)
            logger.info(f"[retrieval succeeded]{task['domain']} - {task['name']}")
        else:
            if any([task['sensitive'], task['duplication']]):
                raise ValidatorError(f"feature validation failed: {task['name']}")
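
    # Main crawl loop: pulls tasks from the scheduler until the queue is empty.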
    def crawl_spider(self):
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                logger.info('shutting down the source-discovery spider')
                break
            task_key, task = tasks
            groups = task['groups']
            domain = extract_domain(task['url'])
            allow_visit_domain = self._validator.url(domain)
            if not allow_visit_domain:
                continue
            logger.info(f'request web site -> {task["url"]}')
            response = self._downloader.get(task['url'])
            print(response, len(response.text))
            if response.status_code != 200 or response.text in ['', None]:
                continue
            response.encoding = response.apparent_encoding
            if response.encoding in SPECIAL_ENCODINGS:
                response.encoding = 'utf-8'
            task['domain'] = domain
            base_url = extract_base_url(task['url'])
            task['base_url'] = base_url
            page_source = response.text
            title = extract_page_title(page_source)
            print(title)
            task['name'] = title
            try:
                self.verify(task)
                urls = self._parser(page_source, base_url)
                new_tasks = [Task(url=url, groups=groups) for url in urls]
                self._scheduler.insert_tasks(new_tasks)
            # verify() raises ValidatorError, so catch it alongside
            # HostsRetrieveError; a rejected page should not stop the loop
            except (HostsRetrieveError, ValidatorError):
                pass
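

# Minimal usage sketch (not part of the original file): one way the two spiders
# could be wired together. The engine list is an assumption; concrete
# JySearchEngine subclasses and their constructors are not shown in this module.
if __name__ == '__main__':
    engines = []  # fill with concrete JySearchEngine instances
    SearchEngine().set_search_engines(engines).search_engines()
    SearchDomain(allow_load_filter=True).crawl_spider()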