scheduler.py

# -*- coding: utf-8 -*-
"""
Created on 2017-01-09 10:38
---------
@summary: Wires together the parser, parser_control, and collector
---------
@author: Boris
@email: boris_liu@foxmail.com
"""

import json
import sys
import threading
import time
from collections.abc import Iterable  # `collections.Iterable` was removed in Python 3.10
from types import SimpleNamespace

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.buffer.item_buffer import ItemBuffer
from feapder.buffer.request_buffer import RequestBuffer
from feapder.buffer.task_buffer import TaskBuffer
from feapder.core.base_parser import BaseParser
from feapder.core.collector import Collector
from feapder.core.handle_failed_items import HandleFailedItems
from feapder.core.handle_failed_requests import HandleFailedRequests
from feapder.core.parser_control import PaserControl  # sic: spelled this way upstream
from feapder.db.rabbitMq import RabbitMQ
from feapder.network.item import Item
from feapder.network.request import Request
from feapder.utils import metrics
from feapder.utils.log import log


class Scheduler(threading.Thread):
    __custom_setting__ = {}

    def __init__(
        self,
        redis_key=None,
        thread_count=None,
        begin_callback=None,
        end_callback=None,
        keep_alive=None,
        auto_start_requests=None,
        **kwargs
    ):
        """
        @summary: Scheduler
        ---------
        @param redis_key: redis folder that stores the spider's requests and items
        @param thread_count: number of threads; defaults to the value in the settings file
        @param begin_callback: callback invoked when the spider starts
        @param end_callback: callback invoked when the spider ends
        @param keep_alive: whether the spider stays resident; defaults to False
        @param auto_start_requests: whether the spider adds tasks automatically
        ---------
        @result:
        """
        super(Scheduler, self).__init__()

        for key, value in self.__class__.__custom_setting__.items():
            if key == "AUTO_STOP_WHEN_SPIDER_DONE":  # backward compatibility with old-style settings
                setattr(setting, "KEEP_ALIVE", not value)
            else:
                setattr(setting, key, value)

        self._redis_key = redis_key or setting.REDIS_KEY
        if not self._redis_key:
            raise Exception(
                """
                redis_key is the redis folder that stores requests and items; it must not be empty.
                Configure it in the settings, e.g. REDIS_KEY = 'test',
                or pass it when the spider is initialized, e.g. TestSpider(redis_key='test')
                """
            )

        self._rabbitmq = RabbitMQ()
        self._request_buffer = RequestBuffer(redis_key)
        self._item_buffer = ItemBuffer(redis_key)
        self._collector = Collector(redis_key)
        self._task_buffer = TaskBuffer(redis_key)
        self._parsers = []
        self._parser_controls = []
        self._parser_control_obj = PaserControl

        # backward compatibility with the old parameter name
        if "auto_stop_when_spider_done" in kwargs:
            self._keep_alive = not kwargs.get("auto_stop_when_spider_done")
        else:
            self._keep_alive = (
                keep_alive if keep_alive is not None else setting.KEEP_ALIVE
            )
        self._auto_start_requests = (
            auto_start_requests
            if auto_start_requests is not None
            else setting.SPIDER_AUTO_START_REQUESTS
        )

        self._begin_callback = (
            begin_callback
            if begin_callback
            else lambda: log.info("\n********** feapder begin **********")
        )
        self._end_callback = (
            end_callback
            if end_callback
            else lambda: log.info("\n********** feapder end **********")
        )

        self._thread_count = (
            setting.SPIDER_THREAD_COUNT if not thread_count else thread_count
        )

        self._spider_id = tools.get_uuid(redis_key, tools.get_current_date())
        self._spider_name = redis_key

        # declare the spider heartbeat queue
        self._tab_spider_heartbeat = setting.SPIDER_HEARTBEAT
        self._rabbitmq.declare(queue=self._tab_spider_heartbeat)

        self._is_notify_end = False  # whether the end-of-run notification has been sent

        # Request cache settings
        Request.cached_redis_key = redis_key
        Request.cached_expire_time = setting.RESPONSE_CACHED_EXPIRE_TIME

        self._last_check_task_status_time = 0

        self.init_metrics()
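
    # A sketch of the __custom_setting__ hook consumed at the top of __init__
    # (the subclass name and values below are illustrative, not from this file):
    # key/value pairs declared on a subclass are copied onto feapder.setting
    # before any component starts.
    #
    #     class MySpider(Scheduler):
    #         __custom_setting__ = {
    #             "SPIDER_THREAD_COUNT": 8,
    #             "KEEP_ALIVE": True,
    #         }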

    def init_metrics(self):
        """
        Initialize the metrics system
        """
        metrics.init(**setting.METRICS_OTHER_ARGS)

    def add_parser(self, parser):
        parser = parser()  # instantiate the parser
        if isinstance(parser, BaseParser):
            self._parsers.append(parser)
        else:
            raise ValueError(
                "Type error: a spider must inherit from feapder.BaseParser or feapder.BatchParser"
            )

    def _start(self):
        self.spider_begin()  # spider start -- start_callback

        # re-insert previously failed items into the database
        if setting.RETRY_FAILED_ITEMS:
            handle_failed_items = HandleFailedItems(
                redis_key=self._redis_key,
                item_buffer=self._item_buffer,
                rabbitmq=self._rabbitmq,
            )
            handle_failed_items.reput_failed_items_to_db()

        # start task_buffer -- the task-state manager that buffers task status updates
        self._task_buffer.start()
        # STEP 3.1 start request_buffer -- the task manager that buffers requests bound for the database
        self._request_buffer.start()
        # STEP 3.2 start item_buffer -- the pipeline manager that buffers scraped data before writing it to the database
        self._item_buffer.start()
        # STEP 3.3 start collector -- dispatches tasks to the workers
        self._collector.start()

        # start the parser_control thread pool
        for i in range(self._thread_count):
            # STEP 3.4 create a worker thread
            parser_control = self._parser_control_obj(
                self._collector,
                self._redis_key,
                self._request_buffer,
                self._item_buffer,
                self._task_buffer,
            )
            for parser in self._parsers:  # STEP 3.5 register every parser with the worker
                parser_control.add_parser(parser)
            parser_control.start()  # STEP 3.6 start the worker thread
            self._parser_controls.append(parser_control)

        # STEP 3.7 dispatch tasks -- start reading tasks now that consumer threads exist
        if setting.RETRY_FAILED_REQUESTS:
            # re-queue failed requests
            handle_failed_requests = HandleFailedRequests(
                redis_key=self._redis_key,
                rabbitmq=self._rabbitmq,
            )
            handle_failed_requests.reput_failed_requests_to_requests()

        # STEP 3.8 produce new tasks
        if self._auto_start_requests:
            self.__add_task()

    def run(self):
        self._start()

        while True:
            try:
                if self.all_thread_is_done():
                    if not self._is_notify_end:
                        self.spider_end()  # the spider has finished
                        self._is_notify_end = True

                    if not self._keep_alive:  # not a resident spider: shut down all threads
                        self._stop_all_thread()
                        break
                else:
                    self._is_notify_end = False

                self.check_task_status()
            except Exception as e:
                log.exception(e)

            tools.delay_time(1)

    def __add_task(self):
        # if the task pool still holds pending tasks, resume them; otherwise produce new ones
        todo_task_count = self._collector.get_requests_count()
        if todo_task_count:
            log.info(
                "Found %s pending tasks; skipping new task dispatch and resuming from where the last run stopped"
                % todo_task_count
            )
        else:
            for parser in self._parsers:
                results = parser.start_requests()
                # add requests to the request queue, which writes them to the database in batches
                if results and not isinstance(results, Iterable):
                    raise Exception(
                        "%s.%s must return an iterable" % (parser.name, "start_requests")
                    )

                result_type = 1
                for result in results or []:  # dispatch on the type of each yielded value
                    if isinstance(result, Request):  # a Request joins the task queue
                        result.parser_name = result.parser_name or parser.name
                        self._request_buffer.put_request(result)
                        result_type = 1
                    elif isinstance(result, Item):  # an Item goes to the pipeline queue to await storage
                        self._item_buffer.put_item(result)
                        result_type = 2
                    elif callable(result):  # a callable, e.g. a deferred database update;
                        # it follows whichever buffer the previous yield type used
                        if result_type == 1:
                            self._request_buffer.put_request(result)
                        else:
                            self._item_buffer.put_item(result)
                    else:
                        raise TypeError(
                            "start_requests yielded a result of the wrong type; expected Request, Item, or a callable, but got: {}".format(
                                type(result)
                            )
                        )

                self._request_buffer.flush()
                self._item_buffer.flush()
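
    # A sketch of a start_requests generator covering the three yield types
    # handled above (the URL and method names are illustrative):
    #
    #     def start_requests(self):
    #         yield Request("https://example.com")  # routed to request_buffer
    #         yield Item(...)                       # routed to item_buffer
    #         yield self.update_task_state          # a callable rides with the
    #                                               # buffer of the preceding type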

    def all_thread_is_done(self):
        # Check several times to smooth out races: the stages are not synchronized,
        # so a component may look idle at the instant it is checked and busy a moment
        # later. A single pass could report "done" prematurely.
        for i in range(3):
            # STEP 5.1 check collector state
            if (
                self._collector.is_collector_task()
                or self._collector.get_requests_count() > 0
            ):
                return False

            # STEP 5.2 check parser_control state
            for parser_control in self._parser_controls:
                if not parser_control.is_not_task():
                    return False

            # STEP 5.3 check item_buffer state
            if (
                self._item_buffer.get_items_count() > 0
                or self._item_buffer.is_adding_to_db()
            ):
                return False

            # STEP 5.4 check request_buffer state
            if (
                self._request_buffer.get_requests_count() > 0
                or self._request_buffer.is_adding_to_db()
            ):
                return False

            # STEP 5.5 check task_buffer state
            if (
                self._task_buffer.get_tasks_count() > 0
                or self._task_buffer.is_adding_to_db()
            ):
                return False

            tools.delay_time(1)  # sleep for 1 second

        return True

    @tools.run_safe_model("check_task_status")
    def check_task_status(self):
        """
        Check task status and send alerts
        """
        # run at most once per minute
        now_time = time.time()
        if now_time - self._last_check_task_status_time > 60:
            self._last_check_task_status_time = now_time
        else:
            return

        # alert when the failed task count exceeds the threshold
        failed_count = self._request_buffer.get_failed_requests_count()
        log.debug(f'"{self._spider_name}" spider failed task count: {failed_count}')
        if failed_count > setting.WARNING_FAILED_COUNT:
            # send an alert
            msg = '"%s" spider currently has %s failed tasks; please check whether the spider is healthy' % (
                self._spider_name,
                failed_count,
            )
            log.error(msg)
            self.send_msg(
                msg,
                level="error",
                message_prefix='"%s" spider failed task count alert' % (self._spider_name,),
            )

        # parser_control keeps running counts of completed and failed tasks;
        # alert when the success rate drops below 0.5
        failed_task_count, success_task_count = PaserControl.get_task_status_count()
        total_count = success_task_count + failed_task_count
        if total_count > 0:
            task_success_rate = success_task_count / total_count
            if task_success_rate < 0.5:
                # send an alert
                msg = '"%s" spider: %s tasks succeeded, %s failed, success rate %.2f; please check whether the spider is healthy' % (
                    self._spider_name,
                    success_task_count,
                    failed_task_count,
                    task_success_rate,
                )
                log.error(msg)
                self.send_msg(
                    msg,
                    level="error",
                    message_prefix='"%s" spider success rate alert' % (self._spider_name,),
                )

        # alert when data export has failed too many times
        # (note: "falied" is the attribute's actual spelling in ItemBuffer)
        if self._item_buffer.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
            msg = '"{}" spider failed to export data {} times; please check whether the spider is healthy'.format(
                self._spider_name, self._item_buffer.export_falied_times
            )
            log.error(msg)
            self.send_msg(
                msg,
                level="error",
                message_prefix='"%s" spider data export failure alert' % (self._spider_name,),
            )
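
    # The thresholds above come from the project settings; illustrative values
    # (not necessarily the shipped defaults):
    #
    #     # setting.py
    #     WARNING_FAILED_COUNT = 1000        # failed-task alert threshold
    #     EXPORT_DATA_MAX_FAILED_TIMES = 10  # export-failure alert threshold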

    def _stop_all_thread(self):
        # stop the request buffer
        self._request_buffer.stop()
        # stop the item pipeline
        self._item_buffer.stop()
        # stop the collector
        self._collector.stop()
        # stop the task-state manager
        self._task_buffer.stop()

        # stop the parser_controls
        for parser_control in self._parser_controls:
            parser_control.stop()

        # record the time the spider stopped
        self.report_node_heartbeat('close')

        self._started.clear()

    def send_msg(self, msg, level="debug", message_prefix=""):
        # log.debug("send alert level:{} msg:{}".format(level, msg))
        tools.send_msg(msg=msg, level=level, message_prefix=message_prefix)

    def get_argvs(self):
        argvs = {"next_page": False, "max_page": 10}
        for item in sys.argv[1:]:
            if item.startswith("--"):
                key = item.replace("--", "").split('=')[0]
                val = item.split('=')[-1]
                if key != 'purpose':
                    # eval is used to turn the string value into a bool or an int
                    argvs[key] = eval(val)
        return json.loads(json.dumps(argvs), object_hook=lambda d: SimpleNamespace(**d))
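
    # Example invocation (flag values are passed through eval, so they must be
    # valid Python literals; the script name is illustrative):
    #
    #     $ python my_spider.py --next_page=True --max_page=50
    #
    # get_argvs() then returns SimpleNamespace(next_page=True, max_page=50).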

    def spider_begin(self):
        """
        @summary: When launched via start_monitor_task, this function and spider_end
        run in different processes, so variables cannot be shared between them
        ---------
        ---------
        @result:
        """
        if self._begin_callback:
            self._begin_callback()

        parameter = self.get_argvs()
        for parser in self._parsers:
            parser.platform_next_page = parameter.next_page
            parser.platform_max_page = parameter.max_page
            parser.start_callback()

        # record the time the spider started
        self.report_node_heartbeat('start')

    def spider_end(self):
        # record the time the spider finished
        self.report_node_heartbeat('end')

        if self._end_callback:  # end-of-run callback
            self._end_callback()

        for parser in self._parsers:
            if not self._keep_alive:
                parser.close()  # parsers may define a custom close
            parser.end_callback()  # invoke the end callback

        if not self._keep_alive:
            # close the webdriver pool
            if Request.webdriver_pool:
                Request.webdriver_pool.close()

            # shut down metrics
            metrics.close()
        else:
            metrics.flush()

        if self._keep_alive:
            log.info("Spider keeps running; waiting for the next round of tasks...")
        else:
            log.info('"%s" spider finished' % (self._spider_name,))

    def report_node_heartbeat(self, status):
        """
        Report the spider heartbeat
        """
        message = {
            'ip': tools.get_localhost_ip(),
            'spider_id': self._spider_id,
            'spider_name': self._spider_name,
            'ts': tools.get_current_timestamp(),
            'status': status,
        }
        self._rabbitmq.add(self._tab_spider_heartbeat, message)

    def join(self, timeout=None):
        """
        Override Thread.join: joining a scheduler that never started is a no-op
        """
        if not self._started.is_set():
            return
        super().join()
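

# A minimal end-to-end sketch of how this scheduler is wired up (MySpider, the
# URL, and the redis_key are illustrative; real feapder spiders usually subclass
# feapder.Spider, which builds on this Scheduler):
#
#     class MySpider(BaseParser):
#         name = "my_spider"
#
#         def start_requests(self):
#             yield Request("https://example.com", callback=self.parse)
#
#         def parse(self, request, response):
#             yield Item(...)  # buffered and exported by ItemBuffer
#
#     scheduler = Scheduler(redis_key="test_spider", thread_count=4)
#     scheduler.add_parser(MySpider)  # pass the class; add_parser instantiates it
#     scheduler.start()               # threading.Thread.start() -> run()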