scheduler.py

# -*- coding: utf-8 -*-
"""
Created on 2017-01-09 10:38
---------
@summary: Assemble the parser, parser_control and collector
---------
@author: Boris
@email: boris_liu@foxmail.com
"""
import json
import sys
import threading
import time
from collections.abc import Iterable  # collections.Iterable was removed in Python 3.10
from types import SimpleNamespace

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.buffer.item_buffer import ItemBuffer
from feapder.buffer.request_buffer import RequestBuffer
from feapder.core.base_parser import BaseParser
from feapder.core.collector import Collector
from feapder.core.handle_failed_items import HandleFailedItems
from feapder.core.handle_failed_requests import HandleFailedRequests
from feapder.core.parser_control import PaserControl
from feapder.db.rabbitMq import RabbitMQ
from feapder.network.item import Item
from feapder.network.request import Request
from feapder.utils import metrics
from feapder.utils.log import log


class Scheduler(threading.Thread):
    __custom_setting__ = {}

    def __init__(
        self,
        redis_key=None,
        thread_count=None,
        begin_callback=None,
        end_callback=None,
        keep_alive=None,
        auto_start_requests=None,
        **kwargs
    ):
        """
        @summary: Scheduler
        ---------
        @param redis_key: redis folder in which the spider's requests and items are stored
        @param thread_count: number of threads, defaults to the value in the settings file
        @param begin_callback: callback invoked when the spider starts
        @param end_callback: callback invoked when the spider ends
        @param keep_alive: whether the spider keeps running as a resident process, defaults to False
        @param auto_start_requests: whether the spider dispatches its start tasks automatically
        ---------
        @result:
        """
        super(Scheduler, self).__init__()

        for key, value in self.__class__.__custom_setting__.items():
            if key == "AUTO_STOP_WHEN_SPIDER_DONE":  # compatibility with the legacy setting
                setattr(setting, "KEEP_ALIVE", not value)
            else:
                setattr(setting, key, value)

        self._redis_key = redis_key or setting.REDIS_KEY
        if not self._redis_key:
            raise Exception(
                """
                redis_key is the redis directory in which requests and items are stored. It can not be empty.
                Configure it in setting, e.g. REDIS_KEY = 'test',
                or pass it when the spider is initialized, e.g. TestSpider(redis_key='test')
                """
            )

        self._rabbitmq = RabbitMQ()
        self._request_buffer = RequestBuffer(redis_key)
        self._item_buffer = ItemBuffer(redis_key)
        self._collector = Collector(redis_key)
        self._parsers = []
        self._parser_controls = []
        self._parser_control_obj = PaserControl

        # compatibility with the legacy parameter
        if "auto_stop_when_spider_done" in kwargs:
            self._keep_alive = not kwargs.get("auto_stop_when_spider_done")
        else:
            self._keep_alive = (
                keep_alive if keep_alive is not None else setting.KEEP_ALIVE
            )
        self._auto_start_requests = (
            auto_start_requests
            if auto_start_requests is not None
            else setting.SPIDER_AUTO_START_REQUESTS
        )

        self._begin_callback = (
            begin_callback
            if begin_callback
            else lambda: log.info("\n********** feapder begin **********")
        )
        self._end_callback = (
            end_callback
            if end_callback
            else lambda: log.info("\n********** feapder end **********")
        )

        self._thread_count = (
            setting.SPIDER_THREAD_COUNT if not thread_count else thread_count
        )

        self._spider_id = tools.get_uuid(redis_key, tools.get_current_date())
        self._spider_name = redis_key

        # declare the spider heartbeat queue
        self._tab_spider_heartbeat = setting.SPIDER_HEARTBEAT
        self._rabbitmq.declare(queue=self._tab_spider_heartbeat)

        self._is_notify_end = False  # whether the end of the run has already been reported

        # Request cache settings
        Request.cached_redis_key = redis_key
        Request.cached_expire_time = setting.RESPONSE_CACHED_EXPIRE_TIME

        self._last_check_task_status_time = 0

        self.init_metrics()

    def init_metrics(self):
        """
        Initialize the metrics (monitoring) system
        """
        metrics.init(**setting.METRICS_OTHER_ARGS)

    def add_parser(self, parser):
        parser = parser()  # instantiate the parser
        if isinstance(parser, BaseParser):
            self._parsers.append(parser)
        else:
            raise ValueError(
                "Type error: a spider must inherit from feapder.BaseParser or feapder.BatchParser"
            )

    def run(self):
        self._start()

        while True:
            self.__report_node_heartbeat("running")
            try:
                if self.all_thread_is_done():
                    if not self._is_notify_end:
                        self.spider_end()  # the spider run has finished
                        self._is_notify_end = True

                    if not self._keep_alive:  # not a resident spider, shut down all threads
                        self._stop_all_thread()
                        break
                else:
                    self._is_notify_end = False

                self.check_task_status()
            except Exception as e:
                log.exception(e)

            tools.delay_time(1)

    def __add_task(self):
        self.spider_begin()  # start the spider: start_requests

        # if the task pool still holds pending tasks, keep crawling them; otherwise produce new tasks
        todo_task_count = self._collector.get_requests_count()
        if todo_task_count:
            log.info(
                "Found %s pending tasks; no new tasks will be dispatched, resuming from where the previous run stopped"
                % todo_task_count
            )
        else:
            for parser in self._parsers:
                results = parser.start_requests()
                # add requests to the request queue; the queue writes them to the database in batches
                if results and not isinstance(results, Iterable):
                    raise Exception(
                        "%s.%s must return an iterable" % (parser.name, "start_requests")
                    )

                result_type = 1
                for result in results or []:  # step: handle each yielded value by type
                    if isinstance(result, Request):  # Request: add to the task queue
                        result.parser_name = result.parser_name or parser.name
                        self._request_buffer.put_request(result)
                        result_type = 1
                    elif isinstance(result, Item):  # Item: add to the pipeline queue, waiting to be stored
                        self._item_buffer.put_item(result)
                        result_type = 2
                    elif callable(result):  # a callable may be e.g. a function that updates the database
                        if result_type == 1:
                            self._request_buffer.put_request(result)
                        else:
                            self._item_buffer.put_item(result)
                    else:
                        raise TypeError(
                            "start_requests yield result type error, expect Request, Item or callback func, but get type: {}".format(
                                type(result)
                            )
                        )

                self._request_buffer.flush()
                self._item_buffer.flush()

    def _start(self):
        # re-store previously failed items to the database
        if setting.RETRY_FAILED_ITEMS:
            handle_failed_items = HandleFailedItems(
                redis_key=self._redis_key,
                item_buffer=self._item_buffer,
                rabbitmq=self._rabbitmq,
            )
            handle_failed_items.reput_failed_items_to_db()

        # STEP 3.1 start request_buffer -- task manager, buffers requests before they are written to the database
        self._request_buffer.start()
        # STEP 3.2 start item_buffer -- pipeline manager, buffers scraped data before it is written to the database
        self._item_buffer.start()
        # STEP 3.3 start collector -- task manager that dispatches tasks
        self._collector.start()

        # start the parser controls
        for i in range(self._thread_count):
            # STEP 3.4 create a worker thread
            parser_control = self._parser_control_obj(
                self._collector,
                self._redis_key,
                self._request_buffer,
                self._item_buffer,
            )

            for parser in self._parsers:  # STEP 3.5 register every parser with the worker
                parser_control.add_parser(parser)

            parser_control.start()  # STEP 3.6 start the crawling thread
            self._parser_controls.append(parser_control)

        # STEP 3.7 dispatch tasks: start reading tasks once consumer threads exist
        if setting.RETRY_FAILED_REQUESTS:
            # re-dispatch failed requests
            handle_failed_requests = HandleFailedRequests(
                redis_key=self._redis_key,
                rabbitmq=self._rabbitmq,
            )
            handle_failed_requests.reput_failed_requests_to_requests()

        # STEP 3.8 dispatch new tasks: produce new tasks
        if self._auto_start_requests:
            self.__add_task()

    def all_thread_is_done(self):
        # Note: check several times to reduce false positives. The stages are not synchronized, so a state
        # that is idle at one moment may become busy by the next check; a single check can hit that window.
        for i in range(3):
            # STEP 5.1 check the collector state
            if (
                self._collector.is_collector_task()
                or self._collector.get_requests_count() > 0
            ):
                return False

            # STEP 5.2 check the parser_control state
            for parser_control in self._parser_controls:
                if not parser_control.is_not_task():
                    return False

            # STEP 5.3 check the item_buffer state
            if (
                self._item_buffer.get_items_count() > 0
                or self._item_buffer.is_adding_to_db()
            ):
                return False

            # STEP 5.4 check the request_buffer state
            if (
                self._request_buffer.get_requests_count() > 0
                or self._request_buffer.is_adding_to_db()
            ):
                return False

            tools.delay_time(1)  # sleep for 1 second

        return True

    @tools.run_safe_model("check_task_status")
    def check_task_status(self):
        """
        Check the task status and send alerts
        """
        # check once per minute
        now_time = time.time()
        if now_time - self._last_check_task_status_time > 60:
            self._last_check_task_status_time = now_time
        else:
            return

        # check the number of failed tasks; alert once it exceeds the threshold
        failed_count = self._request_buffer.get_failed_requests_count()
        log.debug(f"<{self._spider_name}> spider failed task count: {failed_count}")
        if failed_count > setting.WARNING_FAILED_COUNT:
            # send an alert
            msg = "<%s> spider currently has %s failed tasks, please check whether the spider is working properly" % (
                self._spider_name,
                failed_count,
            )
            log.error(msg)
            tools.send_msg(
                msg=msg,
                level="error",
                message_prefix="<%s> spider failed task count alert" % self._spider_name,
            )

        # parser_control keeps live counts of finished and failed tasks; alert when the success rate drops below 0.5
        failed_task_count, success_task_count = PaserControl.get_task_status_count()
        total_count = success_task_count + failed_task_count
        if total_count > 0:
            task_success_rate = success_task_count / total_count
            if task_success_rate < 0.5:
                # send an alert
                msg = "<%s> spider has %s successful tasks and %s failed tasks, success rate %.2f, please check whether the spider is working properly" % (
                    self._spider_name,
                    success_task_count,
                    failed_task_count,
                    task_success_rate,
                )
                log.error(msg)
                tools.send_msg(
                    msg=msg,
                    level="error",
                    message_prefix="<%s> spider task success rate alert" % self._spider_name,
                )

        # check how many times exporting data to the database has failed
        if self._item_buffer.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
            msg = "<{}> spider failed to export data, failure count: {}, please check whether the spider is working properly".format(
                self._spider_name, self._item_buffer.export_falied_times
            )
            log.error(msg)
            tools.send_msg(
                msg=msg,
                level="error",
                message_prefix="<%s> spider data export failure alert" % self._spider_name,
            )

    def _stop_all_thread(self):
        # stop the request manager
        self._request_buffer.stop()
        # stop the data pipeline
        self._item_buffer.stop()
        # stop the task collector
        self._collector.stop()
        # stop the parser_controls
        for parser_control in self._parser_controls:
            parser_control.stop()
        # close the rabbitmq connection
        self._rabbitmq.close()
        # record the time the spider stopped
        self.__report_node_heartbeat("close")
        self._started.clear()

    def get_argvs(self):
        argvs = {"next_page": False, "max_page": 10}
        for item in sys.argv[1:]:
            # print(item)
            if item.startswith("--"):
                key = item.replace("--", "").split("=")[0]
                val = item.split("=")[-1]
                if key != "purpose":
                    argvs[key] = eval(val)  # eval is used here to convert the string value to bool or int
        return json.loads(json.dumps(argvs), object_hook=lambda d: SimpleNamespace(**d))
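
    # Example (hypothetical command line): running a spider as
    #   python my_spider.py --next_page=True --max_page=5
    # makes get_argvs() return SimpleNamespace(next_page=True, max_page=5),
    # which spider_begin() below copies onto each parser.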

    def spider_begin(self):
        """
        @summary: when started via start_monitor_task, this function and spider_end run in different
        processes, so variables can not be shared between them
        ---------
        ---------
        @result:
        """
        if self._begin_callback:
            self._begin_callback()

        for parser in self._parsers:
            parameter = self.get_argvs()
            parser.platform_next_page = parameter.next_page
            parser.platform_max_page = parameter.max_page
            parser.start_callback()

        # record the time the spider started
        self.__report_node_heartbeat("start")

    def spider_end(self):
        # record the time the spider finished
        self.__report_node_heartbeat("end")

        if self._end_callback:  # end-of-run callback
            self._end_callback()

        for parser in self._parsers:
            if not self._keep_alive:
                parser.close()  # a spider may define its own close
            parser.end_callback()  # call the end callback

        if not self._keep_alive:
            # close the webdriver pool
            if Request.webdriver_pool:
                Request.webdriver_pool.close()

            # close the metrics system
            metrics.close()
        else:
            metrics.flush()

        if self._keep_alive:
            log.info("The spider does not stop automatically, waiting for the next round of tasks...")
        else:
            log.info("<%s> spider finished" % self._spider_name)

    def __report_node_heartbeat(self, status):
        """
        Report the spider heartbeat
        """
        message = {
            "ip": tools.get_localhost_ip(),
            "spider_id": self._spider_id,
            "spider_name": self._spider_name,
            "ts": tools.get_current_timestamp(),
            "status": status,
        }
        self._rabbitmq.add(self._tab_spider_heartbeat, message)

    def join(self, timeout=None):
        """
        Override Thread.join: joining a scheduler that was never started is a no-op
        """
        if not self._started.is_set():
            return

        super().join()
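
# Illustrative usage sketch (an assumption, not part of the original module): how a Scheduler
# is typically assembled. MyParser and the redis_key value are made up for the example; in
# feapder this wiring is normally done by the Spider classes rather than by hand.
#
#   from feapder.core.scheduler import Scheduler
#   from feapder.core.base_parser import BaseParser
#   from feapder.network.request import Request
#
#   class MyParser(BaseParser):
#       def start_requests(self):
#           yield Request("https://example.com")
#
#   scheduler = Scheduler(redis_key="test:my_spider", thread_count=4, keep_alive=False)
#   scheduler.add_parser(MyParser)  # pass the class; add_parser instantiates it
#   scheduler.start()               # Thread.start -> run(): starts buffers, collector and parser controls
#   scheduler.join()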