crawl_scheduler.py

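"""Account-aware crawl scheduler for the zbytb spider.

It checks out an idle account, keeps per-day crawl counts in MongoDB,
pulls tasks from the remote task endpoint, and returns the account when
the run ends.
"""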
import datetime
import random
import time
from datetime import date, timedelta

import requests

from crawler.account import account_record, get_account, release_account
from crawler.login import User
from utils.databases import mongo_table, int2long, object_id
from utils.execptions import ZbYTbCrawlError
from utils.log import logger
from utils.tools import get_host_ip, wait


class Scheduler:

    def __init__(self, site, crawl_type, **kwargs):
        self.site = site
        self.crawl_type = crawl_type
        self.crawl_start = False
        self.count = None  # number of items crawled today
        self.total = None  # daily crawl limit
        self.account_id = None
        self.record_id = None
        self.user = None
        self.spider_code = None
        self.crawl_url = None
        self.crawl_params = None
        self.crawl_exception = None
        self.kwargs = kwargs
        self.account_tab = mongo_table('py_spider', 'match_account')
        self.record_tab = mongo_table('py_spider', 'match_account_record')
        self.crawl_error_tab = mongo_table('py_spider', 'crawl_error')
        list_attr = dict(
            name='crawl_list',
            lock=dict(crawl_list=True),
            release=dict(crawl_list=False),
        )
        detail_attr = dict(
            name='crawl_detail',
            lock=dict(crawl_detail=True),
            release=dict(crawl_detail=False),
        )
        self._schedule = {'list': list_attr, 'detail': detail_attr}
        self.account = get_account(self.site, self.crawl_type)

    def _init(self):
        self.account_id = self.account['_id']
        # record the account in use so a restarted container can return it
        account_record(self.account_id, self.crawl_type)
        self.user = User(self.account['account'], self.account['password'])
        logger.info(f'[account enabled] {self.user.username}')
        history = self.account_history_crawl_record()
        self.count = history['count']  # number of items crawled so far

    def crawl_counter(self, number: int):
        """Crawl counter: add ``number`` to today's usage record."""
        records = self.record_tab.find_one({'_id': self.record_id})
        records['count'] += number
        self.count = records['count']
        self._update_tab(self.record_tab, self.record_id, count=self.count)

    def query_user(self, account: str):
        query = {'account': account}
        item = self.account_tab.find_one(query)
        if item is None:
            return None
        return User(item['account'], item['password'])

    def err_record(self, err: ZbYTbCrawlError):
        rows = {
            'account': self.user.username if self.user is not None else '',
            'spidercode': self.spider_code,
            'url': self.crawl_url,
            'status_code': err.code,
            'reason': err.reason,
            'params': getattr(err, 'title', ''),
            'crawl_time': int2long(int(time.time())),
            'crawl_type': self.crawl_type,
        }
        self.crawl_error_tab.insert_one(rows)

    def _update_tab(self, collection, mid, **update):
        update['update_time'] = self.current_time
        collection.update_one({'_id': mid}, {'$set': update})

    def account_history_crawl_record(self):
        """Fetch (or create) today's crawl record for the account in use."""
        query = dict(
            account=self.account['account'],
            date=self.today,
            type=self.crawl_type,
            site=self.site
        )
        item = self.record_tab.find_one(query)
        if item is None:
            item = dict(
                site=self.site,
                account=self.account['account'],
                type=self.crawl_type,
                total=self.account.get('total', 0),  # default task total: 0
                count=0,
                ip=get_host_ip(),
                date=self.today,
                update_time=self.current_time,
            )
            result = self.record_tab.insert_one(item)
            item['_id'] = result.inserted_id
        self.record_id = item['_id']
        return item

    @staticmethod
    def wait_for_next_task(interval=None):
        interval = (interval or random.choice(range(5, 15)))
        wait(interval)

    def finished(self, interval=None):
        logger.info('task finished')
        # release_account(self.account_id, self.crawl_type)
        wait(interval)

    @property
    def crawl_task(self):
        results = {}
        url = 'http://cc.spdata.jianyu360.com/crawl/zbytb/task/fetch'
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                data = response.json()['data']
                if len(data) > 0:
                    results['_id'] = object_id(data['_id'])
                    for key, val in data.items():
                        if key != '_id':
                            results[key] = val
            return results
        except requests.RequestException:
            return results

    @property
    def today(self):
        return datetime.datetime.today().strftime('%Y-%m-%d')

    @property
    def current_time(self):
        return datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

    @property
    def yesterday(self):
        return (date.today() - timedelta(days=1)).strftime('%Y-%m-%d')

    def __enter__(self):
        logger.info('[scheduler started]')
        # claim an idle account before crawling starts
        if self.account is not None:
            self._init()
            self.crawl_start = True
        else:
            logger.warning(f'[{self.site}] no idle account available')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.info('[scheduler stopped]')
        if self.account_id is not None:
            release_account(self.account_id, self.crawl_type)
        self.crawl_start = False
        if exc_type is not None:
            logger.exception(exc_tb)
            e = ZbYTbCrawlError(
                code=10500,
                reason=str(exc_type),
                title='unknown system error'
            )
            self.err_record(e)
        return True
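

# A minimal usage sketch (illustrative only): the site name 'zbytb', the
# 'detail' crawl type and the body of the loop are assumptions; the actual
# page fetching and parsing live outside this module.
if __name__ == '__main__':
    with Scheduler(site='zbytb', crawl_type='detail') as scheduler:
        if scheduler.crawl_start:
            task = scheduler.crawl_task      # {} when the task endpoint has nothing
            if task:
                # ... fetch and parse the page for this task here ...
                scheduler.crawl_counter(1)   # record one crawled item
            scheduler.wait_for_next_task()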