crawl_scheduler.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import datetime
  2. import random
  3. import time
  4. import traceback
  5. from datetime import date, timedelta
  6. import requests
  7. from crawler.login import User
  8. from utils.databases import MongoDBS
  9. from utils.execptions import JyBasicException
  10. from utils.log import logger
  11. from utils.tools import int2long, object_id
  12. class Scheduler:
  13. def __init__(self, query: dict):
  14. self.query = query
  15. self.crawl_account_tab = MongoDBS('py_spider', 'match_account').coll
  16. self.crawl_error_tab = MongoDBS('py_spider', 'crawl_error').coll
  17. self.crawl_start = False
  18. self.account_id = None
  19. self.user = None
  20. self.spider_code = None
  21. self.crawl_url = None
  22. self.crawl_params = None
  23. self.crawl_exception = None
  24. self.crawl_type = None
  25. self.__records = None
  26. def _release_account(self):
  27. rows = dict(
  28. used=False,
  29. update_time=datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
  30. )
  31. if self.account_id is not None:
  32. self.crawl_account_tab.update_one(
  33. {'_id': self.account_id},
  34. {'$set': rows}
  35. )
  36. def __enter__(self):
  37. # 取出一个空闲并且使用次数较少的账号
  38. rows = self.crawl_account_tab.find_one(self.query, sort=[('usage', 1)])
  39. if rows is not None:
  40. self.account_id = rows['_id']
  41. self.user = User(rows['account'], rows['password'])
  42. logger.info(f'[开启调度]启用账号: {self.user.phone}')
  43. usage = int(rows['usage'])
  44. rows['usage'] = usage + 1
  45. use_time = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
  46. rows['update_time'] = use_time
  47. rows['used'] = True
  48. self.crawl_account_tab.update_one(
  49. {'_id': self.account_id},
  50. {'$set': rows}
  51. )
  52. self.crawl_start = True # 控制调度的状态
  53. else:
  54. # TODO 没有空闲账号时,取出使用次数最少的账号,暂未实现
  55. logger.warning(f'请检查mongo表 {self.crawl_account_tab.name} 账号状态')
  56. return self
  57. def __exit__(self, exc_type, exc_val, exc_tb):
  58. logger.info(f'[关闭调度]')
  59. self._release_account()
  60. self.crawl_start = False
  61. if exc_type is not None:
  62. errmsg = traceback.extract_tb(exc_tb)
  63. e = JyBasicException(
  64. code=10500,
  65. reason=str(exc_type),
  66. title='未知系统错误'
  67. )
  68. self.err_record(e)
  69. logger.error(f'错误类型: {exc_type}, 错误内容: {exc_val}, 错误详情: {errmsg}')
  70. return True
  71. def finished(self, execute_next_time=None):
  72. logger.info("任务完成")
  73. self._release_account()
  74. self.sleep(execute_next_time)
  75. @staticmethod
  76. def wait_for_next_task(wait_time=None):
  77. _sleep = (wait_time or random.choice(range(5, 15)))
  78. time.sleep(_sleep)
  79. @staticmethod
  80. def sleep(wait_time=None):
  81. sleep_time = (wait_time or 600)
  82. time.sleep(sleep_time)
  83. @property
  84. def today(self):
  85. return datetime.datetime.today().strftime('%Y-%m-%d')
  86. @property
  87. def yesterday(self):
  88. return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
  89. def err_record(self, e: JyBasicException):
  90. rows = {
  91. 'account': self.user.phone if self.user is not None else '',
  92. 'spidercode': self.spider_code,
  93. 'url': self.crawl_url,
  94. 'status_code': e.code,
  95. 'reason': e.reason,
  96. 'params': getattr(e, 'title', ''),
  97. 'crawl_time': int2long(int(time.time())),
  98. 'crawl_type': self.crawl_type,
  99. }
  100. self.crawl_error_tab.insert_one(rows)
  101. @property
  102. def crawl_task(self):
  103. results = {}
  104. url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/ybw_scheduler'
  105. # url = 'http://127.0.0.1:1405/schedule/crawl_task/ybw_scheduler'
  106. try:
  107. response = requests.get(url, timeout=10)
  108. if response.status_code == 200:
  109. data = response.json()['data']
  110. if len(data) > 0:
  111. results['_id'] = object_id(data['_id'])
  112. for key, val in data.items():
  113. if key != '_id':
  114. results[key] = val
  115. return results
  116. except requests.RequestException:
  117. return results