import datetime
import random
import time
import traceback
from datetime import date, timedelta

import requests

from crawler.login import User
from utils.databases import mongo_table, int2long, object_id
from utils.execptions import JyBasicException
from utils.log import logger
from utils.tools import get_host_ip


class Scheduler:
    """Context manager that claims a crawl account and tracks its daily usage.

    On ``__enter__`` an account of ``site`` whose ``crawl_detail`` /
    ``crawl_list`` flag is ``False`` is looked up, flagged as in use and its
    per-day usage record is loaded (or created). On ``__exit__`` the flag is
    reset, any exception is written to the error collection and logged, and
    the exception is suppressed.
    """

    def __init__(self, site, crawl_type, **kwargs):
        """Store the scheduling parameters and open the mongo collections.

        :param site: site name used to filter accounts and usage records
        :param crawl_type: ``'detail'`` selects the ``crawl_detail`` flag,
            any other value selects ``crawl_list``
        :param kwargs: extra options, stored unchanged on ``self.kwargs``
        """
        self.site = site
        self.crawl_type = crawl_type
        self.crawl_start = False  # scheduling state: True once an account is claimed
        self.count = None  # number of items crawled today
        self.total = None  # daily crawl limit
        self.account_id = None  # _id of the claimed account
        self.record_id = None  # _id of today's usage record
        self.user = None  # User built from the claimed account
        self.spider_code = None
        self.crawl_url = None
        self.crawl_params = None
        self.crawl_exception = None
        self.kwargs = kwargs
        self.account_tab = mongo_table('py_spider', 'match_account')
        self.record_tab = mongo_table('py_spider', 'match_account_record')
        self.error_tab = mongo_table('py_spider', 'crawl_error')

    def crawl_counter(self, number: int):
        """Add ``number`` to today's crawl count and persist the new value.

        The current count is re-read from the record collection, so
        ``self.count`` reflects the stored value plus ``number``. If the
        usage record cannot be found the call is skipped with a warning.

        :param number: amount to add to the stored count
        """
        record = self.record_tab.find_one({'_id': self.record_id})
        if record is None:
            # No usage record (e.g. no account was claimed): nothing to count.
            logger.warning(
                f'[{self.site}]采集记录不存在，跳过计数: {self.record_id}'
            )
            return
        self.count = record['count'] + number
        # Only write the changed field instead of echoing the whole document
        # back, so unrelated fields are never overwritten with stale values.
        self._update_tab(self.record_tab, self.record_id, {'count': self.count})

    def finished(self, execute_next_time=None):
        """Release the account and block until the next run.

        :param execute_next_time: seconds to sleep; a falsy value means the
            default of :meth:`sleep` is used
        """
        logger.info("任务结束")
        self._release_account()
        self.sleep(execute_next_time)

    def err_record(self, e: JyBasicException):
        """Insert a crawl error document built from ``e`` and current state.

        :param e: exception providing ``code``, ``reason`` and, optionally,
            ``title`` (stored under the ``params`` key)
        """
        rows = {
            'account': self.user.phone if self.user is not None else '',
            'spidercode': self.spider_code,
            'url': self.crawl_url,
            'status_code': e.code,
            'reason': e.reason,
            'params': getattr(e, 'title', ''),
            'crawl_time': int2long(int(time.time())),
            'crawl_type': self.crawl_type,
        }
        self.error_tab.insert_one(rows)

    def _release_account(self):
        """Reset the in-use flag of the claimed account, if there is one."""
        if self.account_id is None:
            return
        flag = 'crawl_detail' if self.crawl_type == 'detail' else 'crawl_list'
        self._update_tab(self.account_tab, self.account_id, {flag: False})

    def _update_tab(self, mgo_coll, _id, item):
        """Apply ``item`` to one document with ``$set``, stamping update_time.

        :param mgo_coll: mongo collection
        :param _id: ``_id`` of the document to update
        :param item: fields to set; the mapping itself is not modified
        """
        # Work on a copy so the caller's dict does not gain 'update_time'.
        payload = dict(item)
        payload['update_time'] = self.current_time
        mgo_coll.update_one({'_id': _id}, {'$set': payload})

    def __enter__(self):
        """Claim an idle account and initialise its usage record.

        When no idle account exists a warning is logged and
        ``crawl_start`` stays ``False``.

        :return: this scheduler
        """
        logger.info('[开启调度]')
        # Look up the idle account exactly once: the ``account`` property runs
        # a new query on every access, so repeated reads could mix fields
        # from different account documents.
        account = self.account
        if account is None:
            logger.warning(f'[{self.site}]暂无闲置账号')
            return self

        self.account_id = account['_id']
        self.user = User(account['account'], account['password'])
        logger.info(f'[启用账号]{self.user.phone}')

        # Initialise the usage record (also sets self.record_id).
        records = self._load_account_records(account)
        if self.crawl_type == 'detail':
            item = {'crawl_detail': True}
            self.total = records['total']
            self.count = records['count']
        else:
            item = {'crawl_list': True}

        # Flag the account as in use.
        self._update_tab(self.account_tab, self.account_id, item)
        self.crawl_start = True
        return self

    @staticmethod
    def wait_for_next_task(wait_time=None):
        """Sleep ``wait_time`` seconds, or a random 5-10 seconds if falsy."""
        time.sleep(wait_time or random.randint(5, 10))

    @staticmethod
    def sleep(wait_time=None):
        """Sleep ``wait_time`` seconds, or 600 seconds if falsy."""
        time.sleep(wait_time or 600)

    def _load_account_records(self, account):
        """Return today's usage record for ``account``, creating it if absent.

        Also stores the record's ``_id`` on ``self.record_id``.

        :param account: account document providing ``account`` and, when a
            new record has to be created, ``total``
        :return: the usage record document
        """
        today = self.today  # read once so query and insert use the same date
        query = dict(
            account=account['account'],
            date=today,
            type=self.crawl_type,
            site=self.site
        )
        item = self.record_tab.find_one(query)
        if item is None:
            item = dict(
                site=self.site,
                account=account['account'],
                type=self.crawl_type,
                total=account['total'],
                count=0,
                ip=get_host_ip(),
                date=today,
                update_time=self.current_time,
            )
            result = self.record_tab.insert_one(item)
            item['_id'] = result.inserted_id
        self.record_id = item['_id']
        return item

    @property
    def account_records(self):
        """Today's usage record of the currently idle account.

        Queries the idle account, then loads or creates its record and
        updates ``self.record_id``.
        """
        return self._load_account_records(self.account)

    @property
    def account(self):
        """An idle account of this site, or the lookup's empty result.

        Each access runs a new query for an account whose flag for the
        current crawl type is ``False``, ordered by ascending
        ``update_time``.
        """
        query = dict(site=self.site)
        if self.crawl_type == 'detail':
            query['crawl_detail'] = False
        else:
            query['crawl_list'] = False
        return self.account_tab.find_one(query, sort=[('update_time', 1)])

    @property
    def crawl_task(self):
        """Fetch one crawl task from the task service.

        :return: the task as a dict with ``_id`` converted by ``object_id``;
            an empty dict when the request fails, the status is not 200,
            the body is not valid JSON or it carries no task data
        """
        results = {}
        url = 'http://cc.spdata.jianyu360.com/crawl/ybw/task/fetch'
        try:
            response = requests.get(url, timeout=10)
            if response.status_code != 200:
                return results
            payload = response.json()
        except (requests.RequestException, ValueError):
            # ValueError covers JSON decode failures on requests versions
            # whose decode error is not a RequestException subclass.
            return results

        data = payload.get('data') if isinstance(payload, dict) else None
        if not data:
            return results

        results['_id'] = object_id(data['_id'])
        results.update(
            (key, val) for key, val in data.items() if key != '_id'
        )
        return results

    @property
    def today(self):
        """Current local date as ``YYYY-MM-DD``."""
        return datetime.datetime.today().strftime('%Y-%m-%d')

    @property
    def current_time(self):
        """Current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

    @property
    def yesterday(self):
        """Yesterday's local date as ``YYYY-MM-DD``."""
        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the account and record any exception from the block.

        :return: always ``True``, so the exception is not propagated
        """
        logger.info('[关闭调度]')
        self._release_account()
        self.crawl_start = False
        if exc_type is not None:
            errmsg = traceback.extract_tb(exc_tb)
            # Log first so the details survive even if the insert below fails.
            logger.error(f'错误类型: {exc_type}, 错误内容: {exc_val}, 错误详情: {errmsg}')
            e = JyBasicException(
                code=10500,
                reason=str(exc_type),
                title='未知系统错误'
            )
            self.err_record(e)
        return True