crawl_scheduler.py

import datetime
import random
import time
from datetime import date, timedelta

import requests

from crawler.login import User
from utils.databases import mongo_table, int2long, object_id
from utils.execptions import ZbYTbCrawlError
from utils.log import logger
from utils.tools import get_host_ip


class Scheduler:

    def __init__(self, site, crawl_type, **kwargs):
        self.site = site
        self.crawl_type = crawl_type
        self.crawl_start = False
        self.count = None  # number of items crawled today
        self.total = None  # daily crawl limit
        self.account_id = None
        self.record_id = None
        self.user = None
        self.spider_code = None
        self.crawl_url = None
        self.crawl_params = None
        self.crawl_exception = None
        self.kwargs = kwargs
        self._headers = {"Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"}
        self.account_tab = mongo_table('py_spider', 'match_account')
        self.record_tab = mongo_table('py_spider', 'match_account_record')
        self.crawl_error_tab = mongo_table('py_spider', 'crawl_error')
        list_attr = dict(
            name='crawl_list',
            lock=dict(crawl_list=True),
            release=dict(crawl_list=False),
        )
        detail_attr = dict(
            name='crawl_detail',
            lock=dict(crawl_detail=True),
            release=dict(crawl_detail=False),
        )
        self._schedule = {'list': list_attr, 'detail': detail_attr}
        self.account = self.get_account()

    def _init(self):
        self.account_id = self.account['_id']
        self.user = User(self.account['account'], self.account['password'])
        logger.info(f'[Account enabled] {self.user.username}')
        history = self.account_history_crawl_record()
        self.count = history['count']  # items already crawled today

    def get_account(self):
        """Fetch an idle account from the dispatch service."""
        url = "http://cc.spdata.jianyu360.com/competing_goods/account/fetch"
        params = {
            "site": self.site,
            "crawl_type": self.crawl_type
        }
        try:
            response = requests.get(url,
                                    headers=self._headers,
                                    params=params,
                                    timeout=10)
            logger.debug(f"get_account >>> {response.json()}")
            data = response.json()['data']
        except requests.RequestException:
            # No account can be fetched when the network is unreachable
            data = None
        return data

    def _release_account(self):
        """Return the account to the dispatch service, retrying until it succeeds."""
        url = "http://cc.spdata.jianyu360.com/competing_goods/account/release"
        if self.account_id is not None:
            params = {
                "uid": self.account_id,
                "crawl_type": self.crawl_type
            }
            while True:
                try:
                    response = requests.get(url,
                                            headers=self._headers,
                                            params=params,
                                            timeout=10)
                    if response.status_code == 200:
                        logger.debug(f"_release_account >>> {response.json()}")
                        break
                except requests.RequestException:
                    logger.error("Network error, failed to release the account")
                # back off briefly before retrying
                self._wait_schedule(1)

    def crawl_counter(self, number: int):
        """Crawl counter: add ``number`` to today's count and persist it."""
        records = self.record_tab.find_one({'_id': self.record_id})
        records['count'] += number
        self.count = records['count']
        self._update_tab(self.record_tab, self.record_id, count=self.count)

    def query_user(self, account: str):
        query = {'account': account}
        item = self.account_tab.find_one(query)
        if item is None:
            return None
        return User(item['account'], item['password'])

    def err_record(self, err: ZbYTbCrawlError):
        rows = {
            'account': self.user.username if self.user is not None else '',
            'spidercode': self.spider_code,
            'url': self.crawl_url,
            'status_code': err.code,
            'reason': err.reason,
            'params': getattr(err, 'title', ''),
            'crawl_time': int2long(int(time.time())),
            'crawl_type': self.crawl_type,
        }
        self.crawl_error_tab.insert_one(rows)

    def _update_tab(self, collection, mid, **update):
        update['update_time'] = self.current_time
        collection.update_one({'_id': mid}, {'$set': update})

    def account_history_crawl_record(self):
        """Look up (or create) today's crawl record for the account in use."""
        query = dict(
            account=self.account['account'],
            date=self.today,
            type=self.crawl_type,
            site=self.site
        )
        item = self.record_tab.find_one(query)
        if item is None:
            item = dict(
                site=self.site,
                account=self.account['account'],
                type=self.crawl_type,
                total=self.account.get('total', 0),  # daily task quota, defaults to 0
                count=0,
                ip=get_host_ip(),
                date=self.today,
                update_time=self.current_time,
            )
            result = self.record_tab.insert_one(item)
            item['_id'] = result.inserted_id
        self.record_id = item['_id']
        return item

    def wait_for_next_task(self, interval=None):
        interval = (interval or random.choice(range(5, 15)))
        self._wait_schedule(interval)

    def finished(self, execute_next_time=None):
        logger.info("Task finished")
        self._release_account()
        self._wait_schedule(execute_next_time)

    @staticmethod
    def _wait_schedule(interval=None):
        _interval = (interval or 600)
        time.sleep(_interval)

    @property
    def crawl_task(self):
        """Fetch one crawl task from the dispatch service; empty dict on failure."""
        results = {}
        url = 'http://cc.spdata.jianyu360.com/crawl/zbytb/task/fetch'
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                data = response.json()['data']
                if len(data) > 0:
                    results['_id'] = object_id(data['_id'])
                    for key, val in data.items():
                        if key != '_id':
                            results[key] = val
        except requests.RequestException:
            pass
        return results

    @property
    def today(self):
        return datetime.datetime.today().strftime('%Y-%m-%d')

    @property
    def current_time(self):
        return datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

    @property
    def yesterday(self):
        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")

    def __enter__(self):
        logger.info('[Scheduler started]')
        # acquire an idle account
        if self.account is not None:
            self._init()
            self.crawl_start = True
        else:
            logger.warning(f'[{self.site}] no idle account available')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.info('[Scheduler stopped]')
        self._release_account()
        self.crawl_start = False
        if exc_type is not None:
            logger.exception(exc_tb)
            e = ZbYTbCrawlError(
                code=10500,
                reason=str(exc_type),
                title='unknown system error'
            )
            self.err_record(e)
        return True
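

# Usage sketch (illustrative only): how a spider might drive this scheduler.
# The site value 'zbytb' and the task field names ('spidercode', 'url') are
# assumptions inferred from this module; the real spider wiring lives in the
# crawler package, not here.
if __name__ == '__main__':
    with Scheduler(site='zbytb', crawl_type='list') as scheduler:
        if scheduler.crawl_start:
            task = scheduler.crawl_task  # {} when the dispatch service has no task
            if task:
                scheduler.spider_code = task.get('spidercode')
                scheduler.crawl_url = task.get('url')
                # ... run the actual list/detail crawl here ...
                scheduler.crawl_counter(1)  # record one crawled item for today
            scheduler.finished(execute_next_time=10)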