123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- import datetime
- import random
- import time
- import traceback
- from datetime import date, timedelta
- import requests
- from crawler.login import User
- from utils.databases import mongo_table, int2long, object_id
- from utils.execptions import JyBasicException
- from utils.log import logger
- from utils.tools import get_host_ip
class Scheduler:
    """Context manager that leases a crawl account from MongoDB for one run.

    ``__enter__`` picks the matching account with the oldest ``update_time``,
    marks it ``used`` and loads its daily limit (``total``) and today's count.
    ``__exit__`` releases the account; an ordinary exception (``Exception``
    subclass) raised inside the ``with`` body is recorded in ``crawl_error``,
    logged and suppressed, while ``KeyboardInterrupt``/``SystemExit`` and other
    non-``Exception`` errors propagate.
    """

    # Scheduler endpoint that hands out crawl tasks; override for testing.
    # Local development endpoint:
    #   http://127.0.0.1:1405/schedule/crawl_task/ybw_scheduler
    crawl_task_url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/ybw_scheduler'
    # How many of the most recent day entries of ``records`` to keep.
    records_keep_days = 7

    def __init__(self, query: dict):
        """:param query: Mongo filter used to pick an idle account."""
        self.query = query
        self.account_tab = mongo_table('py_spider', 'match_account')
        self.error_tab = mongo_table('py_spider', 'crawl_error')
        self.crawl_start = False  # scheduling state: True while an account is leased
        self.count = None  # items crawled today
        self.total = None  # daily crawl limit
        self.account_id = None
        self.user = None
        self.spider_code = None
        self.crawl_url = None
        self.crawl_params = None
        self.crawl_exception = None
        self.crawl_type = None

    @property
    def today(self):
        """Today's local date as ``YYYY-MM-DD``."""
        return datetime.datetime.today().strftime('%Y-%m-%d')

    @property
    def yesterday(self):
        """Yesterday's local date as ``YYYY-MM-DD``."""
        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")

    @property
    def crawl_task(self):
        """Fetch the next crawl task from the scheduler service.

        :return: the task's ``data`` mapping with ``_id`` converted through
            ``object_id``; ``{}`` when the request fails, the status is not
            200, the body is not JSON with a ``data`` mapping, or the task has
            no ``_id``.
        """
        try:
            response = requests.get(self.crawl_task_url, timeout=10)
            if response.status_code != 200:
                return {}
            # ValueError covers JSON decode errors across requests versions;
            # KeyError/TypeError cover bodies that lack a 'data' mapping.
            data = response.json()['data']
        except (requests.RequestException, ValueError, KeyError, TypeError) as e:
            logger.warning(f'[crawl_task] failed to fetch task: {e!r}')
            return {}
        if not isinstance(data, dict) or '_id' not in data:
            return {}
        results = {'_id': object_id(data['_id'])}
        results.update((key, val) for key, val in data.items() if key != '_id')
        return results

    def finished(self, execute_next_time=None):
        """Release the leased account, then sleep before the next run.

        :param execute_next_time: seconds to sleep; falsy means 600.
        """
        logger.info("任务结束")
        self._release_account()
        self.sleep(execute_next_time)

    def err_record(self, e: 'JyBasicException'):
        """Insert one crawl-error document into ``crawl_error``.

        :param e: error whose ``code`` and ``reason`` are stored; its optional
            ``title`` attribute is stored under ``params``.
        """
        rows = {
            'account': self.user.phone if self.user is not None else '',
            'spidercode': self.spider_code,
            'url': self.crawl_url,
            'status_code': e.code,
            'reason': e.reason,
            # NOTE(review): the error title is stored under 'params' and
            # self.crawl_params is never used here -- confirm this is intended.
            'params': getattr(e, 'title', ''),
            'crawl_time': int2long(int(time.time())),
            'crawl_type': self.crawl_type,
        }
        self.error_tab.insert_one(rows)

    @staticmethod
    def wait_for_next_task(wait_time=None):
        """Sleep ``wait_time`` seconds, or a random 5-10 s when it is falsy."""
        _sleep = (wait_time or random.choice(range(5, 11)))
        time.sleep(_sleep)

    @staticmethod
    def sleep(wait_time=None):
        """Sleep ``wait_time`` seconds, or 600 s when it is falsy."""
        sleep_time = (wait_time or 600)
        time.sleep(sleep_time)

    def _release_account(self):
        """Mark the leased account as no longer used (no-op if none is leased)."""
        if self.account_id is not None:
            # _update_data also stamps ip and update_time.
            self._update_data({'used': False})

    def _update_data(self, item):
        """Update the leased account's document.

        :param item: fields to ``$set``; ``ip`` and ``update_time`` are added
            to a copy, so the caller's dict is not modified.
        """
        fields = dict(item)
        fields['ip'] = get_host_ip()
        fields['update_time'] = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
        self.account_tab.update_one(
            {'_id': self.account_id},
            {'$set': fields}
        )

    @staticmethod
    def _trim_records(records, keep_days=7):
        """Return a new dict with only the ``keep_days`` largest keys of ``records``.

        Keys are assumed to be ``YYYY-MM-DD`` strings (as written by
        ``today``), which sort chronologically as plain strings.
        """
        if keep_days <= 0:
            return {}
        return {day: records[day] for day in sorted(records)[-keep_days:]}

    def update_count(self, number):
        """Add ``number`` to today's crawl count and persist it on the account.

        Sets ``self.count`` to the new total for today. Per-day history is kept
        for the ``records_keep_days`` most recent days.
        """
        rows = self.account_tab.find_one({'_id': self.account_id}) or {}
        records = dict(rows.get('records') or {})
        count = records.get(self.today, 0) + number
        self.count = count
        records[self.today] = count
        records = self._trim_records(records, self.records_keep_days)
        if self.account_id is not None:
            # Only $set the field that changed so concurrent updates to the
            # account (used, total, ...) are not overwritten with stale values.
            self._update_data({'records': records})

    def __enter__(self):
        """Lease the longest-idle account matching ``self.query``.

        :return: ``self``; ``crawl_start`` is True only if an account was leased.
        """
        logger.info('[开启调度]')
        # Pick the account that has been idle the longest.
        # NOTE(review): find_one + update_one is not atomic; if several
        # schedulers share the pool, find_one_and_update would avoid two of
        # them leasing the same account -- confirm deployment model.
        rows = self.account_tab.find_one(self.query, sort=[('update_time', 1)])
        if rows is None:
            logger.warning(f'[{self.query.get("site")}采集]暂无闲置账号')
            return self
        # Read everything that can fail BEFORE marking the account used:
        # if __enter__ raises, __exit__ never runs and the account would stay leased.
        total = rows['total']
        records = rows.get('records') or {self.today: 0}
        user = User(rows['account'], rows['password'])
        self.account_id = rows['_id']
        self.user = user
        logger.info(f'[启用账号]{self.user.phone}')
        # Mark the account as used and initialise the crawl records.
        self._update_data({'used': True, 'records': records})
        self.total = total
        self.count = records.get(self.today, 0)
        self.crawl_start = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the account; record and suppress ordinary exceptions.

        :return: True when an ``Exception`` subclass was recorded and should be
            suppressed; False otherwise, so ``KeyboardInterrupt``,
            ``SystemExit`` and ``GeneratorExit`` still propagate.
        """
        logger.info('[关闭调度]')
        self._release_account()
        self.crawl_start = False
        if exc_type is None or not issubclass(exc_type, Exception):
            return False
        errmsg = traceback.extract_tb(exc_tb)
        e = JyBasicException(
            code=10500,
            reason=str(exc_type),
            title='未知系统错误'
        )
        self.err_record(e)
        logger.error(f'错误类型: {exc_type}, 错误内容: {exc_val}, 错误详情: {errmsg}')
        return True
|