# -*- coding: utf-8 -*-
"""
Created on 2017-01-09 10:38
---------
@summary: wires together the parser, parser_control and collector
---------
@author: Boris
@email: boris_liu@foxmail.com
"""

import json
import sys
import threading
import time
from collections.abc import Iterable

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.buffer.item_buffer import ItemBuffer
from feapder.buffer.request_buffer import RequestBuffer
from feapder.core.base_parser import BaseParser
from feapder.core.collector import Collector
from feapder.core.handle_failed_items import HandleFailedItems
from feapder.core.handle_failed_requests import HandleFailedRequests
from feapder.core.parser_control import PaserControl
from feapder.db.redisdb import RedisDB
from feapder.network.item import Item
from feapder.network.request import Request
from feapder.utils import metrics
from feapder.utils.log import log
from feapder.utils.redis_lock import RedisLock

SPIDER_UUID = tools.get_uuid()
SPIDER_START_TIME = "spider_start_time"
SPIDER_START_TIME_KEY = SPIDER_START_TIME + "#" + SPIDER_UUID
SPIDER_END_TIME_KEY = "spider_end_time"
SPIDER_LAST_TASK_COUNT_RECORD_TIME_KEY = "last_task_count_record_time"


class Obj(object):
    def __init__(self, dict_):
        self.__dict__.update(dict_)


class Scheduler(threading.Thread):
    __custom_setting__ = {}

    def __init__(
        self,
        redis_key=None,
        thread_count=None,
        begin_callback=None,
        end_callback=None,
        delete_keys=(),
        keep_alive=None,
        auto_start_requests=None,
        batch_interval=0,
        wait_lock=True,
        task_table=None,
        **kwargs
    ):
  54. """
  55. @summary: 调度器
  56. ---------
  57. @param redis_key: 爬虫request及item存放redis中的文件夹
  58. @param thread_count: 线程数,默认为配置文件中的线程数
  59. @param begin_callback: 爬虫开始回调函数
  60. @param end_callback: 爬虫结束回调函数
  61. @param delete_keys: 爬虫启动时删除的key,类型: 元组/bool/string。 支持正则
  62. @param keep_alive: 爬虫是否常驻,默认否
  63. @param auto_start_requests: 爬虫是否自动添加任务
  64. @param batch_interval: 抓取时间间隔 默认为0 天为单位 多次启动时,只有当前时间与第一次抓取结束的时间间隔大于指定的时间间隔时,爬虫才启动
  65. @param wait_lock: 下发任务时否等待锁,若不等待锁,可能会存在多进程同时在下发一样的任务,因此分布式环境下请将该值设置True
  66. @param task_table: 任务表, 批次爬虫传递
  67. ---------
  68. @result:
  69. """
        super(Scheduler, self).__init__()

        for key, value in self.__class__.__custom_setting__.items():
            if key == "AUTO_STOP_WHEN_SPIDER_DONE":  # compatibility with the old setting name
                setattr(setting, "KEEP_ALIVE", not value)
            else:
                setattr(setting, key, value)

        # historical spider [redis_key]
        for item in sys.argv[1:]:
            if item.startswith("--purpose"):
                val = item.split("=")[-1]
                if not redis_key.endswith(val):
                    # a historical spider needs its own redis_key; sharing one with the incremental
                    # spider would make the incremental spider resume from the historical breakpoint
                    redis_key += f"_{val}"

        self._redis_key = redis_key or setting.REDIS_KEY
        if not self._redis_key:
            raise Exception(
                """
                redis_key is the redis directory that holds requests and items; it must not be empty.
                Configure it in setting, e.g. REDIS_KEY = 'test',
                or pass it when the spider is created, e.g. TestSpider(redis_key='test')
                """
            )

        self._request_buffer = RequestBuffer(redis_key)
        self._item_buffer = ItemBuffer(redis_key, task_table)
        self._collector = Collector(redis_key)
        self._parsers = []
        self._parser_controls = []
        self._parser_control_obj = PaserControl

        # compatibility with the old-style parameter
        if "auto_stop_when_spider_done" in kwargs:
            self._keep_alive = not kwargs.get("auto_stop_when_spider_done")
        else:
            self._keep_alive = (
                keep_alive if keep_alive is not None else setting.KEEP_ALIVE
            )
        self._auto_start_requests = (
            auto_start_requests
            if auto_start_requests is not None
            else setting.SPIDER_AUTO_START_REQUESTS
        )
        self._batch_interval = batch_interval

        self._begin_callback = (
            begin_callback
            if begin_callback
            else lambda: log.info("\n********** feapder begin **********")
        )
        self._end_callback = (
            end_callback
            if end_callback
            else lambda: log.info("\n********** feapder end **********")
        )

        self._thread_count = (
            setting.SPIDER_THREAD_COUNT if not thread_count else thread_count
        )

        self._spider_name = redis_key
        self._project_name = redis_key.split(":")[0]
        self._task_table = task_table

        self._tab_spider_time = setting.TAB_SPIDER_TIME.format(redis_key=redis_key)
        self._tab_spider_status = setting.TAB_SPIDER_STATUS.format(redis_key=redis_key)
        self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
        self._tab_failed_requests = setting.TAB_FAILED_REQUESTS.format(
            redis_key=redis_key
        )

        self._is_notify_end = False  # whether the end notification has already been sent
        self._last_task_count = 0  # task count seen at the last status check
        self._redisdb = RedisDB()

        self._project_total_state_table = "{}_total_state".format(self._project_name)
        self._is_exist_project_total_state_table = False

        # Request cache settings
        Request.cached_redis_key = redis_key
        Request.cached_expire_time = setting.RESPONSE_CACHED_EXPIRE_TIME

        delete_keys = delete_keys or setting.DELETE_KEYS
        if delete_keys:
            self.delete_tables(delete_keys)

        self._last_check_task_status_time = 0
        self.wait_lock = wait_lock

        self.init_metrics()

    def init_metrics(self):
        """
        Initialize the metrics (monitoring) system
        """
        metrics.init(**setting.METRICS_OTHER_ARGS)

    def add_parser(self, parser):
        parser = parser()  # instantiate the parser
        if isinstance(parser, BaseParser):
            self._parsers.append(parser)
        else:
            raise ValueError(
                "Type error: the spider must inherit from feapder.BaseParser or feapder.BatchParser"
            )

    def run(self):  # STEP 1: entry point of the crawler framework
        if not self.is_reach_next_spider_time():  # STEP 2: check whether it is time for the spider to run
            return

        self._start()  # STEP 3: start the spider

        while True:  # STEP 4: monitor the spider's state
            try:
                if self.all_thread_is_done():  # STEP 5: check whether the spider has finished
                    if not self._is_notify_end:
                        self.spider_end()  # one round completed
                        self._is_notify_end = True

                    if not self._keep_alive:  # STEP 7: if the spider is not resident, stop all threads
                        self._stop_all_thread()
                        break
                else:
                    self._is_notify_end = False

                self.check_task_status()  # STEP 8: check task status and send alerts
            except Exception as e:
                log.exception(e)

            tools.delay_time(1)  # check the spider's state once per second

    def __add_task(self):
        # trigger each parser's start_requests
        self.spider_begin()  # for a spider that never stops automatically, this runs only once

        # check whether the task pool still has tasks; continue with them if so, otherwise produce new ones
        todo_task_count = self._collector.get_requests_count()
        if todo_task_count:
            log.info(
                "Found %s pending tasks; not dispatching new ones, resuming from where the previous run stopped"
                % todo_task_count
            )
        else:
            for parser in self._parsers:
                results = parser.start_requests()
                # requests are put on the request queue, which writes them to the database in one place
                if results and not isinstance(results, Iterable):
                    raise Exception(
                        "%s.%s must return an iterable" % (parser.name, "start_requests")
                    )

                result_type = 1
                for result in results or []:  # handle each yielded value by type
                    if isinstance(result, Request):  # Request: add to the task queue
                        result.parser_name = result.parser_name or parser.name
                        self._request_buffer.put_request(result)
                        result_type = 1
                    elif isinstance(result, Item):  # Item: add to the pipeline queue, waiting to be stored
                        self._item_buffer.put_item(result)
                        result_type = 2
                    elif callable(result):  # a callable, e.g. a function that updates the database
                        if result_type == 1:
                            self._request_buffer.put_request(result)
                        else:
                            self._item_buffer.put_item(result)
                    else:
                        raise TypeError(
                            "start_requests yield result type error, expect Request, Item or callable, but got type: {}".format(
                                type(result)
                            )
                        )

                self._request_buffer.flush()
                self._item_buffer.flush()
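
    # Illustrative note (not from the original source): a parser's start_requests may yield any mix of
    # the three result types handled above, e.g.
    #
    #     def start_requests(self):
    #         yield Request("https://example.com/list")       # crawled later by the parser controls
    #         yield DemoItem(url="https://example.com/seed")  # stored through the item pipeline
    #         yield self.update_seed_state                    # callable, routed to the buffer of the yields before it
    #
    # ``DemoItem`` and ``update_seed_state`` are hypothetical names used only for illustration.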

    def _start(self):
        # re-store items that previously failed to be written to the database
        if setting.RETRY_FAILED_ITEMS:
            handle_failed_items = HandleFailedItems(
                redis_key=self._redis_key,
                task_table=self._task_table,
                item_buffer=self._item_buffer,
            )
            handle_failed_items.reput_failed_items_to_db()

        # STEP 3.1 start request_buffer -- the task manager that buffers requests before writing them to the database
        self._request_buffer.start()
        # STEP 3.2 start item_buffer -- the pipeline manager that buffers items before writing them to the
        # database, so all writes go through one manager instead of many threads hitting the database at once
        self._item_buffer.start()
        # STEP 3.3 start collector -- the task dispatcher that spreads tasks evenly across the nodes
        self._collector.start()

        # start the parser controls
        for i in range(self._thread_count):
            # STEP 3.4 build a worker from the collector, redis_key, request buffer and item buffer
            parser_control = self._parser_control_obj(
                self._collector,
                self._redis_key,
                self._request_buffer,
                self._item_buffer,
            )

            for parser in self._parsers:  # STEP 3.5 register every parser with the worker
                parser_control.add_parser(parser)

            parser_control.start()  # STEP 3.6 start the worker thread
            self._parser_controls.append(parser_control)

        # STEP 3.7 dispatch tasks: start feeding tasks once consumer threads exist
        if setting.RETRY_FAILED_REQUESTS:
            # re-queue failed requests; no lock needed, the operation is atomic
            handle_failed_requests = HandleFailedRequests(self._redis_key)
            handle_failed_requests.reput_failed_requests_to_requests()

        # STEP 3.8 dispatch new tasks
        if self._auto_start_requests:  # automatic dispatch
            if self.wait_lock:
                # NOTE: lock around task dispatch so multiple processes do not add duplicate tasks
                with RedisLock(key=self._spider_name) as lock:
                    if lock.locked:
                        self.__add_task()
            else:
                self.__add_task()

    def all_thread_is_done(self):
        # NOTE: check several times to reduce flakiness. The stages are not synchronized, so a state that
        # looks idle on one pass may become busy on the next; a single check could easily hit that window.
        for i in range(3):
            # STEP 5.1 check the collector
            if (
                self._collector.is_collector_task()
                or self._collector.get_requests_count() > 0
            ):
                return False

            # STEP 5.2 check the parser_controls
            for parser_control in self._parser_controls:
                if not parser_control.is_not_task():
                    return False

            # STEP 5.3 check the item_buffer
            if (
                self._item_buffer.get_items_count() > 0
                or self._item_buffer.is_adding_to_db()
            ):
                return False

            # STEP 5.4 check the request_buffer
            if (
                self._request_buffer.get_requests_count() > 0
                or self._request_buffer.is_adding_to_db()
            ):
                return False

            tools.delay_time(1)  # sleep for one second

        return True

    @tools.run_safe_model("check_task_status")
    def check_task_status(self):
        """
        Check task status and send alerts
        """
        # run the check at most once per minute
        now_time = time.time()
        if now_time - self._last_check_task_status_time > 60:
            self._last_check_task_status_time = now_time
        else:
            return

        # alert when the number of failed tasks exceeds the threshold (1000 by default)
        failed_count = self._redisdb.zget_count(self._tab_failed_requests)
        log.debug("Current failed task count: %s" % failed_count)
        if failed_count > setting.WARNING_FAILED_COUNT:
            # send the alert
            msg = "Spider 《%s》 currently has %s failed tasks, please check that it is healthy" % (
                self._spider_name,
                failed_count,
            )
            log.error(msg)
            self.send_msg(
                msg,
                level="error",
                message_prefix="Spider 《%s》 failed task count alert" % (self._spider_name),
            )

        # parser_control keeps live counts of finished and failed tasks; alert when the success rate < 0.5
        failed_task_count, success_task_count = PaserControl.get_task_status_count()
        total_count = success_task_count + failed_task_count
        if total_count > 0:
            task_success_rate = success_task_count / total_count
            if task_success_rate < 0.5:
                # send the alert
                msg = "Spider 《%s》: %s tasks succeeded, %s failed, success rate %.2f, please check that it is healthy" % (
                    self._spider_name,
                    success_task_count,
                    failed_task_count,
                    task_success_rate,
                )
                log.error(msg)
                self.send_msg(
                    msg,
                    level="error",
                    message_prefix="Spider 《%s》 task success rate alert" % (self._spider_name),
                )

        # check whether the task count has changed
        # if the task count in redis stays unchanged for 20 consecutive minutes (the parser may be stuck), send an alert
        task_count = self._redisdb.zget_count(self._tab_requests)

        if task_count:
            if task_count != self._last_task_count:
                self._last_task_count = task_count
                self._redisdb.hset(
                    self._tab_spider_time,
                    SPIDER_LAST_TASK_COUNT_RECORD_TIME_KEY,
                    tools.get_current_timestamp(),
                )  # multiple processes would send duplicate alerts, so the record time is kept in redis
            else:
                # check whether more than 20 minutes have passed since the count last changed
                lua = """
                    -- local key = KEYS[1]
                    local field = ARGV[1]
                    local current_timestamp = ARGV[2]

                    -- read the last recorded timestamp
                    local last_timestamp = redis.call('hget', KEYS[1], field)
                    if last_timestamp and current_timestamp - last_timestamp >= 1200 then
                        -- return how long the tasks have been stalled, in seconds
                        return current_timestamp - last_timestamp
                    end

                    if not last_timestamp then
                        redis.call('hset', KEYS[1], field, current_timestamp)
                    end

                    return 0
                """
                redis_obj = self._redisdb.get_redis_obj()
                cmd = redis_obj.register_script(lua)
                overtime = cmd(
                    keys=[self._tab_spider_time],
                    args=[
                        SPIDER_LAST_TASK_COUNT_RECORD_TIME_KEY,
                        tools.get_current_timestamp(),
                    ],
                )

                if overtime:
                    # log and send the alert
                    msg = "Spider {} tasks have stalled for {}, please check that it is healthy".format(
                        self._spider_name, tools.format_seconds(overtime)
                    )
                    log.error(msg)  # TODO: could also print here so the message shows up in the platform's log panel
                    self.send_msg(
                        msg,
                        level="error",
                        message_prefix="Spider 《{}》 tasks stalled".format(self._spider_name),
                    )

        else:
            self._last_task_count = 0

        # check how many times exporting items has failed
        if self._item_buffer.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
            msg = "Spider 《{}》 failed to export data, failure count: {}, please check that it is healthy".format(
                self._spider_name, self._item_buffer.export_falied_times
            )
            log.error(msg)
            self.send_msg(
                msg,
                level="error",
                message_prefix="Spider 《%s》 data export failures" % (self._spider_name),
            )

    def delete_tables(self, delete_tables_list):
        if isinstance(delete_tables_list, bool):
            delete_tables_list = [self._redis_key + "*"]
        elif not isinstance(delete_tables_list, (list, tuple)):
            delete_tables_list = [delete_tables_list]

        redis = RedisDB()
        for delete_tab in delete_tables_list:
            if not delete_tab.startswith(self._redis_key):
                delete_tab = self._redis_key + delete_tab

            tables = redis.getkeys(delete_tab)
            for table in tables:
                if table != self._tab_spider_time:
                    log.info("Deleting key %s" % table)
                    redis.clear(table)
                else:
                    keys = redis.hgetall(table)
                    for key in keys:
                        if key.startswith(SPIDER_START_TIME):
                            redis.hdel(table, key)

    def _stop_all_thread(self):
        self._request_buffer.stop()
        self._item_buffer.stop()
        # stop the collector
        self._collector.stop()
        # stop the parser_controls
        for parser_control in self._parser_controls:
            parser_control.stop()

        self._started.clear()

    def send_msg(self, msg, level="debug", message_prefix=""):
        # TODO: this is the alerting hook, but sending on every call would flood the channel, so alerting is
        # not enabled in this crawling framework. A future improvement is to push alert contents to an API
        # that stores them and grades their urgency; highly urgent messages could be sent straight to the
        # WeChat group. Avoid storing directly here: feapder only queries MongoDB, it does not write to it.
        # log.debug("Sending alert level:{} msg:{}".format(level, msg))
        tools.send_msg(msg=msg, level=level, message_prefix=message_prefix)

    def get_argvs(self):
        argvs = {"next_page": False, "max_page": 10}
        for item in sys.argv[1:]:
            if item.startswith("--"):
                key = item.replace("--", "").split("=")[0]
                val = item.split("=")[-1]
                if key != "purpose":
                    argvs[key] = eval(val)  # eval turns the string value into a bool or int
        return json.loads(json.dumps(argvs), object_hook=Obj)
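
    # Illustrative note (not from the original source): launching a spider with
    # ``python spider.py --next_page=True --max_page=5`` makes get_argvs() return an object where
    # ``parameter.next_page is True`` and ``parameter.max_page == 5``. The ``--purpose`` argument is
    # consumed in __init__ (it suffixes the redis_key) and is skipped here.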

    def spider_begin(self):
        """
        @summary: when launched via start_monitor_task this function does not run in the same process as
        spider_end, so the two cannot share variables
        ---------
        ---------
        @result:
        """
        if self._begin_callback:
            self._begin_callback()

        for parser in self._parsers:
            parameter = self.get_argvs()
            parser.platform_next_page = parameter.next_page
            parser.platform_max_page = parameter.max_page
            parser.start_callback()

        # record the start time
        if not self._redisdb.hexists(self._tab_spider_time, SPIDER_START_TIME_KEY):
            current_timestamp = tools.get_current_timestamp()
            self._redisdb.hset(
                self._tab_spider_time, SPIDER_START_TIME_KEY, current_timestamp
            )

        # send the start notification
        # self.send_msg("Spider 《%s》 started" % self._spider_name)

    def spider_end(self):  # housekeeping when the spider finishes a round
        self.record_end_time()

        if self._end_callback:  # built-in end callback; a custom callback, if supplied, replaces it
            self._end_callback()

        for parser in self._parsers:
            if not self._keep_alive:
                parser.close()  # parsers may define their own close
            parser.end_callback()  # end callback, which parsers may customize

        if not self._keep_alive:
            # close the webdriver
            if Request.webdriver_pool:
                Request.webdriver_pool.close()

            # close the metrics reporter
            metrics.close()
        else:
            metrics.flush()

        # compute how long the crawl took
        data = self._redisdb.hget(
            self._tab_spider_time, SPIDER_START_TIME_KEY, is_pop=True
        )
        if data:
            begin_timestamp = int(data)
            elapsed_time = tools.get_current_timestamp() - begin_timestamp
            msg = "Spider 《%s》 finished, elapsed %s" % (
                self._spider_name,
                tools.format_seconds(elapsed_time),
            )
            log.info(msg)
            # self.send_msg(msg)

        if self._keep_alive:
            log.info("The spider does not stop automatically, waiting for the next round of tasks...")
        else:
            if self._collector.get_spider_count() <= 1:
                self.delete_tables(self._tab_spider_time)
                self.delete_tables(self._tab_spider_status)
            else:
                # only remove this spider node's heartbeat record; deleting the shared task tables here
                # could leave the other running spider nodes stuck
                self._collector.delete_spider_node()

    def record_end_time(self):
        # record the end time
        if self._batch_interval:
            current_timestamp = tools.get_current_timestamp()
            self._redisdb.hset(
                self._tab_spider_time, SPIDER_END_TIME_KEY, current_timestamp
            )

    def is_reach_next_spider_time(self):  # always True unless a batch interval is configured
        if not self._batch_interval:
            return True

        # compare the end time of the previous run with the current time; if the configured interval has
        # not yet elapsed, the spider does not start
        last_spider_end_time = self._redisdb.hget(
            self._tab_spider_time, SPIDER_END_TIME_KEY
        )
        if last_spider_end_time:
            last_spider_end_time = int(last_spider_end_time)
            current_timestamp = tools.get_current_timestamp()
            time_interval = current_timestamp - last_spider_end_time

            if time_interval < self._batch_interval * 86400:
                log.info(
                    "The previous run finished at {}; only {} has elapsed, less than the configured crawl interval {}. The spider will not run, exiting".format(
                        tools.timestamp_to_date(last_spider_end_time),
                        tools.format_seconds(time_interval),
                        tools.format_seconds(self._batch_interval * 86400),
                    )
                )
                return False

        return True

    def join(self, timeout=None):
        """
        Override Thread.join
        """
        if not self._started.is_set():
            return

        super().join()
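

if __name__ == "__main__":
    # Illustrative usage sketch only -- not part of the framework. It assumes a reachable Redis instance
    # configured through feapder's settings (REDISDB_IP_PORTS etc.); ``DemoParser``, the redis_key and the
    # URL are hypothetical names used for demonstration.
    class DemoParser(BaseParser):
        def start_requests(self):
            # seed request; the response is handed to parse()
            yield Request("https://example.com", callback=self.parse)

        def parse(self, request, response):
            # a real parser would yield Items or follow-up Requests here
            pass

    scheduler = Scheduler(redis_key="test:demo_spider", thread_count=1, keep_alive=False)
    scheduler.add_parser(DemoParser)  # add_parser instantiates the class itself
    scheduler.start()  # threading.Thread.start() invokes Scheduler.run()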