# crawl_scheduler.py 6.2 KB
import datetime
import random
import time
import traceback
from datetime import date, timedelta

import requests

from crawler.login import User
from utils.databases import mongo_table, int2long, object_id
from utils.execptions import JyBasicException
from utils.log import logger
from utils.tools import get_host_ip


  12. class Scheduler:
  13. def __init__(self, site, crawl_type, **kwargs):
  14. self.site = site
  15. self.crawl_type = crawl_type
  16. self.crawl_start = False # 控制调度的状态
  17. self.count = None # 日采集数量
  18. self.total = None # 日采集上限
  19. self.account_id = None # 账号id
  20. self.record_id = None # 日采集记录id
  21. self.user = None # 账户
  22. self.spider_code = None
  23. self.crawl_url = None
  24. self.crawl_params = None
  25. self.crawl_exception = None
  26. self.kwargs = kwargs
  27. self.account_tab = mongo_table('py_spider', 'match_account')
  28. self.record_tab = mongo_table('py_spider', 'match_account_record')
  29. self.error_tab = mongo_table('py_spider', 'crawl_error')
  30. def crawl_counter(self, number: int):
  31. """采集计数器"""
  32. records = self.record_tab.find_one({'_id': self.record_id})
  33. records['count'] += number
  34. self.count = records['count']
  35. self._update_tab(self.record_tab, self.record_id, records)
  36. def finished(self, execute_next_time=None):
  37. logger.info("任务结束")
  38. self._release_account()
  39. self.sleep(execute_next_time)
  40. def err_record(self, e: JyBasicException):
  41. rows = {
  42. 'account': self.user.phone if self.user is not None else '',
  43. 'spidercode': self.spider_code,
  44. 'url': self.crawl_url,
  45. 'status_code': e.code,
  46. 'reason': e.reason,
  47. 'params': getattr(e, 'title', ''),
  48. 'crawl_time': int2long(int(time.time())),
  49. 'crawl_type': self.crawl_type,
  50. }
  51. self.error_tab.insert_one(rows)
  52. def _release_account(self):
  53. if self.crawl_type == 'detail':
  54. rows = dict(crawl_detail=False,)
  55. else:
  56. rows = dict(crawl_list=False,)
  57. if self.account_id is not None:
  58. self._update_tab(self.account_tab, self.account_id, rows)
  59. def _update_tab(self, mgo_coll, _id, item):
  60. """
  61. 更新mongo表
  62. :param mgo_coll: mongo表
  63. :param _id: mongo_id
  64. :param item: 数据
  65. """
  66. item['update_time'] = self.current_time
  67. mgo_coll.update_one({'_id': _id}, {'$set': item})
  68. def __enter__(self):
  69. logger.info(f'[开启调度]')
  70. '''获取闲置账号'''
  71. if self.account is not None:
  72. self.account_id = self.account['_id']
  73. self.user = User(self.account['account'], self.account['password'])
  74. logger.info(f'[启用账号]{self.user.phone}')
  75. '''初始化记录表'''
  76. records = self.account_records
  77. if self.crawl_type == 'detail':
  78. item = {'crawl_detail': True}
  79. self.total = records['total']
  80. self.count = records['count']
  81. else:
  82. item = {'crawl_list': True}
  83. '''初始化采集账号记录'''
  84. self._update_tab(self.account_tab, self.account_id, item)
  85. self.crawl_start = True
  86. else:
  87. logger.warning(f'[{self.site}]暂无闲置账号')
  88. return self
  89. @staticmethod
  90. def wait_for_next_task(wait_time=None):
  91. _sleep = (wait_time or random.choice(range(5, 11)))
  92. time.sleep(_sleep)
  93. @staticmethod
  94. def sleep(wait_time=None):
  95. sleep_time = (wait_time or 600)
  96. time.sleep(sleep_time)
  97. @property
  98. def account_records(self):
  99. """账号使用记录"""
  100. query = dict(
  101. account=self.account['account'],
  102. date=self.today,
  103. type=self.crawl_type,
  104. site=self.site
  105. )
  106. item = self.record_tab.find_one(query)
  107. if item is None:
  108. item = dict(
  109. site=self.site,
  110. account=self.account['account'],
  111. type=self.crawl_type,
  112. total=self.account['total'],
  113. count=0,
  114. ip=get_host_ip(),
  115. date=self.today,
  116. update_time=self.current_time,
  117. )
  118. result = self.record_tab.insert_one(item)
  119. item['_id'] = result.inserted_id
  120. self.record_id = item['_id']
  121. return item
  122. @property
  123. def account(self):
  124. """账号"""
  125. query = dict(site=self.site)
  126. if self.crawl_type == 'detail':
  127. query['crawl_detail'] = False
  128. else:
  129. query['crawl_list'] = False
  130. return self.account_tab.find_one(query, sort=[('update_time', 1)])
  131. @property
  132. def crawl_task(self):
  133. results = {}
  134. url = 'http://cc.spdata.jianyu360.com/crawl/ybw/task/fetch'
  135. try:
  136. response = requests.get(url, timeout=10)
  137. if response.status_code == 200:
  138. data = response.json()['data']
  139. if len(data) > 0:
  140. results['_id'] = object_id(data['_id'])
  141. for key, val in data.items():
  142. if key != '_id':
  143. results[key] = val
  144. return results
  145. except requests.RequestException:
  146. return results
  147. @property
  148. def today(self):
  149. return datetime.datetime.today().strftime('%Y-%m-%d')
  150. @property
  151. def current_time(self):
  152. return datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
  153. @property
  154. def yesterday(self):
  155. return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
  156. def __exit__(self, exc_type, exc_val, exc_tb):
  157. logger.info(f'[关闭调度]')
  158. self._release_account()
  159. self.crawl_start = False
  160. if exc_type is not None:
  161. errmsg = traceback.extract_tb(exc_tb)
  162. e = JyBasicException(
  163. code=10500,
  164. reason=str(exc_type),
  165. title='未知系统错误'
  166. )
  167. self.err_record(e)
  168. logger.error(f'错误类型: {exc_type}, 错误内容: {exc_val}, 错误详情: {errmsg}')
  169. return True