123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- import datetime
- import random
- import time
- from datetime import date, timedelta
- import requests
- from crawler.login import User
- from utils.databases import mongo_table, int2long, object_id
- from utils.execptions import ZbYTbCrawlError
- from utils.log import logger
- from utils.tools import get_host_ip
class Scheduler:
    """Account-aware crawl scheduler, intended to be used as a context manager.

    Constructing the scheduler requests an account from the remote account
    service for the given site and crawl type. Entering the ``with`` block
    binds that account (builds a ``User`` and loads today's crawl record);
    leaving the block releases the account and, if an exception escaped the
    block, stores an error row in the ``crawl_error`` collection and
    suppresses the exception.
    """

    def __init__(self, site, crawl_type, **kwargs):
        """Set up state, open the Mongo collections and fetch an account.

        Args:
            site: Site identifier sent to the account service and written
                into crawl records.
            crawl_type: Crawl type sent to the account service; the schedule
                table below defines ``'list'`` and ``'detail'``.
            **kwargs: Extra options, stored unchanged on ``self.kwargs``.
        """
        self.site = site
        self.crawl_type = crawl_type
        self.crawl_start = False
        self.count = None  # daily crawl count
        self.total = None  # daily crawl limit
        self.account_id = None
        self.record_id = None
        self.user = None
        self.spider_code = None
        self.crawl_url = None
        self.crawl_params = None
        self.crawl_exception = None
        self.kwargs = kwargs
        # NOTE(review): hard-coded Basic-auth credential; consider loading it
        # from configuration instead of keeping it in source.
        self._headers = {"Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"}

        self.account_tab = mongo_table('py_spider', 'match_account')
        self.record_tab = mongo_table('py_spider', 'match_account_record')
        self.crawl_error_tab = mongo_table('py_spider', 'crawl_error')

        list_attr = dict(
            name='crawl_list',
            lock=dict(crawl_list=True),
            release=dict(crawl_list=False),
        )
        detail_attr = dict(
            name='crawl_detail',
            lock=dict(crawl_detail=True),
            release=dict(crawl_detail=False),
        )
        self._schedule = {'list': list_attr, 'detail': detail_attr}
        # May be None when no account could be fetched; see __enter__.
        self.account = self.get_account()

    def _init(self):
        """Bind the fetched account: build the user and load today's count."""
        self.account_id = self.account['_id']
        self.user = User(self.account['account'], self.account['password'])
        logger.info(f'[启用账号]{self.user.username}')
        history = self.account_history_crawl_record()
        self.count = history['count']  # number of items crawled so far

    def get_account(self):
        """Request an account for this site and crawl type.

        Returns:
            The ``data`` field of the service's JSON answer, or ``None`` when
            the request fails, the body is not a JSON object, or the answer
            has no ``data`` field.
        """
        url = "http://cc.spdata.jianyu360.com/competing_goods/account/fetch"
        params = {
            "site": self.site,
            "crawl_type": self.crawl_type
        }
        try:
            response = requests.get(url,
                                    headers=self._headers,
                                    params=params,
                                    timeout=10)
            payload = response.json()
        except (requests.RequestException, ValueError):
            # No account can be fetched when the network is unreachable or
            # the service answers with a body that is not valid JSON.
            return None

        # The payload is deliberately not printed or logged: it carries the
        # account password that _init() reads.
        logger.debug(f"get_account >>> status_code={response.status_code}")
        if not isinstance(payload, dict):
            return None
        return payload.get('data')

    def _release_account(self):
        """Hand the bound account back to the account service.

        Does nothing when no account has been bound. Otherwise retries
        until the service answers with HTTP 200, pausing one second after
        every failed attempt (network error or non-200 status) so the
        service is never hit in a tight loop.
        """
        if self.account_id is None:
            return

        url = "http://cc.spdata.jianyu360.com/competing_goods/account/release"
        params = {
            "uid": self.account_id,
            "crawl_type": self.crawl_type
        }
        while True:
            try:
                response = requests.get(url,
                                        headers=self._headers,
                                        params=params,
                                        timeout=10)
            except requests.RequestException:
                logger.error("网络异常,归还账号失败")
            else:
                if response.status_code == 200:
                    try:
                        body = response.json()
                    except ValueError:
                        # A non-JSON body must not break the release.
                        body = response.text
                    logger.debug(f"_release_account >>> {body}")
                    break
                logger.warning(
                    f"_release_account >>> status_code={response.status_code}"
                )
            self._wait_schedule(1)

    def crawl_counter(self, number: int):
        """Crawl counter: add ``number`` to today's stored count.

        Reads the current record, increments its ``count``, mirrors the new
        value on ``self.count`` and writes it back to the record collection.
        """
        records = self.record_tab.find_one({'_id': self.record_id})
        records['count'] += number
        self.count = records['count']
        self._update_tab(self.record_tab, self.record_id, count=self.count)

    def query_user(self, account: str):
        """Look up ``account`` in the account collection.

        Returns:
            A ``User`` built from the stored account and password, or
            ``None`` when no such account exists.
        """
        query = {'account': account}
        item = self.account_tab.find_one(query)
        if item is None:
            return None
        return User(item['account'], item['password'])

    def err_record(self, err: "ZbYTbCrawlError"):
        """Insert one row describing ``err`` into the crawl error collection."""
        rows = {
            'account': self.user.username if self.user is not None else '',
            'spidercode': self.spider_code,
            'url': self.crawl_url,
            'status_code': err.code,
            'reason': err.reason,
            'params': getattr(err, 'title', ''),
            'crawl_time': int2long(int(time.time())),
            'crawl_type': self.crawl_type,
        }
        self.crawl_error_tab.insert_one(rows)

    def _update_tab(self, collection, mid, **update):
        """``$set`` the given fields plus ``update_time`` on document ``mid``."""
        update['update_time'] = self.current_time
        collection.update_one({'_id': mid}, {'$set': update})

    def account_history_crawl_record(self):
        """Return today's crawl record for the account, creating it if absent.

        Also stores the record's ``_id`` on ``self.record_id``.
        """
        query = dict(
            account=self.account['account'],
            date=self.today,
            type=self.crawl_type,
            site=self.site
        )
        item = self.record_tab.find_one(query)
        if item is None:
            item = dict(
                site=self.site,
                account=self.account['account'],
                type=self.crawl_type,
                total=self.account.get('total', 0),  # default task total: 0
                count=0,
                ip=get_host_ip(),
                date=self.today,
                update_time=self.current_time,
            )
            result = self.record_tab.insert_one(item)
            item['_id'] = result.inserted_id
        self.record_id = item['_id']
        return item

    def wait_for_next_task(self, interval=None):
        """Sleep before the next task.

        Args:
            interval: Seconds to sleep. ``None`` picks a random whole number
                of seconds from 5 to 14; an explicit ``0`` means no wait.
        """
        if interval is None:
            interval = random.randint(5, 14)
        self._wait_schedule(interval)

    def finished(self, execute_next_time=None):
        """Log the end of the task, release the account, then sleep.

        Args:
            execute_next_time: Seconds to sleep afterwards; ``None`` uses the
                default of ``_wait_schedule``.
        """
        logger.info("任务结束")
        self._release_account()
        self._wait_schedule(execute_next_time)

    @staticmethod
    def _wait_schedule(interval=None):
        """Sleep for ``interval`` seconds (600 when ``interval`` is ``None``).

        Only ``None`` selects the default, so an explicit ``0`` returns
        immediately instead of sleeping for ten minutes.
        """
        _interval = 600 if interval is None else interval
        time.sleep(_interval)

    @property
    def crawl_task(self):
        """Fetch one crawl task from the task service.

        Returns:
            A dict with ``_id`` converted through ``object_id`` followed by
            the remaining task fields, or an empty dict when the request
            fails, the status is not 200, the body is not a JSON object, or
            the service returned no task data.
        """
        results = {}
        url = 'http://cc.spdata.jianyu360.com/crawl/zbytb/task/fetch'
        try:
            response = requests.get(url, timeout=10)
            if response.status_code != 200:
                return results
            payload = response.json()
        except (requests.RequestException, ValueError):
            return results

        data = payload.get('data') if isinstance(payload, dict) else None
        if data:
            results['_id'] = object_id(data['_id'])
            results.update(
                (key, val) for key, val in data.items() if key != '_id'
            )
        return results

    @property
    def today(self):
        """Current local date as ``YYYY-MM-DD``."""
        return datetime.datetime.today().strftime('%Y-%m-%d')

    @property
    def current_time(self):
        """Current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

    @property
    def yesterday(self):
        """Yesterday's local date as ``YYYY-MM-DD``."""
        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")

    def __enter__(self):
        """Start scheduling; bind the account if one was fetched."""
        logger.info('[开启调度]')
        # Use the idle account obtained in __init__, if any.
        if self.account is not None:
            self._init()
            self.crawl_start = True
        else:
            logger.warning(f'[{self.site}]暂无闲置账号')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the account and record any exception that escaped.

        Always returns ``True``, so an exception raised inside the ``with``
        block is logged and stored but not propagated.
        """
        logger.info('[关闭调度]')
        self._release_account()
        self.crawl_start = False
        if exc_type is not None:
            # Log a readable message; the logger attaches the active
            # exception's traceback itself.
            logger.exception(f'{exc_type.__name__}: {exc_val}')
            e = ZbYTbCrawlError(
                code=10500,
                reason=str(exc_type),
                title='未知系统错误'
            )
            self.err_record(e)
        return True
|