
中国招标与采购网 - update

dongzhaorui 3 years ago
parent commit 3e724becba

+ 10 - 10
zbytb/crawler/check_utils.py

@@ -2,8 +2,8 @@ import re
 
 from utils.databases import es_query
 from utils.execptions import (
-    CustomAccountPrivilegeError,
-    CustomCheckError
+    AccountError,
+    CheckError
 )
 
 __all__ = ['CheckText', 'CheckTask']
@@ -20,21 +20,21 @@ class CheckContent:
     @staticmethod
     def check_text_length(val: str):
         if len(val) == 0:
-            raise CustomCheckError(code=10101, reason='文本内容为空')
+            raise CheckError(code=10101, reason='文本内容为空')
         elif not re.findall(r'[\u4e00-\u9fa5]', val, re.S):
-            raise CustomCheckError(code=10102, reason='不存在中文字符')
+            raise CheckError(code=10102, reason='不存在中文字符')
 
     @staticmethod
     def check_content(val: str):
         if val.count("部分文件可能不支持在线浏览"):
-            raise CustomCheckError(code=10103, reason='文件不支持在线浏览')
+            raise CheckError(code=10103, reason='文件不支持在线浏览')
 
     @staticmethod
     def check_account_privilege(val: str):
         if val.count("高级会员"):
-            raise CustomAccountPrivilegeError
+            raise AccountError(code=10011, reason='账号权限等级过低')
         elif "本招标项目仅供正式会员查阅" in val:
-            raise CustomAccountPrivilegeError
+            raise AccountError(code=10012, reason='账号无会员访问权限')
 
     def check_sensitive_word(self, val: str):
         total = set()
@@ -44,7 +44,7 @@ class CheckContent:
                 total.add(word)
 
         if len(total) > 0:
-            raise CustomCheckError(code=10104, reason='详情内容包含敏感词')
+            raise CheckError(code=10104, reason='敏感词过滤')
 
     def __check(self, text):
         self.check_sensitive_word(text)
@@ -80,7 +80,7 @@ class CheckPrePareRequest:
         if retrieved_result != 0:
             '''ES query result'''
             rows['count'] = retrieved_result
-            raise CustomCheckError(code=10105, reason='标题内容已存在es')
+            raise CheckError(code=10105, reason='es已收录标题')
 
     def check_crawl_title(self, title: str):
         for keyword in self.crawl_keywords:
@@ -88,7 +88,7 @@ class CheckPrePareRequest:
             if valid_keyword is not None:
                 break
         else:
-            raise CustomCheckError(code=10106, reason='标题未检索到采集关键词', title=title)
+            raise CheckError(code=10106, reason='标题未检索到采集关键词', title=title)
 
     def __check(self, rows: dict):
         title, publish_time = rows['title'], rows['l_np_publishtime']
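
Note on the renamed exceptions: check_utils.py now raises CheckError and AccountError with code/reason (and sometimes title) keywords, and crawl_scheduler.py below constructs the base ZbYTbCrawlError the same way. utils/execptions.py itself is not part of this commit, so the following is only a minimal sketch, assuming the classes do nothing more than store code, reason and an optional title the way err_record later reads them:

# Hypothetical sketch: utils/execptions.py is not shown in this diff.
class ZbYTbCrawlError(Exception):
    def __init__(self, code=10500, reason='unknown error', title='', **kwargs):
        self.code = code
        self.reason = reason
        self.title = title
        super().__init__(f'code={code}, reason={reason}')


class CheckError(ZbYTbCrawlError):
    """Content/title validation failures (codes 10101-10106 above)."""


class AccountError(ZbYTbCrawlError):
    """Account privilege failures (codes 10011/10012 above)."""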

+ 96 - 75
zbytb/crawler/crawl_scheduler.py

@@ -7,7 +7,7 @@ import requests
 
 from crawler.login import User
 from utils.databases import mongo_table, int2long, object_id
-from utils.execptions import JyBasicException
+from utils.execptions import ZbYTbCrawlError
 from utils.log import logger
 from utils.tools import get_host_ip
 
@@ -29,16 +29,76 @@ class Scheduler:
         self.crawl_exception = None
         self.kwargs = kwargs
 
+        self._headers = {"Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"}
+
         self.account_tab = mongo_table('py_spider', 'match_account')
         self.record_tab = mongo_table('py_spider', 'match_account_record')
         self.crawl_error_tab = mongo_table('py_spider', 'crawl_error')
 
+        list_attr = dict(
+            name='crawl_list',
+            lock=dict(crawl_list=True),
+            release=dict(crawl_list=False),
+        )
+        detail_attr = dict(
+            name='crawl_detail',
+            lock=dict(crawl_detail=True),
+            release=dict(crawl_detail=False),
+        )
+        self._schedule = {'list': list_attr, 'detail': detail_attr}
+        self.account = self.get_account()
+
+    def _init(self):
+        self.account_id = self.account['_id']
+        self.user = User(self.account['account'], self.account['password'])
+        logger.info(f'[启用账号]{self.user.username}')
+        history = self.account_history_crawl_record()
+        self.count = history['count']  # number of items crawled
+
+    def get_account(self):
+        url = "http://cc.spdata.jianyu360.com/competing_goods/account/fetch"
+        params = {
+            "site": self.site,
+            "crawl_type": self.crawl_type
+        }
+        try:
+            response = requests.get(url,
+                                    headers=self._headers,
+                                    params=params,
+                                    timeout=10)
+            print(response.json())
+            data = response.json()['data']
+        except requests.RequestException:
+            # Network unavailable; cannot fetch an account
+            data = None
+        return data
+
+    def _release_account(self):
+        url = "http://cc.spdata.jianyu360.com/competing_goods/account/release"
+        if self.account_id is not None:
+            params = {
+                "uid": self.account_id,
+                "crawl_type": self.crawl_type
+            }
+            while True:
+                try:
+                    response = requests.get(url,
+                                            headers=self._headers,
+                                            params=params,
+                                            timeout=10)
+                    if response.status_code == 200:
+                        logger.debug(f"_release_account >>> {response.json()}")
+                        break
+                except requests.RequestException:
+                    logger.error("网络异常,归还账号失败")
+                    self._wait_schedule(1)
+
     def crawl_counter(self, number: int):
         """采集计数器"""
         records = self.record_tab.find_one({'_id': self.record_id})
         records['count'] += number
         self.count = records['count']
-        self._update_tab(self.record_tab, self.record_id, records)
+        self._update_tab(self.record_tab, self.record_id, count=self.count)
 
     def query_user(self, account: str):
         query = {'account': account}
@@ -47,78 +107,25 @@ class Scheduler:
             return None
         return User(item['account'], item['password'])
 
-    def finished(self, execute_next_time=None):
-        logger.info("任务结束")
-        self._release_account()
-        self.sleep(execute_next_time)
-
-    def err_record(self, e: JyBasicException):
+    def err_record(self, err: ZbYTbCrawlError):
         rows = {
             'account': self.user.username if self.user is not None else '',
             'spidercode': self.spider_code,
             'url': self.crawl_url,
-            'status_code': e.code,
-            'reason': e.reason,
-            'params': getattr(e, 'title', ''),
+            'status_code': err.code,
+            'reason': err.reason,
+            'params': getattr(err, 'title', ''),
             'crawl_time': int2long(int(time.time())),
             'crawl_type': self.crawl_type,
         }
         self.crawl_error_tab.insert_one(rows)
 
-    def _update_tab(self, mgo_coll, _id, item):
-        """
-        Update a mongo collection
-
-        :param mgo_coll: mongo collection
-        :param _id: mongo_id
-        :param item: data to update
-        """
-        item['update_time'] = self.current_time
-        mgo_coll.update_one({'_id': _id}, {'$set': item})
-
-    def _release_account(self):
-        if self.crawl_type == 'detail':
-            rows = dict(crawl_detail=False,)
-        else:
-            rows = dict(crawl_list=False,)
-        if self.account_id is not None:
-            self._update_tab(self.account_tab, self.account_id, rows)
-
-    def __enter__(self):
-        logger.info(f'[开启调度]')
-        '''Fetch an idle account'''
-        if self.account is not None:
-            self.account_id = self.account['_id']
-            self.user = User(self.account['account'], self.account['password'])
-            logger.info(f'[启用账号]{self.user.username}')
-            '''Initialise the record table'''
-            records = self.account_records
-            if self.crawl_type == 'detail':
-                item = {'crawl_detail': True}
-                self.total = records['total']
-                self.count = records['count']
-            else:
-                item = {'crawl_list': True}
-            '''Initialise the account crawl record'''
-            self._update_tab(self.account_tab, self.account_id, item)
-            self.crawl_start = True
-        else:
-            logger.warning(f'[{self.site}]暂无闲置账号')
-        return self
-
-    @staticmethod
-    def wait_for_next_task(wait_time=None):
-        _sleep = (wait_time or random.choice(range(5, 15)))
-        time.sleep(_sleep)
-
-    @staticmethod
-    def sleep(wait_time=None):
-        sleep_time = (wait_time or 600)
-        time.sleep(sleep_time)
+    def _update_tab(self, collection, mid, **update):
+        update['update_time'] = self.current_time
+        collection.update_one({'_id': mid}, {'$set': update})
 
-    @property
-    def account_records(self):
-        """Account usage record"""
+    def account_history_crawl_record(self):
+        """Crawl record history for the account in use"""
         query = dict(
             account=self.account['account'],
             date=self.today,
@@ -142,15 +149,19 @@ class Scheduler:
         self.record_id = item['_id']
         return item
 
-    @property
-    def account(self):
-        """Account"""
-        query = dict(site=self.site)
-        if self.crawl_type == 'detail':
-            query['crawl_detail'] = False
-        else:
-            query['crawl_list'] = False
-        return self.account_tab.find_one(query, sort=[('update_time', 1)])
+    def wait_for_next_task(self, interval=None):
+        interval = (interval or random.choice(range(5, 15)))
+        self._wait_schedule(interval)
+
+    def finished(self, execute_next_time=None):
+        logger.info("任务结束")
+        self._release_account()
+        self._wait_schedule(execute_next_time)
+
+    @staticmethod
+    def _wait_schedule(interval=None):
+        _interval = (interval or 600)
+        time.sleep(_interval)
 
     @property
     def crawl_task(self):
@@ -181,14 +192,24 @@ class Scheduler:
     def yesterday(self):
         return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
 
+    def __enter__(self):
+        logger.info('[开启调度]')
+        '''Fetch an idle account'''
+        if self.account is not None:
+            self._init()
+            self.crawl_start = True
+        else:
+            logger.warning(f'[{self.site}]暂无闲置账号')
+        return self
+
     def __exit__(self, exc_type, exc_val, exc_tb):
-        logger.info(f'[关闭调度]')
+        logger.info('[关闭调度]')
         self._release_account()
         self.crawl_start = False
 
         if exc_type is not None:
             logger.exception(exc_tb)
-            e = JyBasicException(
+            e = ZbYTbCrawlError(
                 code=10500,
                 reason=str(exc_type),
                 title='未知系统错误'
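
With account checkout and release now going through the competing_goods HTTP service, the Scheduler is driven as a context manager: __enter__ fetches an idle account and sets crawl_start, finished() returns the account and sleeps, and __exit__ releases it again and wraps unexpected exceptions as a code 10500 ZbYTbCrawlError. A minimal usage sketch, mirroring how DetailPageSpider.start_crawl below drives it (the crawler.crawl_scheduler import path is assumed):

from crawler.crawl_scheduler import Scheduler

with Scheduler(site='中国招标与采购网', crawl_type='detail') as sc:
    if sc.crawl_start:
        # an idle account was checked out; _init() has set sc.user and sc.count
        print(sc.user.username, sc.count)
        sc.crawl_counter(1)   # add one item to today's crawl record for this account
    sc.finished(10)           # release the account and wait 10s before the next round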

+ 43 - 37
zbytb/crawler/spiders/DetailPageSpider.py

@@ -4,7 +4,6 @@ import time
 from urllib.parse import urlencode, urlparse
 
 from bs4 import BeautifulSoup
-from pymongo.errors import DuplicateKeyError
 
 from crawler.check_utils import CheckText, CheckTask
 from crawler.clean_html import cleaner, clean_js
@@ -18,9 +17,10 @@ from utils.attachment import (
 )
 from utils.databases import mongo_table, int2long
 from utils.execptions import (
-    CustomAccountPrivilegeError,
-    AttachmentNullError,
-    CustomCheckError, JyBasicException
+    AccountError,
+    AttachmentError,
+    CheckError,
+    ZbYTbCrawlError
 )
 from utils.log import logger
 
@@ -75,7 +75,7 @@ class CrawlDetailPageSpider:
 
         :param tid: ObjectId of the item to be crawled
         """
-        # Bid info that requires senior membership to view; switch to a senior account
+        # Bid info that requires senior membership to view; assign the senior account
         self._update_crawl_task(tid, account=self.senior_account)
 
     def crawl_error(
@@ -110,11 +110,12 @@ class CrawlDetailPageSpider:
         self.crawl_error_tab.insert_one(items)
         logger.error(err_msg)
 
-    def get_attachment(self, content: str, rows: dict):
+    def download_attachment(self, content: str, rows: dict):
+        logger.info('>>> 下载附件')
+        index = 0
+        attachments = {}
         soup = BeautifulSoup(content, "lxml")
         all_node = soup.findAll("a") or soup.findAll("iframe")
-        attachments = {}
-        index = 0
         for node in all_node:
             file_name, file_type = (node.string or node.text), None
             file_path = node.attrs.get("href", "") or node.attrs.get("src", "")
@@ -128,29 +129,33 @@ class CrawlDetailPageSpider:
                              or extract_file_type(file_path))
 
             # Extract the file name
-            parser = urlparse(file_path)
-            if parser.scheme in ['https', 'http'] and file_type is not None:
-                if not file_name:
-                    name = extract_file_name_by_href(file_path, file_type)
-                    if name is not None:
-                        file_name = name
-                    else:
-                        file_name = f"{rows['title']}_{index}"
+            try:
+                parser = urlparse(file_path)
+            except ValueError:
+                pass
+            else:
+                if parser.scheme in ['https', 'http'] and file_type is not None:
+                    if not file_name:
+                        name = extract_file_name_by_href(file_path, file_type)
+                        if name is not None:
+                            file_name = name
+                        else:
+                            file_name = f"{rows['title']}_{index}"
 
-                attachment = self.attachment_downloader.download(
-                    file_name=file_name,
-                    file_type=file_type,
-                    download_url=file_path,
-                )
-                if len(attachment) > 0:
-                    attachments[str(index + 1)] = attachment
-                    index += 1
+                    attachment = self.attachment_downloader.download(
+                        file_name=file_name,
+                        file_type=file_type,
+                        download_url=file_path,
+                    )
+                    if len(attachment) > 0:
+                        attachments[str(index + 1)] = attachment
+                        index += 1
 
-        if attachments:
+        if len(attachments) > 0:
             rows["projectinfo"] = {"attachments": attachments}
 
     def save_data(self, content, rows: dict):
-        self.get_attachment(content, rows)
+        logger.info('>>> 保存数据')
         rows["contenthtml"] = clean_js(content)
         special = {
             '<iframe[^<>]*>[\s\S]*?</iframe>': ''
@@ -158,7 +163,7 @@ class CrawlDetailPageSpider:
         rows["detail"] = cleaner(content, special=special)
         try:
             CheckText(rows["detail"])
-        except CustomCheckError:
+        except CheckError:
             # The page is a pdf viewer, e.g. https://www.zbytb.com/s-zhongbiao-10119392.html
             rows["detail"] = "<br/>详细内容请访问原网页!"
         rows["comeintime"] = int2long(int(time.time()))
@@ -170,7 +175,7 @@ class CrawlDetailPageSpider:
         logger.info("[采集成功]{}-{}".format(rows['title'], rows['publishtime']))
 
     def crawl_response(self, response, rows: dict):
-        # print(rows['competehref'])
+        logger.info('>>> 采集响应')
         source = re.findall(r'Inner(.*?);Inner', response.text)
         if len(source) > 0:
             content = source[0][13:-1]
@@ -180,10 +185,11 @@ class CrawlDetailPageSpider:
         counter = 0
         try:
             CheckText(content)
+            self.download_attachment(content, rows)
             self.save_data(content, rows)
             self._update_crawl_task(rows['_id'], crawl_status='finished')
             counter = 1
-        except (AttachmentNullError, CustomCheckError) as e:
+        except (AttachmentError, CheckError) as e:
             if e.code == 10104 and self.account != self.senior_account:
                 self.switch_senior_user(rows)
             else:
@@ -197,11 +203,12 @@ class CrawlDetailPageSpider:
                     account=self.account,
                     err_msg=err_msg
                 )
-        except CustomAccountPrivilegeError:
+        except AccountError:
             self.switch_senior_user(rows)
         return counter
 
     def crawl_request(self, url: str, referer: str, user: User):
+        logger.info('>>> 采集请求')
         headers = {
             'Host': 'www.zbytb.com',
             'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
@@ -248,12 +255,14 @@ class CrawlDetailPageSpider:
             self._lock_task(item)
             sc.spider_code = self.spider_code = item['spidercode']
             sc.crawl_url = item['competehref']
-            # Fetch an idle user account
+
+            # Assign the account and its login cookies
             self.account = item.get('account', sc.user.username)
             self.cookies = load_login_cookies(self.account)
             user = sc.query_user(self.account)
             if user is None:
                 return False
+
             try:
                 CheckTask(item)
                 url = self.prepare_url(item)
@@ -263,7 +272,7 @@ class CrawlDetailPageSpider:
                     num = self.crawl_response(response, item)
                     sc.crawl_counter(num)
                 next_task_interval = 10
-            except JyBasicException as e:
+            except (ZbYTbCrawlError, Exception) as e:
                 if e.code == 10105:
                 # On this error, write the es query count back to the crawl task
                     self._update_crawl_task(item["_id"], count=item['count'])
@@ -280,8 +289,5 @@ class CrawlDetailPageSpider:
         while True:
             with Scheduler(site='中国招标与采购网', crawl_type='detail') as scheduler:
                 if scheduler.crawl_start:
-                    finished = self.crawl_spider(scheduler)
-                    if not finished:
-                        scheduler.wait_for_next_task(2)
-                else:
-                    scheduler.wait_for_next_task(60)
+                    self.crawl_spider(scheduler)
+                scheduler.finished(10)
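
The new try/except around urlparse in download_attachment guards against malformed attachment hrefs: the standard library raises ValueError for some inputs instead of returning a parse result. A minimal reproduction of that behaviour (standard-library only, not project code; the bad href is a made-up example):

from urllib.parse import urlparse

urlparse("https://www.zbytb.com/s-zhongbiao-10119392.html")    # parses normally
try:
    urlparse("http://[www.example.com/file.pdf")               # unbalanced '[' in the netloc
except ValueError as exc:
    print(exc)   # -> Invalid IPv6 URL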