crawl_scheduler.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import datetime
  2. import random
  3. import time
  4. import traceback
  5. from datetime import date, timedelta
  6. import requests
  7. from crawler.login import User
  8. from utils.databases import mongo_table, int2long, object_id
  9. from utils.execptions import JyBasicException
  10. from utils.log import logger
  11. from utils.tools import get_host_ip
  12. class Scheduler:
  13. def __init__(self, query: dict):
  14. self.query = query
  15. self.account_tab = mongo_table('py_spider', 'match_account')
  16. self.error_tab = mongo_table('py_spider', 'crawl_error')
  17. self.crawl_start = False # 控制调度的状态
  18. self.count = None # 日采集数量
  19. self.total = None # 日采集上限
  20. self.account_id = None
  21. self.user = None
  22. self.spider_code = None
  23. self.crawl_url = None
  24. self.crawl_params = None
  25. self.crawl_exception = None
  26. self.crawl_type = None
  27. @property
  28. def today(self):
  29. return datetime.datetime.today().strftime('%Y-%m-%d')
  30. @property
  31. def yesterday(self):
  32. return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
  33. @property
  34. def crawl_task(self):
  35. results = {}
  36. url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/ybw_scheduler'
  37. # url = 'http://127.0.0.1:1405/schedule/crawl_task/ybw_scheduler'
  38. try:
  39. response = requests.get(url, timeout=10)
  40. if response.status_code == 200:
  41. data = response.json()['data']
  42. if len(data) > 0:
  43. results['_id'] = object_id(data['_id'])
  44. for key, val in data.items():
  45. if key != '_id':
  46. results[key] = val
  47. return results
  48. except requests.RequestException:
  49. return results
  50. def finished(self, execute_next_time=None):
  51. logger.info("任务结束")
  52. self._release_account()
  53. self.sleep(execute_next_time)
  54. def err_record(self, e: JyBasicException):
  55. rows = {
  56. 'account': self.user.phone if self.user is not None else '',
  57. 'spidercode': self.spider_code,
  58. 'url': self.crawl_url,
  59. 'status_code': e.code,
  60. 'reason': e.reason,
  61. 'params': getattr(e, 'title', ''),
  62. 'crawl_time': int2long(int(time.time())),
  63. 'crawl_type': self.crawl_type,
  64. }
  65. self.error_tab.insert_one(rows)
  66. @staticmethod
  67. def wait_for_next_task(wait_time=None):
  68. _sleep = (wait_time or random.choice(range(5, 11)))
  69. time.sleep(_sleep)
  70. @staticmethod
  71. def sleep(wait_time=None):
  72. sleep_time = (wait_time or 600)
  73. time.sleep(sleep_time)
  74. def _release_account(self):
  75. rows = dict(
  76. used=False,
  77. update_time=datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
  78. )
  79. if self.account_id is not None:
  80. self._update_data(rows)
  81. def _update_data(self, item):
  82. """
  83. 更新账号所属的采集数据信息
  84. :param item: 最新数据
  85. """
  86. item['ip'] = get_host_ip()
  87. item['update_time'] = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
  88. self.account_tab.update_one(
  89. {'_id': self.account_id},
  90. {'$set': item}
  91. )
  92. def update_count(self, number):
  93. rows = self.account_tab.find_one({'_id': self.account_id})
  94. records = rows.get('records', {self.today: 0})
  95. '''采集记录历史保存7天'''
  96. count = records.get(self.today, 0)
  97. count += number
  98. self.count = count
  99. if len(records) > 7:
  100. records.clear()
  101. records.setdefault(self.today, count)
  102. else:
  103. records.update({self.today: count})
  104. rows.update({'records': records})
  105. self._update_data(rows)
  106. def __enter__(self):
  107. logger.info(f'[开启调度]')
  108. '''获取一个闲置时间较久的账号'''
  109. rows = self.account_tab.find_one(self.query, sort=[('update_time', 1)])
  110. if rows is not None:
  111. self.account_id = rows['_id']
  112. self.user = User(rows['account'], rows['password'])
  113. logger.info(f'[启用账号]{self.user.phone}')
  114. rows['used'] = True
  115. '''初始化采集数据记录'''
  116. records = rows.get('records', {self.today: 0})
  117. rows.update({'records': records})
  118. self._update_data(rows)
  119. self.total = rows['total']
  120. self.count = records.get(self.today, 0)
  121. self.crawl_start = True
  122. else:
  123. logger.warning(f'[{self.query.get("site")}采集]暂无闲置账号')
  124. return self
  125. def __exit__(self, exc_type, exc_val, exc_tb):
  126. logger.info(f'[关闭调度]')
  127. self._release_account()
  128. self.crawl_start = False
  129. if exc_type is not None:
  130. errmsg = traceback.extract_tb(exc_tb)
  131. e = JyBasicException(
  132. code=10500,
  133. reason=str(exc_type),
  134. title='未知系统错误'
  135. )
  136. self.err_record(e)
  137. logger.error(f'错误类型: {exc_type}, 错误内容: {exc_val}, 错误详情: {errmsg}')
  138. return True