Browse code

元博网 - update

dongzhaorui 3 years ago
parent
commit
f638b67751
2 changed files with 94 additions and 90 deletions
  1. + 89 - 86 ybw/crawler/crawl_scheduler.py
  2. + 5 - 4 ybw/list_spider.py

+ 89 - 86
ybw/crawler/crawl_scheduler.py

@@ -14,143 +14,136 @@ from utils.tools import get_host_ip
 
 class Scheduler:
 
-    def __init__(self, site, crawl_type, **kwargs):
-        self.site = site
-        self.crawl_type = crawl_type
+    def __init__(self, site, crawl_type, channel=None, **kwargs):
         self.crawl_start = False  # scheduler state flag
-        self.count = None  # daily crawl count
-        self.total = None  # daily crawl limit
+        self.crawl_type = crawl_type
+        if self.crawl_type == 'list' and channel is None:
+            raise TypeError(
+                '{} missing 1 required positional argument: {}'.format(
+                    self.__class__.__name__, 'channel')
+            )
+        self.site = site
+        self.channel = channel
+        self.user = None    # account user
         self.account_id = None  # account id
         self.record_id = None  # daily crawl record id
-        self.user = None    # account user
+        self.count = None  # daily crawl count
+        self.total = None  # daily crawl limit
         self.spider_code = None
         self.crawl_url = None
-        self.crawl_params = None
-        self.crawl_exception = None
-        self.kwargs = kwargs
 
         self.account_tab = mongo_table('py_spider', 'match_account')
         self.record_tab = mongo_table('py_spider', 'match_account_record')
         self.error_tab = mongo_table('py_spider', 'crawl_error')
 
+        list_attr = dict(
+            name='crawl_list',
+            lock=dict(crawl_list=True),
+            release=dict(crawl_list=False),
+        )
+        detail_attr = dict(
+            name='crawl_detail',
+            lock=dict(crawl_detail=True),
+            release=dict(crawl_detail=False),
+        )
+        self._schedule = {'list': list_attr, 'detail': detail_attr}
+        self.account = self.get_account()
+
+    def _init(self):
+        self.account_id = self.account['_id']
+        self.user = User(self.account['account'], self.account['password'])
+        logger.info(f'[account enabled]{self.user.phone}')
+        history = self.account_history_crawl_record()
+        self.count = history['count']  # requests made so far today
+        lock = self._schedule[self.crawl_type]['lock']
+        self._update_tab(self.account_tab, self.account_id, **lock)
+
     def crawl_counter(self, number: int):
         """采集计数器"""
         records = self.record_tab.find_one({'_id': self.record_id})
         records['count'] += number
         self.count = records['count']
-        self._update_tab(self.record_tab, self.record_id, records)
+        self._update_tab(self.record_tab, self.record_id, count=self.count)
 
-    def finished(self, execute_next_time=None):
-        logger.info("任务结束")
-        self._release_account()
-        self.sleep(execute_next_time)
-
-    def err_record(self, e: YbwCrawlError):
+    def err_record(self, err: YbwCrawlError):
         rows = {
             'account': self.user.phone if self.user is not None else '',
             'spidercode': self.spider_code,
             'url': self.crawl_url,
-            'status_code': e.code,
-            'reason': e.reason,
-            'params': getattr(e, 'title', ''),
+            'status_code': err.code,
+            'reason': err.reason,
+            'params': getattr(err, 'title', ''),
             'crawl_time': int2long(int(time.time())),
             'crawl_type': self.crawl_type,
         }
         self.error_tab.insert_one(rows)
 
-    def _init_account(self):
-        self.account_id = self.account['_id']
-        self.user = User(self.account['account'], self.account['password'])
-        logger.info(f'[account enabled]{self.user.phone}')
-        '''initialize the account usage record'''
-        records = self.account_records
-        if self.crawl_type == 'detail':
-            item = {'crawl_detail': True}
-            self.total = records['total']
-            self.count = records['count']
-        else:
-            item = {'crawl_list': True}
-        '''update the account info'''
-        self._update_tab(self.account_tab, self.account_id, item)
-
     def _release_account(self):
-        if self.crawl_type == 'detail':
-            rows = dict(crawl_detail=False,)
-        else:
-            rows = dict(crawl_list=False,)
         if self.account_id is not None:
-            self._update_tab(self.account_tab, self.account_id, rows)
+            release = self._schedule[self.crawl_type]['release']
+            self._update_tab(self.account_tab, self.account_id, **release)
 
-    def _update_tab(self, mgo_coll, _id, item):
-        """
-        update a mongo collection
+    def _update_tab(self, collection, mid, **update):
+        update['update_time'] = self.current_time
+        collection.update_one({'_id': mid}, {'$set': update})
 
-        :param mgo_coll: mongo collection
-        :param _id: mongo _id
-        :param item: data to set
-        """
-        item['update_time'] = self.current_time
-        mgo_coll.update_one({'_id': _id}, {'$set': item})
+    def get_account(self):
+        """获取账号"""
+        release = self._schedule[self.crawl_type]['release']
+        query = dict(site=self.site, **release)
+        return self.account_tab.find_one(query, sort=[('update_time', 1)])
 
     def change_account(self):
+        """更换账号"""
         self._release_account()
-        self._init_account()
-
-    def __enter__(self):
-        logger.info(f'[scheduler started]')
-        '''fetch an idle account'''
-        if self.account is not None:
-            self._init_account()
-            self.crawl_start = True
-        else:
-            logger.warning(f'[{self.site}] no idle accounts available')
-        return self
-
-    @staticmethod
-    def wait_for_next_task(wait_time=None):
-        _sleep = (wait_time or random.choice(range(5, 11)))
-        time.sleep(_sleep)
+        self._init()
 
-    @staticmethod
-    def sleep(wait_time=None):
-        sleep_time = (wait_time or 600)
-        time.sleep(sleep_time)
-
-    @property
-    def account_records(self):
-        """账号使用记录"""
+    def account_history_crawl_record(self):
+        """使用账号采集记录"""
         query = dict(
             account=self.account['account'],
             date=self.today,
             type=self.crawl_type,
-            site=self.site
+            site=self.site,
         )
+        if self.channel is not None:
+            query['channel'] = self.channel
+
         item = self.record_tab.find_one(query)
         if item is None:
             item = dict(
                 site=self.site,
                 account=self.account['account'],
                 type=self.crawl_type,
-                total=self.account['total'],
                 count=0,
                 ip=get_host_ip(),
                 date=self.today,
                 update_time=self.current_time,
             )
+
+            if self.crawl_type == 'detail':
+                self.total = item['total'] = self.account['total']  # daily request limit
+            else:
+                item['channel'] = (self.channel or '')
+
+            result = self.record_tab.insert_one(item)
+            item['_id'] = result.inserted_id
+        elif self.crawl_type == 'detail':
+            self.total = item.get('total')  # restore the daily limit when the record already exists
         self.record_id = item['_id']
         return item
 
-    @property
-    def account(self):
-        """账号"""
-        query = dict(site=self.site)
-        if self.crawl_type == 'detail':
-            query['crawl_detail'] = False
-        else:
-            query['crawl_list'] = False
-        return self.account_tab.find_one(query, sort=[('update_time', 1)])
+    def finished(self, execute_next_time=None):
+        logger.info("任务结束")
+        self._release_account()
+        self._wait_schedule(execute_next_time)
+
+    def wait_for_next_task(self, interval=None):
+        interval = (interval or random.choice(range(5, 11)))
+        self._wait_schedule(interval)
+
+    @staticmethod
+    def _wait_schedule(interval=None):
+        interval = (interval or 600)
+        time.sleep(interval)
 
     @property
     def crawl_task(self):
@@ -173,13 +166,23 @@ class Scheduler:
     def today(self):
         return datetime.datetime.today().strftime('%Y-%m-%d')
 
+    @property
+    def yesterday(self):
+        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
+
     @property
     def current_time(self):
         return datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
 
-    @property
-    def yesterday(self):
-        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
+    def __enter__(self):
+        logger.info(f'[scheduler started]')
+        '''fetch an idle account'''
+        if self.account is not None:
+            self._init()
+            self.crawl_start = True
+        else:
+            logger.warning(f'[{self.site}] no idle accounts available')
+        return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         logger.info(f'[scheduler stopped]')
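
For reference, a minimal usage sketch (not part of the commit) of how the refactored Scheduler is driven after this change. The menu object and the per-page item count are hypothetical placeholders; the Scheduler calls themselves match the diff above.

from crawler.crawl_scheduler import Scheduler

def run_one_menu(menu):
    # crawl_type='list' now requires a channel; omitting it raises TypeError
    with Scheduler(site='元博网', crawl_type='list', channel=menu.channel) as sc:
        if not sc.crawl_start:
            return  # no idle account was free; __enter__ logged a warning
        sc.crawl_counter(30)     # placeholder: items saved from one list page
        sc.wait_for_next_task()  # random 5-10 s pause before the next page
        sc.finished()            # release the account lock, then wait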

+ 5 - 4
ybw/list_spider.py

@@ -6,14 +6,15 @@ from urllib.parse import quote
 
 import requests
 from lxml.html import fromstring, HtmlElement
-from utils.tools import sha1
+
 from config.load import crawler_url, region
 from crawler.crawl_scheduler import Scheduler
 from crawler.login import login, load_login_cookies, login_session_check
 from utils.databases import mongo_table, int2long, es_query, redis_client
-from utils.execptions import CheckError, CrawlError, YbwCrawlError
+from utils.execptions import CrawlError, YbwCrawlError
 from utils.log import logger
 from utils.socks5 import Proxy
+from utils.tools import sha1
 
 CrawlMenu = namedtuple('CrawlMenu', ['channel', 'spidercode', 'table_type'])
 
@@ -222,7 +223,6 @@ class ListSpider:
                         page=page
                     )
                     sc.crawl_counter(item_size)
-
                     if item_size < self.page_size:
                         '''fewer items than the page size: no need to fetch the next page'''
                         break
@@ -243,7 +243,8 @@ class ListSpider:
 
     def start(self):
         for menu in self.crawl_menus:
-            with Scheduler(site='元博网', crawl_type='list') as scheduler:
+            with Scheduler(site='元博网', crawl_type='list',
+                           channel=menu.channel) as scheduler:
                 if scheduler.crawl_start:
                     self.user = scheduler.user
                     self.crawl_spider(scheduler, menu)
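
For clarity, the per-day match_account_record documents now differ by crawl type: list records carry the menu channel, while detail records carry the account's daily limit. An illustrative sketch (all values invented, not taken from the repository):

list_record = {
    'site': '元博网',
    'account': 'user_a',       # hypothetical account name
    'type': 'list',
    'channel': '招标公告',      # new: list records now store the menu channel
    'count': 120,              # incremented by crawl_counter()
    'ip': '192.168.1.10',
    'date': '2022-06-01',
    'update_time': '2022-06-01 10:30:00',
}

detail_record = {
    'site': '元博网',
    'account': 'user_a',
    'type': 'detail',
    'total': 500,              # daily limit copied from the account document
    'count': 42,
    'ip': '192.168.1.10',
    'date': '2022-06-01',
    'update_time': '2022-06-01 10:30:00',
}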