import datetime
import random
import time
from datetime import date, timedelta

import requests

from crawler.account import account_record, get_account, release_account
from crawler.login import User
from utils.databases import mongo_table, int2long, object_id
from utils.execptions import ZbYTbCrawlError
from utils.log import logger
from utils.tools import get_host_ip, wait


class Scheduler:

    def __init__(self, site, crawl_type, **kwargs):
        self.site = site
        self.crawl_type = crawl_type
        self.crawl_start = False
        self.count = None  # daily crawl count
        self.total = None  # daily crawl limit
        self.account_id = None
        self.record_id = None
        self.user = None
        self.spider_code = None
        self.crawl_url = None
        self.crawl_params = None
        self.crawl_exception = None
        self.kwargs = kwargs
        self.account_tab = mongo_table('py_spider', 'match_account')
        self.record_tab = mongo_table('py_spider', 'match_account_record')
        self.crawl_error_tab = mongo_table('py_spider', 'crawl_error')
        list_attr = dict(
            name='crawl_list',
            lock=dict(crawl_list=True),
            release=dict(crawl_list=False),
        )
        detail_attr = dict(
            name='crawl_detail',
            lock=dict(crawl_detail=True),
            release=dict(crawl_detail=False),
        )
        self._schedule = {'list': list_attr, 'detail': detail_attr}
        self.account = get_account(self.site, self.crawl_type)

    def _init(self):
        self.account_id = self.account['_id']
        # Record the account in use, so it can be returned when the container restarts
        account_record(self.account_id, self.crawl_type)
        self.user = User(self.account['account'], self.account['password'])
        logger.info(f'[account enabled] {self.user.username}')
        history = self.account_history_crawl_record()
        self.count = history['count']  # number of requests made so far today

    def crawl_counter(self, number: int):
        """Crawl counter."""
        records = self.record_tab.find_one({'_id': self.record_id})
        records['count'] += number
        self.count = records['count']
        self._update_tab(self.record_tab, self.record_id, count=self.count)

    def query_user(self, account: str):
        query = {'account': account}
        item = self.account_tab.find_one(query)
        if item is None:
            return None
        return User(item['account'], item['password'])

    def err_record(self, err: ZbYTbCrawlError):
        rows = {
            'account': self.user.username if self.user is not None else '',
            'spidercode': self.spider_code,
            'url': self.crawl_url,
            'status_code': err.code,
            'reason': err.reason,
            'params': getattr(err, 'title', ''),
            'crawl_time': int2long(int(time.time())),
            'crawl_type': self.crawl_type,
        }
        self.crawl_error_tab.insert_one(rows)

    def _update_tab(self, collection, mid, **update):
        update['update_time'] = self.current_time
        collection.update_one({'_id': mid}, {'$set': update})

    def account_history_crawl_record(self):
        """Daily crawl record for the account in use."""
        query = dict(
            account=self.account['account'],
            date=self.today,
            type=self.crawl_type,
            site=self.site
        )
        item = self.record_tab.find_one(query)
        if item is None:
            item = dict(
                site=self.site,
                account=self.account['account'],
                type=self.crawl_type,
                total=self.account.get('total', 0),  # default task total: 0
                count=0,
                ip=get_host_ip(),
                date=self.today,
                update_time=self.current_time,
            )
            result = self.record_tab.insert_one(item)
            item['_id'] = result.inserted_id
        self.record_id = item['_id']
        return item

    @staticmethod
    def wait_for_next_task(interval=None):
        interval = (interval or random.choice(range(5, 15)))
        wait(interval)

    def finished(self, interval=None):
        logger.info("task finished")
        # release_account(self.account_id, self.crawl_type)
        wait(interval)

    @property
    def crawl_task(self):
        results = {}
        url = 'http://cc.spdata.jianyu360.com/crawl/zbytb/task/fetch'
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                data = response.json()['data']
                if len(data) > 0:
                    results['_id'] = object_id(data['_id'])
                    for key, val in data.items():
                        if key != '_id':
                            results[key] = val
            return results
        except requests.RequestException:
            return results

    @property
    def today(self):
        return datetime.datetime.today().strftime('%Y-%m-%d')

    @property
    def current_time(self):
        return datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

    @property
    def yesterday(self):
        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")

    def __enter__(self):
        logger.info('[scheduler started]')
        # Acquire an idle account
        if self.account is not None:
            self._init()
            self.crawl_start = True
        else:
            logger.warning(f'[{self.site}] no idle account available')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.info('[scheduler stopped]')
        # Only return the account if one was actually acquired in __enter__
        if self.account_id is not None:
            release_account(self.account_id, self.crawl_type)
        self.crawl_start = False
        if exc_type is not None:
            logger.exception(exc_val)
            e = ZbYTbCrawlError(
                code=10500,
                reason=str(exc_type),
                title='unknown system error'
            )
            self.err_record(e)
        # Swallow the exception after recording it
        return True
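

# Usage sketch (illustrative, not part of the original module): drives one
# crawl session through the context manager. The crawl loop body and the
# 'zbytb' site name are assumptions for demonstration only; the real spiders
# that consume this class live elsewhere in the repo.
if __name__ == '__main__':
    with Scheduler(site='zbytb', crawl_type='list') as scheduler:
        if scheduler.crawl_start:
            task = scheduler.crawl_task  # fetch one task from the dispatch service
            if task:
                # ... crawl the task here, then update the per-account counter
                scheduler.crawl_counter(1)
            scheduler.wait_for_next_task()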