dongzhaorui 3 years ago
commit 280eb47b22
1 changed file with 135 additions and 159 deletions
zbytb/crawler/spiders/DetailPageSpider.py

@@ -40,26 +40,9 @@ class CrawlDetailPageSpider:
         self.attachment_downloader = AttachmentDownloader()
         self.senior_account = 'runhekeji'
 
-    @staticmethod
-    def select_user(rows: dict, username):
-        """
-        Select a user account and add it to the crawled item
-
-        :param rows: crawled item
-        :param username: crawler account
-        :return: the user account and its login cookies
-        """
-        account = rows.get('account', username)
-        rows.update({'account': account})
-        return account, load_login_cookies(account)
-
-    @staticmethod
-    def extract_response_content(response):
-        results = re.findall(r'Inner(.*?);Inner', response.text)
-        if len(results) > 0:
-            return results[0][13:-1]
-        else:
-            return ''
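+        # Per-task crawl context; populated in crawl_spider for each task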
+        self.account = None
+        self.cookies = None
+        self.spider_code = None
 
     @staticmethod
     def prepare_url(rows: dict):
@@ -75,27 +58,59 @@ class CrawlDetailPageSpider:
         url = host + '?' + urlencode(params)
         return url
 
-    def handler_error(self, response, reason: str, code: int, rows: dict):
-        logger.error(reason)
-        if code == 10104 and rows.get('account') != self.senior_account:
-            self.set_senior_privilege(rows)
-            return
-        else:
-            self.crawl_tab.update_one(
-                {'_id': rows['_id']},
-                {'$set': {'crawl_status': 'error'}}
-            )
-        response.status_code = code
-        err_msg = response.reason = reason
-        response.request.url = rows['competehref']
-        self.crawl_error(
-            response=response,
-            spider_code=rows['spidercode'],
-            account=rows.get('account'),
-            err_msg=err_msg,
-        )
+    def _update_crawl_task(self, tid, **kwargs):
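+        """Update fields of the crawl task identified by ObjectId ``tid``."""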
+        self.crawl_tab.update_one({'_id': tid}, {'$set': kwargs})
+
+    def _lock_task(self, task: dict):
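+        """Flag the task as being crawled (crawl=True)."""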
+        update = {'crawl': True}
+        self._update_crawl_task(task['_id'], **update)
+
+    def _release_task(self, task: dict):
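+        """Clear the crawling flag (crawl=False) once the task has been handled."""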
+        update = {'crawl': False}
+        self._update_crawl_task(task['_id'], **update)
+
+    def switch_senior_user(self, tid):
+        """
+        Switch to the senior (premium) account
 
-    def process_attachment(self, content: str, rows: dict):
+        :param tid: ObjectId of the crawl task entry
+        """
+        # Tender info that requires a premium membership: switch to the senior account
+        self._update_crawl_task(tid, account=self.senior_account)
+
+    def crawl_error(
+            self,
+            *,
+            spider_code: str,
+            account: str,
+            err_msg='采集失败',
+            response=None,
+            rows=None,
+    ):
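+        """Store failure details (account, spider code, url, status, reason) in the error collection and log err_msg."""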
+        items = {
+            'account': account,
+            'spidercode': spider_code,
+            'crawl_time': int2long(int(time.time())),
+            'crawl_type': 'detail'
+        }
+        if response is not None:
+            items.update({
+                'url': response.request.url,
+                'status_code': response.status_code,
+                'reason': response.reason,
+                'params': getattr(response.request, 'params', None),
+            })
+        elif rows is not None:
+            items.update({
+                'url': rows['url'],
+                'status_code': rows['status_code'],
+                'reason': rows['reason'],
+                'params': rows['params'],
+            })
+        self.crawl_error_tab.insert_one(items)
+        logger.error(err_msg)
+
+    def get_attachment(self, content: str, rows: dict):
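+        """Collect attachments referenced by <a>/<iframe> nodes in the page html and record them in rows["projectinfo"]."""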
         soup = BeautifulSoup(content, "lxml")
         all_node = soup.findAll("a") or soup.findAll("iframe")
         attachments = {}
@@ -134,8 +149,8 @@ class CrawlDetailPageSpider:
         if attachments:
             rows["projectinfo"] = {"attachments": attachments}
 
-    def process_content(self, content, rows: dict):
-        self.process_attachment(content, rows)
+    def save_data(self, content, rows: dict):
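+        """Clean and validate the page content, then insert the document into the save collection."""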
+        self.get_attachment(content, rows)
         rows["contenthtml"] = clean_js(content)
         special = {
             '<iframe[^<>]*>[\s\S]*?</iframe>': ''
@@ -146,165 +161,126 @@ class CrawlDetailPageSpider:
         except CustomCheckError:
             # The page is a pdf viewer, e.g. https://www.zbytb.com/s-zhongbiao-10119392.html
             rows["detail"] = "<br/>详细内容请访问原网页!"
-
         rows["comeintime"] = int2long(int(time.time()))
-        '''Remove crawler-internal fields'''
-        if 'crawl_status' in rows:
-            del rows['crawl_status']
-        del rows['type_code'], rows['account'], rows['crawl'], rows['count']
-        try:
-            self.save_tab.insert_one(rows)
-        except DuplicateKeyError:
-            pass
+        insert = {}
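+        # Keep only the fields that should be persisted; drop crawler bookkeeping keys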
+        for key, val in rows.items():
+            if key not in ['crawl_status', 'account', 'crawl', 'count', '_id']:
+                insert[key] = val
+        self.save_tab.insert_one(insert)
         logger.info("[采集成功]{}-{}".format(rows['title'], rows['publishtime']))
 
-    def set_senior_privilege(self, item: dict):
-        """
-        Assign the senior (premium) account
-
-        :param item: crawled data item
-        """
-        '''Tender info that requires a premium membership: assign the senior account'''
-        self.crawl_tab.update_one(
-            {"_id": item["_id"]},
-            {'$set': {'account': self.senior_account}}
-        )
-        '''采集状态'''
-        self.update_crawl_status(item, False)
-
-    def crawl_error(
-            self,
-            *,
-            spider_code: str,
-            account: str,
-            err_msg='采集失败',
-            response=None,
-            rows=None,
-    ):
-        items = {
-            'account': account,
-            'spidercode': spider_code,
-            'crawl_time': int2long(int(time.time())),
-            'crawl_type': 'detail'
-        }
-        if response is not None:
-            items.update({
-                'url': response.request.url,
-                'status_code': response.status_code,
-                'reason': response.reason,
-                'params': getattr(response.request, 'params', None),
-            })
-        elif rows is not None:
-            items.update({
-                'url': rows['url'],
-                'status_code': rows['status_code'],
-                'reason': rows['reason'],
-                'params': rows['params'],
-            })
-        self.crawl_error_tab.insert_one(items)
-        logger.error(err_msg)
+    def crawl_response(self, response, rows: dict):
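+        """Extract the detail content from the response and save it; return 1 on success, 0 otherwise."""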
+        # print(rows['competehref'])
+        source = re.findall(r'Inner(.*?);Inner', response.text)
+        if len(source) > 0:
+            content = source[0][13:-1]
+        else:
+            content = ''
 
-    def crawl_success(self, response, rows: dict):
-        content = self.extract_response_content(response)
+        counter = 0
         try:
             CheckText(content)
-            self.process_content(content, rows)
-            self.crawl_tab.update_one(
-                {'_id': rows['_id']},
-                {'$set': {'crawl_status': 'finished'}}
-            )
+            self.save_data(content, rows)
+            self._update_crawl_task(rows['_id'], crawl_status='finished')
+            counter = 1
         except (AttachmentNullError, CustomCheckError) as e:
-            self.handler_error(response, e.reason, e.code, rows)
+            if e.code == 10104 and self.account != self.senior_account:
+                self.switch_senior_user(rows['_id'])
+            else:
+                self._update_crawl_task(rows['_id'], crawl_status='response_err')
+                response.status_code = e.code
+                err_msg = response.reason = e.reason
+                response.request.url = rows['competehref']
+                self.crawl_error(
+                    response=response,
+                    spider_code=self.spider_code,
+                    account=self.account,
+                    err_msg=err_msg
+                )
         except CustomAccountPrivilegeError:
-            self.set_senior_privilege(rows)
+            self.switch_senior_user(rows['_id'])
+        return counter
 
-    @staticmethod
-    def crawl_request(user: User, url: str, headers: dict, cookies: dict):
-        retries = 0
+    def crawl_request(self, url: str, referer: str, user: User):
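+        """Fetch the detail page, retrying transient failures and re-logging in when needed; return the response or None."""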
+        headers = {
+            'Host': 'www.zbytb.com',
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
+            'Accept': '*/*',
+            'Referer': 'https://www.zbytb.com/s-zb-20147673.html',
+            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+        }
+        headers.update({'Referer': referer})
+        retries, max_retries = 0, 3
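+        # Retry failed requests (status 10000) up to max_retries times; re-login when login_status_check requires it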
         while True:
             success, response = http_request_get(
                 url,
                 headers=headers,
-                cookies=cookies,
+                cookies=self.cookies,
                 verify=False,
             )
-            if not success and response.status_code == 10000 and retries < 3:
+            if not success and response.status_code == 10000 and retries < max_retries:
                 retries += 1
             else:
                 retry_login = login_status_check(response)
                 if retry_login:
-                    cookies = login(*user)
                     logger.info(f"重新登录:{user.username}")
+                    self.cookies = login(*user)
                 else:
                     break
-        return success, response
-
-    def update_crawl_status(self, rows: dict, status: bool):
-        self.crawl_tab.update_one(
-            {'_id': rows['_id']},
-            {'$set': {'crawl': status}}
-        )
 
-    def crawl_spider(self, rows: dict, user, account, cookies):
-        headers = {
-            'Host': 'www.zbytb.com',
-            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
-            'Accept': '*/*',
-            'Referer': 'https://www.zbytb.com/s-zb-20147673.html',
-            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
-        }
-        headers.update({'Referer': rows['competehref']})
-        url = self.prepare_url(rows)
-        success, response = self.crawl_request(user, url, headers, cookies)
-        print(rows['competehref'])
         if success:
-            self.crawl_success(response, rows)
-        else:
-            self.crawl_error(
-                spider_code=rows['spidercode'],
-                account=account,
-                response=response
-            )
+            return response
+        # Record the data of the failed crawl request
+        self.crawl_error(
+            spider_code=self.spider_code,
+            account=self.account,
+            response=response
+        )
+        return None
 
-    def _spider(self, sc: Scheduler):
+    def crawl_spider(self, sc: Scheduler):
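+        """Main loop: take tasks from the scheduler, lock each one, crawl its detail page, then release it."""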
         while True:
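+            # Pause before the next task, chosen according to the outcome below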
+            next_task_interval = None
             item = sc.crawl_task
             if len(item) == 0:
                 return False
-            self.update_crawl_status(item, True)
-            sc.spider_code = item['spidercode']
+
+            self._lock_task(item)
+            sc.spider_code = self.spider_code = item['spidercode']
             sc.crawl_url = item['competehref']
+            # Get an idle user account
+            self.account = item.get('account', sc.user.username)
+            self.cookies = load_login_cookies(self.account)
+            user = sc.query_user(self.account)
+            if user is None:
+                self._release_task(item)
+                return False
             try:
                 CheckTask(item)
-                account, cookies = self.select_user(item, sc.user.username)
-                user = sc.query_user(account)
-                if user is None:
-                    return False
-                self.crawl_spider(item, user, account, cookies)
-                self.update_crawl_status(item, False)
-                sc.crawl_counter(1)
-                sc.wait_for_next_task(10)
+                url = self.prepare_url(item)
+                referer = item['competehref']
+                response = self.crawl_request(url, referer, user)
+                if response is not None:
+                    num = self.crawl_response(response, item)
+                    sc.crawl_counter(num)
+                next_task_interval = 10
             except JyBasicException as e:
                 if e.code == 10105:
-                    '''When this exception is detected, the program updates the crawl table with the es query result'''
-                    self.crawl_tab.update_one(
-                        {"_id": item["_id"]},
-                        {'$set': {'count': item['count']}}
-                    )
+                    # When this exception is raised, update the task with the es query count
+                    self._update_crawl_task(item["_id"], count=item['count'])
                 else:
                     sc.err_record(e)
-                    self.crawl_tab.update_one(
-                        {"_id": item["_id"]},
-                        {'$set': {'crawl_status': 'error'}}
-                    )
-                self.update_crawl_status(item, False)
+                    self._update_crawl_task(item["_id"], crawl_status='error')
                 sc.crawl_counter(0)
+                next_task_interval = 0.1
+            finally:
+                self._release_task(item)
+                sc.wait_for_next_task(next_task_interval)
 
     def start(self):
         while True:
             with Scheduler(site='中国招标与采购网', crawl_type='detail') as scheduler:
                 if scheduler.crawl_start:
-                    finished = self._spider(scheduler)
+                    finished = self.crawl_spider(scheduler)
                     if not finished:
                         scheduler.wait_for_next_task(2)
                 else: