import random
import re
import time
from urllib.parse import urlencode, urlparse

from bs4 import BeautifulSoup
from parsel import Selector

from crawler.check_utils import CheckText, CheckTask
from crawler.clean_html import cleaner, clean_js
from crawler.crawl_scheduler import Scheduler
from crawler.defaults import http_request_get
from crawler.login import load_login_cookies, login, User, login_status_check
from utils.attachment import AttachmentDownloader
from utils.clean_file import (
    extract_file_type,
    extract_file_name_by_href
)
from utils.databases import mongo_table, int2long
from utils.execptions import (
    AccountError,
    AttachmentError,
    CheckError,
    ZbYTbCrawlError
)
from utils.log import logger


class CrawlDetailPageSpider:
    """Detail-page spider for zbytb.com: fetches announcement pages, downloads
    attachments, and writes results and errors to MongoDB collections."""

    def __init__(self, db: str, crawl_tab: str, save_tab: str, error_tab: str):
        self.crawl_tab = mongo_table(db, crawl_tab)
        self.save_tab = mongo_table(db, save_tab)
        self.crawl_error_tab = mongo_table(db, error_tab)
        self.attachment_downloader = AttachmentDownloader()
        self.senior_account = 'runhekeji'
        self.account = None
        self.cookies = None
        self.spider_code = None

    @staticmethod
    def prepare_url(rows: dict):
        host = "https://www.zbytb.com/api/task.js.php"
        params = {
            "moduleid": rows["type_code"],
            "html": "show",
            "itemid": re.findall(r"\d+\.?\d*", rows["competehref"])[0][:-1],
            "page": "1",
            "es": "",
            "refresh": "{}.js".format(random.random())
        }
        url = host + '?' + urlencode(params)
        return url
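
    # Sketch of the request this builds (illustrative; the type_code value and the
    # random "refresh" suffix are made up). For a task row such as
    #   {"type_code": 26, "competehref": "https://www.zbytb.com/s-zb-20147673.html"}
    # the regex captures "20147673." and [:-1] drops the trailing dot, so prepare_url()
    # returns roughly:
    #   https://www.zbytb.com/api/task.js.php?moduleid=26&html=show&itemid=20147673&page=1&es=&refresh=0.123.js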

    def _update_crawl_task(self, tid, **kwargs):
        self.crawl_tab.update_one({'_id': tid}, {'$set': kwargs})

    def _lock_task(self, task: dict):
        update = {'crawl': True}
        self._update_crawl_task(task['_id'], **update)

    def _release_task(self, task: dict):
        update = {'crawl': False}
        self._update_crawl_task(task['_id'], **update)

    def switch_senior_user(self, tid):
        """
        Switch the task to the senior (premium) account.

        :param tid: ObjectId of the crawl task item
        """
        # Announcements that require a premium membership are reassigned to the senior account
        self._update_crawl_task(tid, account=self.senior_account)

    def crawl_error(
            self,
            *,
            spider_code: str,
            account: str,
            err_msg='采集失败',
            response=None,
            rows=None
    ):
        items = {
            'account': account,
            'spidercode': spider_code,
            'crawl_time': int2long(int(time.time())),
            'crawl_type': 'detail'
        }
        if response is not None:
            items.update({
                'url': response.request.url,
                'status_code': response.status_code,
                'reason': response.reason,
                'params': getattr(response.request, 'params', None),
            })
        elif rows is not None:
            items.update({
                'url': rows['url'],
                'status_code': rows['status_code'],
                'reason': rows['reason'],
                'params': rows['params'],
            })
        self.crawl_error_tab.insert_one(items)
        logger.error(err_msg)

    def download_attachment(self, content: str, rows: dict):
        soup = BeautifulSoup(content, "lxml")
        attachments = {}
        nums = 0
        nodes = soup.findAll("a") or soup.findAll("iframe")
        for node in nodes:
            file_name, file_type = (node.string or node.text), None
            file_path = node.attrs.get("href", "") or node.attrs.get("src", "")
            # The attachment may be embedded in an iframe (pdf viewer)
            _id = node.attrs.get('id')
            if _id == 'pdfContainer':
                file_type = 'pdf'
            # Extract the file type
            if file_type is None:
                file_type = (extract_file_type(file_name) or extract_file_type(file_path))
            # Extract the file name
            try:
                parser = urlparse(file_path)
                if parser.scheme in ['https', 'http'] and file_type is not None:
                    if not file_name:
                        name = extract_file_name_by_href(file_path, file_type)
                        if name is not None:
                            file_name = name
                        else:
                            file_name = f"{rows['title']}_{nums}"
                    attachment = self.attachment_downloader.download(
                        file_name=file_name,
                        file_type=file_type,
                        download_url=file_path,
                    )
                    if len(attachment) > 0:
                        attachments[str(len(attachments) + 1)] = attachment
                    nums += 1
            except ValueError:
                pass

        # Some pages expose the file through a <pdfpath> element instead of links
        file_url = soup.findAll('pdfpath')
        if file_url:
            file_url = list(file_url[0].stripped_strings)[0]
            file_type = extract_file_type(file_url)
            file_name = rows['title']
            if file_type:
                attachment = self.attachment_downloader.download(
                    file_name=file_name,
                    file_type=file_type,
                    download_url=file_url,
                )
                if len(attachment) > 0:
                    attachments[str(len(attachments) + 1)] = attachment

        if len(attachments) > 0:
            rows["projectinfo"] = {"attachments": attachments}

    def save_data(self, content, rows: dict):
        rows["contenthtml"] = clean_js(content)
        special = {
            r'<iframe[^<>]*>[\s\S]*?</iframe>': ''
        }
        rows["detail"] = cleaner(content, special=special)
        try:
            CheckText(rows["detail"])
        except CheckError:
            # The page is just a pdf viewer, e.g. https://www.zbytb.com/s-zhongbiao-10119392.html
            rows["detail"] = "<br/>详细内容请访问原网页!"
        rows["comeintime"] = int2long(int(time.time()))
        insert = {}
        for key, val in rows.items():
            if key not in ['crawl_status', 'account', 'crawl', 'count', '_id']:
                insert[key] = val
        self.save_tab.insert_one(insert)
        logger.info("[采集成功]{}-{}".format(rows['title'], rows['publishtime']))

    def crawl_response(self, response, rows: dict):
        source = re.findall(r'Inner(.*?);Inner', response.text)
        if len(source) > 0:
            content = source[0][13:-1]
        else:
            root = Selector(text=response.text)
            content = root.xpath('//div[@class="conent-box"]').extract_first()
            if content:
                clean_features = [
                    '//div[@class="conent-box"]/div[@class="xgxm"]',
                    '//div[@class="content-user"]'
                ]
                for feature in clean_features:
                    clean_html = root.xpath(feature).extract_first()
                    if clean_html is not None:
                        content = content.replace(clean_html, '')
            else:
                content = ''

        counter = 0
        try:
            CheckText(content)
            self.download_attachment(content, rows)
            self.save_data(content, rows)
            self._update_crawl_task(rows['_id'], crawl_status='finished')
            counter = 1
        except (AttachmentError, CheckError) as e:
            if e.code == 10104 and self.account != self.senior_account:
                # switch_senior_user expects the task ObjectId, not the whole document
                self.switch_senior_user(rows['_id'])
            else:
                self._update_crawl_task(rows['_id'], crawl_status='response_err')
            response.status_code = e.code
            err_msg = response.reason = e.reason
            response.request.url = rows['competehref']
            self.crawl_error(
                response=response,
                spider_code=self.spider_code,
                account=self.account,
                err_msg=err_msg
            )
        except AccountError:
            self.switch_senior_user(rows['_id'])
        return counter

    def crawl_request(self, url: str, referer: str, user: User):
        headers = {
            'Host': 'www.zbytb.com',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
            'Accept': '*/*',
            'Referer': referer,
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        retries, max_retries = 0, 3
        while True:
            success, response = http_request_get(
                url,
                login=True,
                headers=headers,
                cookies=self.cookies,
                verify=False,
            )
            if not success and response.status_code == 10000 and retries < max_retries:
                retries += 1
            else:
                retry_login = login_status_check(response)
                if retry_login:
                    logger.info(f"重新登录:{user.username}")
                    self.cookies = login(*user)
                else:
                    break

        if success:
            return response
        # Record the data of the failed request
        self.crawl_error(
            spider_code=self.spider_code,
            account=self.account,
            response=response
        )
        return None

    def crawl_spider(self, sc: Scheduler):
        while True:
            next_task_interval = None
            item = sc.crawl_task
            if len(item) == 0:
                return False
            logger.info(f">>> {item['title']} - {item['competehref']}")
            self._lock_task(item)
            sc.spider_code = self.spider_code = item['spidercode']
            sc.crawl_url = item['competehref']
            # Assign the account and its login cookies
            self.account = item.get('account', sc.user.username)
            self.cookies = load_login_cookies(self.account)
            user = sc.query_user(self.account)
            if user is None:
                return False
            try:
                CheckTask(item)
                url = self.prepare_url(item)
                referer = item['competehref']
                response = self.crawl_request(url, referer, user)
                if response is not None:
                    num = self.crawl_response(response, item)
                    sc.crawl_counter(num)
                next_task_interval = 10
            except (ZbYTbCrawlError, Exception) as e:
                if getattr(e, 'code', None) is None:
                    err = ZbYTbCrawlError(unknown_err=e)
                    sc.err_record(err)
                elif e.code == 10105:
                    # When this exception is raised, refresh the es query count on the task
                    self._update_crawl_task(item["_id"], count=item['count'])
                else:
                    sc.err_record(e)
                self._update_crawl_task(item["_id"], crawl_status='error')
                sc.crawl_counter(0)
                next_task_interval = 0.1
            finally:
                self._release_task(item)
                sc.wait_for_next_task(next_task_interval)

    def start(self):
        while True:
            with Scheduler(site='中国招标与采购网', crawl_type='detail') as scheduler:
                if scheduler.crawl_start:
                    self.crawl_spider(scheduler)
                    scheduler.finished(10)
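

if __name__ == '__main__':
    # Minimal usage sketch. The database and collection names below are assumptions
    # for illustration only; substitute the project's real configuration.
    spider = CrawlDetailPageSpider(
        db='py_spider',            # assumed database name
        crawl_tab='zbytb_list',    # assumed task/list collection
        save_tab='data_bak',       # assumed results collection
        error_tab='crawl_error',   # assumed error-log collection
    )
    spider.start()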