元博网 account scheduling optimization

lizongze 3 years ago
parent
commit
8066422da5

+ 43 - 21
ybw/crawler/crawl_scheduler.py

@@ -32,7 +32,9 @@ class Scheduler:
         self.spider_code = None
         self.crawl_url = None
 
-        self.account_tab = mongo_table('py_spider', 'match_account')
+        self._headers = {"Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"}
+
+        # self.account_tab = mongo_table('py_spider', 'match_account')
         self.record_tab = mongo_table('py_spider', 'match_account_record')
         self.error_tab = mongo_table('py_spider', 'crawl_error')
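
The new _headers attribute carries an HTTP Basic credential (base64 of "user:password"), which the methods added below use to call the competing_goods account service. If the token ever needs to be built from configured credentials instead of being hard-coded, a small helper could look like the following sketch (hypothetical, not part of this commit):

import base64

def basic_auth_header(user: str, password: str) -> dict:
    # Standard HTTP Basic scheme: "Basic " + base64("user:password").
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}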
 
@@ -55,8 +57,45 @@ class Scheduler:
         logger.info(f'[启用账号]{self.user.phone}')
         history = self.account_history_crawl_record()
         self.count = history['count']  # number of items accessed
-        lock = self._schedule[self.crawl_type]['lock']
-        self._update_tab(self.account_tab, self.account_id, **lock)
+        self.total = history['total']  # daily quota
+
+    def get_account(self):
+        url = "http://cc.spdata.jianyu360.com/competing_goods/account/fetch"
+        params = {
+            "site": self.site,
+            "crawl_type": self.crawl_type
+        }
+
+        try:
+            response = requests.get(url,
+                                    headers=self._headers,
+                                    params=params,
+                                    timeout=10)
+            data = response.json()['data']
+        except requests.RequestException:
+            # network unreachable; unable to fetch an account
+            data = None
+        return data
+
+    def _release_account(self):
+        url = "http://cc.spdata.jianyu360.com/competing_goods/account/release"
+        if self.account_id is not None:
+            params = {
+                "uid": self.account_id,
+                "crawl_type": self.crawl_type
+            }
+            while True:
+                try:
+                    response = requests.get(url,
+                                            headers=self._headers,
+                                            params=params,
+                                            timeout=10)
+                    if response.status_code == 200:
+                        logger.debug(f"_release_account >>> {response.json()}")
+                        break
+                except requests.RequestException:
+                    logger.error("网络异常,归还账号失败")
+                    self._wait_schedule(1)
 
     def crawl_counter(self, number: int):
         """采集计数器"""
@@ -78,20 +117,11 @@ class Scheduler:
         }
         self.error_tab.insert_one(rows)
 
-    def _release_account(self):
-        if self.account_id is not None:
-            release = self._schedule[self.crawl_type]['release']
-            self._update_tab(self.account_tab, self.account_id, **release)
 
     def _update_tab(self, collection, mid, **update):
         update['update_time'] = self.current_time
         collection.update_one({'_id': mid}, {'$set': update})
 
-    def get_account(self):
-        """Fetch an account"""
-        release = self._schedule[self.crawl_type]['release']
-        query = dict(site=self.site, **release)
-        return self.account_tab.find_one(query, sort=[('update_time', 1)])
 
     def change_account(self):
         """更换账号"""
@@ -106,26 +136,18 @@ class Scheduler:
             type=self.crawl_type,
             site=self.site,
         )
-        if self.channel is not None:
-            query['channel'] = self.channel
-
         item = self.record_tab.find_one(query)
         if item is None:
             item = dict(
                 site=self.site,
                 account=self.account['account'],
                 type=self.crawl_type,
+                total=self.account.get('total', 0),  # total task count, defaults to 0
                 count=0,
                 ip=get_host_ip(),
                 date=self.today,
                 update_time=self.current_time,
             )
-
-            if self.crawl_type == 'detail':
-                self.total = item['total'] = self.account['total']  # visit limit
-            else:
-                item['channel'] = (self.channel or '')
-
             result = self.record_tab.insert_one(item)
             item['_id'] = result.inserted_id
         self.record_id = item['_id']
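
Taken together, this file's changes swap the Mongo-backed match_account collection for HTTP calls to the competing_goods account service: get_account fetches an account for a site/crawl_type pair, and _release_account returns it by uid when the scheduler exits. Below is a standalone sketch of that round trip, using the endpoints and Authorization header from the diff; the shape of the 'data' payload and the bounded retry in the release helper are assumptions, not part of the commit.

import time

import requests

ACCOUNT_API = "http://cc.spdata.jianyu360.com/competing_goods/account"
HEADERS = {"Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"}

def fetch_account(site, crawl_type):
    # Mirrors Scheduler.get_account: return the 'data' payload, or None when the
    # request fails or the body is not valid JSON (assumed error handling).
    try:
        r = requests.get(f"{ACCOUNT_API}/fetch",
                         headers=HEADERS,
                         params={"site": site, "crawl_type": crawl_type},
                         timeout=10)
        return r.json().get("data")
    except (requests.RequestException, ValueError):
        return None

def release_account(uid, crawl_type, attempts=5):
    # Mirrors Scheduler._release_account, but with a bounded retry loop instead
    # of `while True`, so a persistent non-200 answer cannot spin forever.
    for _ in range(attempts):
        try:
            r = requests.get(f"{ACCOUNT_API}/release",
                             headers=HEADERS,
                             params={"uid": uid, "crawl_type": crawl_type},
                             timeout=10)
            if r.status_code == 200:
                return True
        except requests.RequestException:
            pass  # network hiccup; wait and retry
        time.sleep(1)
    return False

The uid parameter name comes from the release call in the diff; which key of the fetch payload supplies it is not shown here and would need to be confirmed against the service.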

+ 7 - 1
ybw/crawler/login.py

@@ -12,6 +12,7 @@ from requests import Session
 from requests.utils import dict_from_cookiejar
 
 from config.load import node_module_path
+from utils.execptions import CrawlError
 from utils.log import logger
 
 LOCK = threading.RLock()
@@ -318,7 +319,12 @@ def login_check(account: str = None, refer=None, allow_output_log=True):
 
     ts = int(time.time())
     r, session = login_session_by_cookies(cookies, url, headers, data=payload)
-    member = r.json()
+
+    try:
+        member = r.json()
+    except json.decoder.JSONDecodeError:
+        raise CrawlError(code=10021, reason="系统繁忙,请等待一会儿,自动刷新。")
+
     if allow_output_log:
         logger.info("账号信息:{}", json.dumps(member, indent=4, ensure_ascii=False))
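
The guarded r.json() turns a non-JSON member response (for example the site's "system busy" page) into CrawlError(code=10021) instead of an unhandled JSONDecodeError. A caller could lean on that code to retry after a short pause; the sketch below is one possible caller-side pattern, not part of the commit. It assumes CrawlError keeps the code it was constructed with as an attribute (which is how the spiders read it via getattr(e, 'code', None)), and the import path, retry count and delay are illustrative.

import time

from crawler.login import login_check          # assumed import path for the function above
from utils.execptions import CrawlError        # import added by this commit in login.py

def login_check_with_retry(account, refer=None, retries=3, delay=5):
    # Retry only the "system busy" case raised above (code 10021); re-raise anything else.
    for attempt in range(retries):
        try:
            return login_check(account, refer=refer)
        except CrawlError as e:
            if getattr(e, 'code', None) != 10021 or attempt == retries - 1:
                raise
            time.sleep(delay)  # give the site a moment to recover before retrying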
 

+ 4 - 9
ybw/detail_spider.py

@@ -218,8 +218,8 @@ class DetailSpider:
                     self._update_crawl_task(item["_id"], crawl_status='finished')
                     sc.crawl_counter(1)
                 next_task_interval = random.choice(range(3,9))
-            except YbwCrawlError as e:
-                if e.code == 10105:
+            except (YbwCrawlError, Exception) as e:
+                if getattr(e, 'code', None) == 10105:
                     # on exception, update the record with the ES query count
                     self._update_crawl_task(item["_id"], count=item['count'])
                     logger.info('[重复数据]{}-{}'.format(item['title'], item['publishtime']))
@@ -238,13 +238,8 @@ class DetailSpider:
             with Scheduler(site='元博网', crawl_type='detail') as scheduler:
                 if scheduler.crawl_start:
                     self.user = scheduler.user
-                    finished = self.crawl_spider(scheduler)
-                    if finished:
-                        # crawl task finished
-                        scheduler.finished()
-                    else:
-                        # no crawl task available
-                        scheduler.wait_for_next_task()
+                    self.crawl_spider(scheduler)
+                scheduler.finished(10)
 
 
 if __name__ == '__main__':

+ 1 - 1
zbytb/crawler/spiders/DetailPageSpider.py

@@ -273,7 +273,7 @@ class CrawlDetailPageSpider:
                     sc.crawl_counter(num)
                 next_task_interval = 10
             except (ZbYTbCrawlError, Exception) as e:
-                if e.code == 10105:
+                if getattr(e, 'code', None) == 10105:
                     # on exception, update the record with the ES query count
                     self._update_crawl_task(item["_id"], count=item['count'])
                 else:
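
Both spiders now read the error code defensively with getattr(e, 'code', None), because the broadened except clause also catches plain exceptions that carry no code attribute. Below is a minimal sketch of an exception shape compatible with that pattern; the real YbwCrawlError and ZbYTbCrawlError classes are defined elsewhere in each project and may differ.

class CrawlErrorBase(Exception):
    """Hypothetical coded exception: handlers can branch on getattr(e, 'code', None)."""

    def __init__(self, code=10000, reason="unknown crawl error"):
        self.code = code
        self.reason = reason
        super().__init__(f"{code}: {reason}")


# A handler can then distinguish duplicate-data errors (10105) from everything else:
try:
    raise CrawlErrorBase(code=10105, reason="duplicate document")
except (CrawlErrorBase, Exception) as e:
    if getattr(e, 'code', None) == 10105:
        pass  # duplicate data: update the ES count and move on
    else:
        pass  # unexpected failure: record it and continue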