# crawl_scheduler.py
import datetime
import random
import time
import traceback
from datetime import date, timedelta

import requests

from crawler.login import User
from utils.databases import mongo_table, int2long, object_id
from utils.execptions import JyBasicException
from utils.log import logger
from utils.tools import get_host_ip
  12. class Scheduler:
  13. def __init__(self, site, crawl_type, **kwargs):
  14. self.site = site
  15. self.crawl_type = crawl_type
  16. self.crawl_start = False
  17. self.count = None # 日采集数量
  18. self.total = None # 日采集上限
  19. self.account_id = None
  20. self.record_id = None
  21. self.user = None
  22. self.spider_code = None
  23. self.crawl_url = None
  24. self.crawl_params = None
  25. self.crawl_exception = None
  26. self.kwargs = kwargs
  27. self.account_tab = mongo_table('py_spider', 'match_account')
  28. self.record_tab = mongo_table('py_spider', 'match_account_record')
  29. self.crawl_error_tab = mongo_table('py_spider', 'crawl_error')
  30. def crawl_counter(self, number: int):
  31. """采集计数器"""
  32. records = self.record_tab.find_one({'_id': self.record_id})
  33. records['count'] += number
  34. self.count = records['count']
  35. self._update_tab(self.record_tab, self.record_id, records)
  36. def query_user(self, account: str):
  37. query = {'account': account}
  38. item = self.account_tab.find_one(query)
  39. if item is None:
  40. return None
  41. return User(item['account'], item['password'])
  42. def finished(self, execute_next_time=None):
  43. logger.info("任务结束")
  44. self._release_account()
  45. self.sleep(execute_next_time)
  46. def err_record(self, e: JyBasicException):
  47. rows = {
  48. 'account': self.user.username if self.user is not None else '',
  49. 'spidercode': self.spider_code,
  50. 'url': self.crawl_url,
  51. 'status_code': e.code,
  52. 'reason': e.reason,
  53. 'params': getattr(e, 'title', ''),
  54. 'crawl_time': int2long(int(time.time())),
  55. 'crawl_type': self.crawl_type,
  56. }
  57. self.crawl_error_tab.insert_one(rows)
  58. def _update_tab(self, mgo_coll, _id, item):
  59. """
  60. 更新mongo表
  61. :param mgo_coll: mongo表
  62. :param _id: mongo_id
  63. :param item: 数据
  64. """
  65. item['update_time'] = self.current_time
  66. mgo_coll.update_one({'_id': _id}, {'$set': item})
  67. def _release_account(self):
  68. if self.crawl_type == 'detail':
  69. rows = dict(crawl_detail=False,)
  70. else:
  71. rows = dict(crawl_list=False,)
  72. if self.account_id is not None:
  73. self._update_tab(self.account_tab, self.account_id, rows)
  74. def __enter__(self):
  75. logger.info(f'[开启调度]')
  76. '''获取闲置账号'''
  77. if self.account is not None:
  78. self.account_id = self.account['_id']
  79. self.user = User(self.account['account'], self.account['password'])
  80. logger.info(f'[启用账号]{self.user.username}')
  81. '''初始化记录表'''
  82. records = self.account_records
  83. if self.crawl_type == 'detail':
  84. item = {'crawl_detail': True}
  85. self.total = records['total']
  86. self.count = records['count']
  87. else:
  88. item = {'crawl_list': True}
  89. '''初始化采集账号记录'''
  90. self._update_tab(self.account_tab, self.account_id, item)
  91. self.crawl_start = True
  92. else:
  93. logger.warning(f'[{self.site}]暂无闲置账号')
  94. return self
  95. @staticmethod
  96. def wait_for_next_task(wait_time=None):
  97. _sleep = (wait_time or random.choice(range(5, 15)))
  98. time.sleep(_sleep)
  99. @staticmethod
  100. def sleep(wait_time=None):
  101. sleep_time = (wait_time or 600)
  102. time.sleep(sleep_time)
  103. @property
  104. def account_records(self):
  105. """账号使用记录"""
  106. query = dict(
  107. account=self.account['account'],
  108. date=self.today,
  109. type=self.crawl_type,
  110. site=self.site
  111. )
  112. item = self.record_tab.find_one(query)
  113. if item is None:
  114. item = dict(
  115. site=self.site,
  116. account=self.account['account'],
  117. type=self.crawl_type,
  118. total=self.account.get('total', 0), # 任务总数不设置时,默认为:0
  119. count=0,
  120. ip=get_host_ip(),
  121. date=self.today,
  122. update_time=self.current_time,
  123. )
  124. result = self.record_tab.insert_one(item)
  125. item['_id'] = result.inserted_id
  126. self.record_id = item['_id']
  127. return item
  128. @property
  129. def account(self):
  130. """账号"""
  131. query = dict(site=self.site)
  132. if self.crawl_type == 'detail':
  133. query['crawl_detail'] = False
  134. else:
  135. query['crawl_list'] = False
  136. return self.account_tab.find_one(query, sort=[('update_time', 1)])
  137. @property
  138. def crawl_task(self):
  139. results = {}
  140. url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/zgytb_scheduler'
  141. # url = 'http://127.0.0.1:1405/schedule/crawl_task/zgytb_scheduler'
  142. try:
  143. response = requests.get(url, timeout=10)
  144. if response.status_code == 200:
  145. data = response.json()['data']
  146. if len(data) > 0:
  147. results['_id'] = object_id(data['_id'])
  148. for key, val in data.items():
  149. if key != '_id':
  150. results[key] = val
  151. return results
  152. except requests.RequestException:
  153. return results
  154. @property
  155. def today(self):
  156. return datetime.datetime.today().strftime('%Y-%m-%d')
  157. @property
  158. def current_time(self):
  159. return datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
  160. @property
  161. def yesterday(self):
  162. return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
  163. def __exit__(self, exc_type, exc_val, exc_tb):
  164. logger.info(f'[关闭调度]')
  165. self._release_account()
  166. self.crawl_start = False
  167. if exc_type is not None:
  168. errmsg = traceback.extract_tb(exc_tb)
  169. e = JyBasicException(
  170. code=10500,
  171. reason=str(exc_type),
  172. title='未知系统错误'
  173. )
  174. self.err_record(e)
  175. logger.error(f'错误类型: {exc_type}, 错误内容: {exc_val}, 错误详情: {errmsg}')
  176. return True