import datetime
import random
import time
import traceback
from datetime import date, timedelta

import requests

from crawler.login import User
from utils.databases import mongo_table, int2long, object_id
from utils.execptions import JyBasicException
from utils.log import logger
from utils.tools import get_host_ip


class Scheduler:

    def __init__(self, query: dict):
        self.query = query
        self.account_tab = mongo_table('py_spider', 'match_account')
        self.error_tab = mongo_table('py_spider', 'crawl_error')
        self.crawl_start = False  # scheduling state flag
        self.count = None  # number of items crawled today
        self.total = None  # daily crawl limit
        self.account_id = None
        self.user = None
        self.spider_code = None
        self.crawl_url = None
        self.crawl_params = None
        self.crawl_exception = None
        self.crawl_type = None

    @property
    def today(self):
        return datetime.datetime.today().strftime('%Y-%m-%d')

    @property
    def yesterday(self):
        return (date.today() - timedelta(days=1)).strftime('%Y-%m-%d')

    @property
    def crawl_task(self):
        """Fetch one crawl task from the dispatch service; returns {} when none is available."""
        results = {}
        url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/ybw_scheduler'
        # url = 'http://127.0.0.1:1405/schedule/crawl_task/ybw_scheduler'
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                data = response.json()['data']
                if len(data) > 0:
                    results['_id'] = object_id(data['_id'])
                    for key, val in data.items():
                        if key != '_id':
                            results[key] = val
        except requests.RequestException:
            pass
        return results

    def finished(self, execute_next_time=None):
        logger.info('Task finished')
        self._release_account()
        self.sleep(execute_next_time)

    def err_record(self, e: JyBasicException):
        rows = {
            'account': self.user.phone if self.user is not None else '',
            'spidercode': self.spider_code,
            'url': self.crawl_url,
            'status_code': e.code,
            'reason': e.reason,
            'params': getattr(e, 'title', ''),
            'crawl_time': int2long(int(time.time())),
            'crawl_type': self.crawl_type,
        }
        self.error_tab.insert_one(rows)

    @staticmethod
    def wait_for_next_task(wait_time=None):
        _sleep = (wait_time or random.choice(range(5, 11)))
        time.sleep(_sleep)

    @staticmethod
    def sleep(wait_time=None):
        sleep_time = (wait_time or 600)
        time.sleep(sleep_time)

    def _release_account(self):
        """Mark the claimed account as idle again."""
        rows = dict(
            used=False,
            update_time=datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
        )
        if self.account_id is not None:
            self._update_data(rows)

    def _update_data(self, item):
        """
        Update the crawl metadata stored with the account.

        :param item: latest data
        """
        item['ip'] = get_host_ip()
        item['update_time'] = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
        self.account_tab.update_one(
            {'_id': self.account_id},
            {'$set': item}
        )

    def update_count(self, number):
        rows = self.account_tab.find_one({'_id': self.account_id})
        records = rows.get('records', {self.today: 0})
        # crawl records keep at most 7 days of history
        count = records.get(self.today, 0)
        count += number
        self.count = count
        if len(records) > 7:
            records.clear()
            records.setdefault(self.today, count)
        else:
            records.update({self.today: count})
        rows.update({'records': records})
        self._update_data(rows)

    def __enter__(self):
        logger.info('[scheduler started]')
        # pick the account that has been idle the longest
        rows = self.account_tab.find_one(self.query, sort=[('update_time', 1)])
        if rows is not None:
            self.account_id = rows['_id']
            self.user = User(rows['account'], rows['password'])
            logger.info(f'[account enabled]{self.user.phone}')
            rows['used'] = True
            # initialise the crawl record for today
            records = rows.get('records', {self.today: 0})
            rows.update({'records': records})
            self._update_data(rows)
            self.total = rows['total']
            self.count = records.get(self.today, 0)
            self.crawl_start = True
        else:
            logger.warning(f'[{self.query.get("site")} crawl] no idle account available')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.info('[scheduler stopped]')
        self._release_account()
        self.crawl_start = False
        if exc_type is not None:
            errmsg = traceback.extract_tb(exc_tb)
            e = JyBasicException(
                code=10500,
                reason=str(exc_type),
                title='unknown system error'
            )
            self.err_record(e)
            logger.error(f'error type: {exc_type}, error message: {exc_val}, traceback: {errmsg}')
        return True  # suppress the exception after it has been recorded
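

# Usage sketch (an assumption, not part of the original module): Scheduler is designed
# to be used as a context manager, claiming the least recently updated account on
# __enter__ and releasing it on __exit__. The query keys ('site', 'used') and the
# detail-fetching step are hypothetical placeholders for the real spider code.
if __name__ == '__main__':
    with Scheduler({'site': 'ybw', 'used': False}) as scheduler:
        while scheduler.crawl_start and scheduler.count < scheduler.total:
            task = scheduler.crawl_task  # pull one task from the dispatch service
            if not task:
                scheduler.wait_for_next_task()
                continue
            # ... fetch and save the detail page for `task` here (hypothetical) ...
            scheduler.update_count(1)  # record one successful crawl for this account
            scheduler.wait_for_next_task()
        scheduler.finished()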