# -*- coding: utf-8 -*-
"""
Created on 2017-01-09 10:38
---------
@summary: Wires together the parser, parser_control and collector
---------
@author: Boris
@email: boris_liu@foxmail.com
"""

import threading
import time
from collections.abc import Iterable  # collections.Iterable was removed in Python 3.10

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.buffer.item_buffer import ItemBuffer
from feapder.buffer.request_buffer import RequestBuffer
from feapder.buffer.heartbeat_buffer import HeartBeatBuffer
from feapder.core.base_parser import BaseParser
from feapder.core.collector import Collector
from feapder.core.handle_failed_items import HandleFailedItems
from feapder.core.handle_failed_requests import HandleFailedRequests
from feapder.core.parser_control import PaserControl
from feapder.db.rabbitMq import RabbitMQ
from feapder.network.item import Item
from feapder.network.request import Request
from feapder.utils import metrics
from feapder.utils.log import log


class Scheduler(threading.Thread):
    __custom_setting__ = {}
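
    # Subclasses can override settings per spider via __custom_setting__.
    # A minimal sketch (MySpider and the setting values are illustrative,
    # not part of this module):
    #
    #     class MySpider(Scheduler):
    #         __custom_setting__ = {
    #             "SPIDER_THREAD_COUNT": 4,
    #             "KEEP_ALIVE": True,
    #         }
    #
    # Each key is written onto feapder.setting in __init__ below; the legacy
    # AUTO_STOP_WHEN_SPIDER_DONE key is mapped to KEEP_ALIVE (inverted).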

    def __init__(
        self,
        redis_key=None,
        user=None,
        thread_count=None,
        begin_callback=None,
        end_callback=None,
        keep_alive=None,
        auto_start_requests=None,
        **kwargs
    ):
        """
        @summary: Scheduler
        ---------
        @param redis_key: redis folder that holds the spider's requests and items
        @param user: identifies the mq consumer this program acts as
        @param thread_count: number of threads; defaults to the value in the settings file
        @param begin_callback: callback invoked when the spider starts
        @param end_callback: callback invoked when the spider ends
        @param keep_alive: whether the spider stays resident; defaults to False
        @param auto_start_requests: whether the spider seeds its own tasks automatically
        ---------
        @result:
        """
        super(Scheduler, self).__init__()

        for key, value in self.__class__.__custom_setting__.items():
            if key == "AUTO_STOP_WHEN_SPIDER_DONE":  # backward compatibility with old configs
                setattr(setting, "KEEP_ALIVE", not value)
            else:
                setattr(setting, key, value)

        self._redis_key = redis_key or setting.REDIS_KEY
        if not self._redis_key:
            raise Exception(
                """
                redis_key is the redis folder that stores requests and items; it must not be empty.
                Configure it in settings, e.g. REDIS_KEY = 'test',
                or pass it when instantiating the spider, e.g. TestSpider(redis_key='test')
                """
            )

        self._rabbitmq = RabbitMQ()
        # Use self._redis_key (which falls back to setting.REDIS_KEY) rather than
        # the raw redis_key argument, which may be None here.
        self._request_buffer = RequestBuffer(self._redis_key, user=user)
        self._item_buffer = ItemBuffer(self._redis_key, user=user)
        self._collector = Collector(self._redis_key, user=user)
        self._heartbeat_buffer = HeartBeatBuffer(self._redis_key)

        self._parsers = []
        self._parser_controls = []
        self._parser_control_obj = PaserControl

        # backward compatibility with the old parameter name
        if "auto_stop_when_spider_done" in kwargs:
            self._keep_alive = not kwargs.get("auto_stop_when_spider_done")
        else:
            self._keep_alive = (
                keep_alive if keep_alive is not None else setting.KEEP_ALIVE
            )
        self._auto_start_requests = (
            auto_start_requests
            if auto_start_requests is not None
            else setting.SPIDER_AUTO_START_REQUESTS
        )
        self._begin_callback = (
            begin_callback
            if begin_callback
            else lambda: log.info("\n********** feapder begin **********")
        )
        self._end_callback = (
            end_callback
            if end_callback
            else lambda: log.info("\n********** feapder end **********")
        )
        self._thread_count = (
            setting.SPIDER_THREAD_COUNT if not thread_count else thread_count
        )

        self._spider_id = tools.get_uuid(self._redis_key, tools.get_current_date())
        self._spider_name = self._redis_key

        self._is_notify_end = False  # whether the end notification has been sent

        # Request cache settings
        Request.cached_redis_key = self._redis_key
        Request.cached_expire_time = setting.RESPONSE_CACHED_EXPIRE_TIME

        self._last_check_task_status_time = 0
        self._user = user

        self.init_metrics()

    def init_metrics(self):
        """
        Initialize the metrics (instrumentation) system
        """
        metrics.init(**setting.METRICS_OTHER_ARGS)

    def add_parser(self, parser):
        parser = parser()  # instantiate the parser; a class is expected, not an instance
        if isinstance(parser, BaseParser):
            self._parsers.append(parser)
        else:
            raise ValueError("Type error: spiders must subclass feapder.BaseParser")
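
    # add_parser takes the parser *class* and instantiates it itself. A minimal
    # usage sketch (MyParser is illustrative, not part of this module):
    #
    #     class MyParser(BaseParser):
    #         def start_requests(self):
    #             yield Request("https://example.com")
    #
    #     scheduler = Scheduler(redis_key="test")
    #     scheduler.add_parser(MyParser)   # note: the class, not MyParser()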

    def _start(self):
        self.spider_begin()  # start the spider -- start_callback

        # re-insert failed items into the database
        if setting.RETRY_FAILED_ITEMS:
            handle_failed_items = HandleFailedItems(
                redis_key=self._redis_key,
                item_buffer=self._item_buffer,
                rabbitmq=self._rabbitmq,
                user=self._user,
            )
            handle_failed_items.reput_failed_items_to_db()

        self._heartbeat_buffer.start()  # heartbeat manager
        # STEP 3.1 start request_buffer -- task manager, buffers requests before writing them to the database
        self._request_buffer.start()
        # STEP 3.2 start item_buffer -- pipeline manager, buffers scraped data before writing it to the database
        self._item_buffer.start()
        # STEP 3.3 start collector -- task manager, dispatches tasks
        self._collector.start()

        # start the parser control thread pool
        for i in range(self._thread_count):
            # STEP 3.4 create a worker thread for the pool
            parser_control = self._parser_control_obj(
                self._collector,
                self._redis_key,
                self._request_buffer,
                self._item_buffer,
                self._heartbeat_buffer,
            )
            for parser in self._parsers:  # STEP 3.5 register every parser with the worker
                parser_control.add_parser(parser)
            parser_control.start()  # STEP 3.6 start the worker thread
            self._parser_controls.append(parser_control)

        # STEP 3.7 dispatch tasks; start reading them once consumer threads exist
        if setting.RETRY_FAILED_REQUESTS:
            # requeue failed requests
            handle_failed_requests = HandleFailedRequests(
                redis_key=self._redis_key,
                rabbitmq=self._rabbitmq,
                user=self._user,
            )
            handle_failed_requests.reput_failed_requests_to_requests()

        # STEP 3.8 produce new tasks
        if self._auto_start_requests:
            self.__add_task()

    def run(self):
        self._start()

        while True:
            try:
                if self.all_thread_is_done():
                    if not self._is_notify_end:
                        self.spider_end()  # the spider has finished
                        self._is_notify_end = True

                    if not self._keep_alive:  # not a resident spider: shut down all threads
                        self._stop_all_thread()
                        break
                else:
                    self._is_notify_end = False

                self.check_task_status()
            except (Exception, BaseException) as e:
                log.exception(e)

            tools.delay_time(1)

    def __add_task(self):
        # Check whether the task pool still holds tasks; if so, resume them, otherwise produce new ones
        todo_task_count = self._collector.get_requests_count()
        if todo_task_count:
            log.info(
                "Found %s pending tasks; not issuing new ones, resuming from where the previous run was interrupted"
                % todo_task_count
            )
        else:
            for parser in self._parsers:
                results = parser.start_requests()
                # add requests to the request queue; the queue writes them to the database in batches
                if results and not isinstance(results, Iterable):
                    raise Exception(
                        "the return value of %s.%s must be iterable" % (parser.name, "start_requests")
                    )

                result_type = 1
                for result in results or []:  # dispatch on the type of each yielded value
                    if isinstance(result, Request):  # Request: push onto the task queue
                        result.parser_name = result.parser_name or parser.name
                        self._request_buffer.put_request(result)
                        result_type = 1
                    elif isinstance(result, Item):  # Item: push onto the data pipeline queue for storage
                        self._item_buffer.put_item(result)
                        result_type = 2
                    elif callable(result):  # a callable, e.g. a deferred database-update function
                        if result_type == 1:
                            self._request_buffer.put_request(result)
                        else:
                            self._item_buffer.put_item(result)
                    else:
                        raise TypeError(
                            "start_requests yielded a value of unexpected type; expected Request, Item or a callable, but got: {}".format(
                                type(result)
                            )
                        )

                self._request_buffer.flush()
                self._item_buffer.flush()
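
    # The dispatch above means start_requests may yield a mix of value types.
    # A sketch of a generator that exercises every branch (MyItem and the
    # no-op callable are illustrative, not part of this module):
    #
    #     def start_requests(self):
    #         yield Request("https://example.com/list")   # goes to request_buffer
    #         yield MyItem(title="seed")                  # goes to item_buffer
    #         yield lambda: None                          # callable, routed after the
    #                                                     # most recent Request/Item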

    def all_thread_is_done(self):
        # Poll several times to smooth out races: the stages are not synchronized,
        # so a state that looks idle on one pass may become busy on the next.
        for i in range(5):
            # STEP 5.1 check the collector state
            if (
                self._collector.is_collector_task()
                or self._collector.get_requests_count() > 0
            ):
                return False

            # STEP 5.2 check the parser_control state
            for parser_control in self._parser_controls:
                if not parser_control.is_not_task():
                    return False

            # STEP 5.3 check the item_buffer state
            if (
                self._item_buffer.get_items_count() > 0
                or self._item_buffer.is_adding_to_db()
            ):
                return False

            # STEP 5.4 check the request_buffer state
            if (
                self._request_buffer.get_requests_count() > 0
                or self._request_buffer.is_adding_to_db()
            ):
                return False

            # check the heartbeat_buffer state
            if (
                self._heartbeat_buffer.get_items_count() > 0
                or self._heartbeat_buffer.is_adding_to_db()
            ):
                return False

            tools.delay_time(1)  # sleep for 1 second

        return True
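
    # The five-pass loop above is a debounce: "done" is only trusted after the
    # idle condition holds on several consecutive polls. The same idea as a
    # standalone sketch (the helper name is illustrative):
    #
    #     def condition_is_stable(condition, attempts=5, interval=1):
    #         for _ in range(attempts):
    #             if not condition():
    #                 return False
    #             time.sleep(interval)
    #         return True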

    @tools.run_safe_model("check_task_status")
    def check_task_status(self):
        """
        Check task status and raise alerts
        """
        # run at most once per minute
        now_time = time.time()
        if now_time - self._last_check_task_status_time > 60:
            self._last_check_task_status_time = now_time
        else:
            return

        # alert when the failed-task count exceeds the configured threshold
        failed_count = self._request_buffer.get_failed_requests_count()
        if failed_count > setting.WARNING_FAILED_COUNT:
            # send an alert
            msg = "Spider <%s> currently has %s failed tasks, please check whether it is healthy" % (
                self._spider_name,
                failed_count,
            )
            log.error(msg)
            self.send_msg(
                msg,
                level="error",
                message_prefix="Spider <%s> failed-task-count alert" % (self._spider_name),
            )

        # parser_control tracks completed and failed task counts in real time; alert when the success rate < 0.5
        failed_task_count, success_task_count = PaserControl.get_task_status_count()
        total_count = success_task_count + failed_task_count
        if total_count > 0:
            task_success_rate = success_task_count / total_count
            if task_success_rate < 0.5:
                # send an alert
                msg = "Spider <%s> has %s successful and %s failed tasks (success rate %.2f), please check whether it is healthy" % (
                    self._spider_name,
                    success_task_count,
                    failed_task_count,
                    task_success_rate,
                )
                log.error(msg)
                self.send_msg(
                    msg,
                    level="error",
                    message_prefix="Spider <%s> task-success-rate alert" % (self._spider_name),
                )

        # check the export-failure count (export_falied_times is the attribute name used by ItemBuffer)
        if self._item_buffer.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
            msg = "Spider <{}> failed to export data {} times, please check whether it is healthy".format(
                self._spider_name, self._item_buffer.export_falied_times
            )
            log.error(msg)
            self.send_msg(
                msg,
                level="error",
                message_prefix="Spider <%s> data-export-failure alert" % (self._spider_name),
            )

    def _stop_all_thread(self):
        # stop the request buffer
        self._request_buffer.stop()
        # stop the data pipeline
        self._item_buffer.stop()
        # stop the task dispatcher
        self._collector.stop()
        # stop the parser_controls
        for parser_control in self._parser_controls:
            parser_control.stop()
        # stop the heartbeat manager
        self._heartbeat_buffer.stop()
        # mark the thread as no longer started
        self._started.clear()

    def send_msg(self, msg, level="debug", message_prefix=""):
        # log.debug("send alert level:{} msg:{}".format(level, msg))
        tools.send_msg(msg=msg, level=level, message_prefix=message_prefix)

    def spider_begin(self):
        """
        @summary: When started via start_monitor_task, this function and spider_end
        run in different processes, so they cannot share variables
        ---------
        ---------
        @result:
        """
        if self._begin_callback:
            self._begin_callback()

        for parser in self._parsers:
            parser.start_callback()  # task-start callback

    def spider_end(self):
        if self._end_callback:  # task-end callback
            self._end_callback()

        for parser in self._parsers:
            if not self._keep_alive:
                parser.close()  # the spider's own close hook
            parser.end_callback()  # invoke the end callback

        if not self._keep_alive:
            # close the webdriver
            Request.render_downloader and Request.render_downloader.close_all()
            metrics.close()  # shut down metrics
        else:
            metrics.flush()

        if self._keep_alive:
            log.info("Spider does not stop automatically; waiting for the next round of tasks...")
        else:
            log.info("Spider <%s> finished" % (self._spider_name,))

    def join(self, timeout=None):
        """
        Override Thread.join so that joining an unstarted scheduler is a no-op
        """
        if not self._started.is_set():
            return
        super().join()
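

# A minimal end-to-end sketch of wiring the scheduler, assuming a BaseParser
# subclass exists and the RabbitMQ/redis backends are configured in setting;
# MyParser and the redis_key value are illustrative:
#
#     if __name__ == "__main__":
#         scheduler = Scheduler(redis_key="test", thread_count=2, keep_alive=False)
#         scheduler.add_parser(MyParser)
#         scheduler.start()   # threading.Thread.start() -> run()
#         scheduler.join()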