spiders.py

import threading
import time
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    valid_url
)
from settings import (
    SPECIAL_ENCODINGS,
    MGO_REPETITION,
    MGO_DOMAIN,
    MGO_SEED_URLS,
    MGO_SEED_ORGS,
    MGO_SEED_KEYWORDS,
)


class BasicSearch:

    def __init__(self, scheduler=None, **kwargs):
        self._scheduler = (scheduler or Scheduler())
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task weights
        self.org_weight = kwargs.get('org_weight', 9)
        self.url_weight = kwargs.get('url_weight', 8)
        self.keyword_weight = kwargs.get('keyword_weight', 5)
        # task classification
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # task groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self._init()

    def _init(self):
        self.sync_data()

    @staticmethod
    def loops_interval(label, interval):
        interval = (interval or 300)
        next_run_time = delay_by(interval)
        logger.info(f'Executed <{label}>, next run at {next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Create a Task object."""
        return Task(**kwargs)

    def seed_orgs(self) -> List[Mapping]:
        """Seed organizations / units."""
        search_orgs = []
        cursor = MGO_SEED_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self) -> List[str]:
        """Seed keywords."""
        search_keywords = []
        cursor = MGO_SEED_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed URLs."""
        search_urls = []
        cursor = MGO_SEED_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def sync_data_urls(self):
        """Synchronize seed URL data."""
        _interval = 7200
        while True:
            items = self.seed_urls()
            # Keep only valid URLs; do not mutate the list while iterating over it
            valid_items = [item for item in items if valid_url(item['name'])]
            lst = []
            for item in valid_items:
                lst.append(self.make_task(
                    url=item['name'],
                    groups=self.url_groups,
                    classify=self.visit_classify,
                    weight=self.url_weight))
            self._scheduler.insert_tasks(lst, level=self.url_weight)
            # Mark pushed seed URLs so they are not queued again
            for item in valid_items:
                MGO_SEED_URLS.update_many(
                    {'_id': item['_id']},
                    {'$set': {'enable_added': True}}
                )
            logger.info(f'Synchronized {len(valid_items)} seed URLs')
            self.loops_interval(self.sync_data_urls.__name__, _interval)

    def sync_data_keywords(self):
        """Synchronize keyword data."""
        _interval = 1800
        while True:
            words = self.seed_keywords()
            # Normalize the keywords and push them to the task queue
            words = [str(word).replace('&nbsp;', '').strip() for word in words]
            lst = []
            for word in words:
                lst.append(self.make_task(
                    search=word,
                    groups=self.keyword_groups,
                    classify=self.query_classify,
                    weight=self.keyword_weight))
            self._scheduler.insert_tasks(lst, level=self.keyword_weight)
            logger.info(f'Synchronized {len(words)} keywords')
            self.loops_interval(self.sync_data_keywords.__name__, _interval)

    def sync_data_orgs(self):
        """Synchronize organization data."""
        _interval = 3600
        while True:
            items = self.seed_orgs()
            # Normalize the organization names and push them to the task queue
            orgs = []
            for item in items:
                name = item.get('name')
                if name in ['', None]:
                    logger.warning(f'[invalid organization] {item}')
                    continue
                word = str(name).replace('&nbsp;', '').strip()
                orgs.append(word)
            lst = []
            for word in orgs:
                lst.append(self.make_task(
                    search=word,
                    groups=self.org_groups,
                    classify=self.query_classify,
                    weight=self.org_weight))
            self._scheduler.insert_tasks(lst, level=self.org_weight)
            # Mark organizations that have been added so they are not queued again
            for item in items:
                MGO_SEED_ORGS.update_one(
                    {'_id': item['_id']},
                    {'$set': {'enable_added': True}}
                )
            logger.info(f'Synchronized {len(items)} organizations')
            self.loops_interval(self.sync_data_orgs.__name__, _interval)

    def sync_data(self):
        """Start the background seed-synchronization threads."""
        logger.info('[data sourcing] starting data synchronization')
        threading.Thread(
            target=self.sync_data_urls,
            name='LoadingSeedUrls'
        ).start()
        threading.Thread(
            target=self.sync_data_keywords,
            name='LoadingSeedKeyWords'
        ).start()
        threading.Thread(
            target=self.sync_data_orgs,
            name='LoadingSeedOrgs'
        ).start()


class SearchEngine(BasicSearch):

    def __init__(self, **kwargs):
        super(SearchEngine, self).__init__(scheduler=kwargs.get('scheduler'))
        self._engines = []

    def set_search_engine(self, engine=None):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'Added search engine <{engine.__class__.__name__}>')
        return self

    def set_search_engines(self, engines):
        for engine in engines:
            self.set_search_engine(engine)
        return self

    def start_search(self, engine):
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                # No pending task: wait briefly and poll again
                self.loops_interval(self.start_search.__name__, 5)
                continue
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] == self.visit_classify:
                # Visit tasks are not handled here; put them back on the queue
                self._scheduler.insert_task(task, level=task['weight'])
            else:
                word = task['search']
                logger.info(f"<{engine.__class__.__name__}> {task['groups']} >>> {word}")
                urls = engine.search(word)
                lst = []
                for url in urls:
                    lst.append(self.make_task(
                        url=url,
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=self.url_weight
                    ))
                self._scheduler.insert_tasks(lst, level=self.url_weight)

    def search_engines(self):
        logger.info('[search engines] initializing')
        for engine in self._engines:
            threading.Thread(
                target=self.start_search,
                name='SearchEngine',
                args=(engine,)
            ).start()


class VisitDomain(BasicSearch):

    def __init__(
            self,
            downloader=None,
            parser=None,
            validator=None,
            allow_load_filter=False,
            **kwargs,
    ):
        super(VisitDomain, self).__init__(scheduler=kwargs.get('scheduler'))
        self._downloader = (downloader or Downloader())
        self._parser = (parser or parse_urls)
        self._validator = (validator or Validator())
        if allow_load_filter:
            self._validator.load_filter()

    def push_new_domain(self, task: Task):
        # Record the new source
        insert_one(MGO_DOMAIN, task)
        # Add the domain to the filter
        self._validator.add_filter_feature(task['domain'])
        # Add the domain to the de-duplication collection
        remove_duplicate = {'url': task['domain'], 'time': task['update_at']}
        insert_one(MGO_REPETITION, remove_duplicate)
        logger.info(f"[new domain recorded] {task['domain']} - {task['name']}")

    def verify(self, task: Task):
        valid_words = self._validator.words(task['name'], task)
        if valid_words:
            self.push_new_domain(task)
        elif any([task['sensitive'], task['duplication']]):
            raise ValidatorError(f"Feature validation failed: {task['name']}")

    def search_domains(self):
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                logger.info('Shutting down the source-discovery crawler')
                break
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] != self.visit_classify:
                # Query tasks are not handled here; put them back on the queue
                self._scheduler.insert_task(task, level=task['weight'])
            else:
                domain = extract_domain(task['url'])
                allow_visit_domain = self._validator.url(domain)
                if not allow_visit_domain:
                    continue
                logger.info(f'request web site -> {task["url"]}')
                response = self._downloader.get(task['url'])
                if response.status_code != 200 or response.text in ['', None]:
                    continue
                response.encoding = response.apparent_encoding
                if response.encoding in SPECIAL_ENCODINGS:
                    response.encoding = 'utf-8'
                task['domain'] = domain
                base_url = extract_base_url(task['url'])
                task['base_url'] = base_url
                page_source = response.text
                title = extract_page_title(page_source)
                task['name'] = title
                try:
                    self.verify(task)
                    urls = self._parser(page_source, base_url)
                    new_tasks = []
                    for url in urls:
                        new_tasks.append(self.make_task(
                            url=url,
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight']
                        ))
                    self._scheduler.insert_tasks(new_tasks, level=self.url_weight)
                except (ValidatorError, HostsRetrieveError) as e:
                    # Skip pages that fail validation or host retrieval instead of crashing the loop
                    logger.warning(f'{e}')
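

# Usage sketch (illustrative, not part of the original module): a minimal
# example of how these classes might be wired together. It assumes Scheduler()
# and JySearchEngine() can be constructed without arguments and that the
# MongoDB collections configured in settings are reachable; adapt it to the
# real constructors used in this project.
if __name__ == '__main__':
    shared_scheduler = Scheduler()

    # The engine threads turn 'query' tasks (keywords / organizations) into
    # 'visit' tasks pointing at search-result URLs.
    engines = SearchEngine(scheduler=shared_scheduler)
    engines.set_search_engines([JySearchEngine()])
    engines.search_engines()

    # The domain visitor consumes 'visit' tasks, validates each domain and
    # records new sources until the task queue is empty.
    VisitDomain(scheduler=shared_scheduler, allow_load_filter=True).search_domains()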