import datetime
import random
import time
import traceback
from datetime import date, timedelta

import requests

from crawler.login import User
from utils.databases import MongoDBS
from utils.execptions import JyBasicException
from utils.log import logger
from utils.tools import int2long, object_id


class Scheduler:

    def __init__(self, query: dict):
        self.query = query
        self.crawl_account_tab = MongoDBS('py_spider', 'match_account').coll
        self.crawl_error_tab = MongoDBS('py_spider', 'crawl_error').coll
        self.crawl_start = False
        self.account_id = None
        self.user = None
        self.spider_code = None
        self.crawl_url = None
        self.crawl_params = None
        self.crawl_exception = None
        self.crawl_type = None
        self.__records = None

    def _release_account(self):
        # Mark the account as idle again and record when it was released.
        rows = dict(
            used=False,
            update_time=datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
        )
        if self.account_id is not None:
            self.crawl_account_tab.update_one(
                {'_id': self.account_id},
                {'$set': rows}
            )

    def __enter__(self):
        # Pick an idle account with the lowest usage count.
        rows = self.crawl_account_tab.find_one(self.query, sort=[('usage', 1)])
        if rows is not None:
            self.account_id = rows['_id']
            self.user = User(rows['account'], rows['password'])
            logger.info(f'[scheduler started] using account: {self.user.phone}')
            usage = int(rows['usage'])
            rows['usage'] = usage + 1
            use_time = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
            rows['update_time'] = use_time
            rows['used'] = True
            self.crawl_account_tab.update_one(
                {'_id': self.account_id},
                {'$set': rows}
            )
            self.crawl_start = True  # mark the scheduler as started
        else:
            # TODO: when no idle account is available, fall back to the
            #  least-used one (not implemented yet)
            logger.warning(f'please check account status in mongo collection {self.crawl_account_tab.name}')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.info('[scheduler closed]')
        self._release_account()
        self.crawl_start = False
        if exc_type is not None:
            errmsg = traceback.extract_tb(exc_tb)
            e = JyBasicException(
                code=10500,
                reason=str(exc_type),
                title='unknown system error'
            )
            self.err_record(e)
            logger.error(f'error type: {exc_type}, error value: {exc_val}, traceback: {errmsg}')
        # Returning True suppresses the exception after it has been recorded.
        return True

    def finished(self, execute_next_time=None):
        logger.info('task finished')
        self._release_account()
        self.sleep(execute_next_time)

    @staticmethod
    def wait_for_next_task(wait_time=None):
        _sleep = (wait_time or random.choice(range(5, 15)))
        time.sleep(_sleep)

    @staticmethod
    def sleep(wait_time=None):
        sleep_time = (wait_time or 600)
        time.sleep(sleep_time)

    @property
    def today(self):
        return datetime.datetime.today().strftime('%Y-%m-%d')

    @property
    def yesterday(self):
        return (date.today() - timedelta(days=1)).strftime('%Y-%m-%d')

    def err_record(self, e: JyBasicException):
        rows = {
            'account': self.user.phone if self.user is not None else '',
            'spidercode': self.spider_code,
            'url': self.crawl_url,
            'status_code': e.code,
            'reason': e.reason,
            'params': getattr(e, 'title', ''),
            'crawl_time': int2long(int(time.time())),
            'crawl_type': self.crawl_type,
        }
        self.crawl_error_tab.insert_one(rows)

    @property
    def crawl_task(self):
        # Fetch one crawl task from the schedule service; always returns a
        # dict (empty on failure or when no task is available).
        results = {}
        url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/ybw_scheduler'
        # url = 'http://127.0.0.1:1405/schedule/crawl_task/ybw_scheduler'
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                data = response.json()['data']
                if len(data) > 0:
                    results['_id'] = object_id(data['_id'])
                    for key, val in data.items():
                        if key != '_id':
                            results[key] = val
            return results
        except requests.RequestException:
            return results
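

if __name__ == '__main__':
    # Usage sketch (illustrative only, not part of the module's public API):
    # drive the scheduler as a context manager. The query below simply asks
    # for an idle account; real callers are assumed to pass whatever filter
    # their account pool requires, and the task fields accessed here
    # ('spidercode', 'url') mirror the err_record keys above.
    with Scheduler(query={'used': False}) as scheduler:
        if scheduler.crawl_start:
            task = scheduler.crawl_task  # pull one task from the schedule service
            if task:
                scheduler.spider_code = task.get('spidercode')
                scheduler.crawl_url = task.get('url')
                # ... perform the crawl with scheduler.user here ...
                scheduler.wait_for_next_task()  # short random pause between tasks
        scheduler.finished(execute_next_time=60)  # release the account, then back off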