# -*- coding: utf-8 -*-
"""
Created on 2021-12-13 13:25:15
---------
@summary: detail-page spider: pulls seed documents from mongo and fetches/parses bid detail pages
---------
@author: 马国鹏
"""
import json
import sys

sys.path.append('/app/spiders/sword_feapder/FworkSpider')
import time
from urllib.parse import urljoin

import feapder
import execjs
from feapder.utils.tools import wechat_warning
from feapder.db.mongodb import MongoDB
from items.spider_item import DataBakItem, MgpListItem
from untils.attachment import AttachmentDownloader

# Note: json / time / execjs are not referenced directly in this module, but may be
# needed by the js / ex_python snippets carried on seeds and eval'd / exec'd below.


class Details(feapder.Spider):
    _to_db = None
    db_name = 'mgp_list'
    send_list = []

    # mongo connection (created lazily)
    @property
    def to_db(self):
        if not self._to_db:
            self._to_db = MongoDB()
        return self._to_db

    def start_requests(self):
        while True:
            # pull one batch of detail seeds, newest publish time first
            data_list = self.to_db.find(self.db_name, {"parser_name": "details"},
                                        sort={"item.publishtime": -1}, limit=50)
            for item in data_list:
                request_params = item.get("request_params")
                if item.get("js"):
                    eval(item.get("js"))
                if item.get("ex_python"):
                    exec(item.get("ex_python"))
                if item.get("proxies"):
                    yield feapder.Request(url=item.get("parse_url"), item=item.get("item"),
                                          files=item.get("files"),
                                          deal_detail=item.get("deal_detail"),
                                          callback=eval(item.get("parse")),
                                          base_info=item, **request_params)
                else:
                    yield feapder.Request(url=item.get("parse_url"), item=item.get("item"),
                                          files=item.get("files"),
                                          deal_detail=item.get("deal_detail"),
                                          callback=eval(item.get("parse")),
                                          base_info=item, proxies=False, **request_params)
                # the seed has been turned into a request; remove it from the queue collection
                self.to_db.delete(self.db_name, item)
            break

    def detail_get(self, request, response):
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item[key] = items[key]
        html = ''
        for xpath in request.deal_detail:
            html = response.xpath(xpath).extract_first()  # detail content of the notice
            if html is not None:
                break
        list_item.contenthtml = html
        # attachment rules may arrive on the seed as `files_info` or `files`; prefer the former
        files_info = getattr(request, "files_info", None) or request.files
        if files_info:
            files = response.xpath(files_info.get("list_xpath"))
            if len(files) > 0:
                attachments = {}
                for info in files:
                    file_url = info.xpath(files_info.get("url_xpath")).extract_first()
                    file_name = info.xpath(files_info.get("name_xpath")).extract_first()
                    if files_info.get("host"):
                        file_url = urljoin(files_info.get("host"), file_url)
                    if not files_info.get("file_type"):
                        file_type = file_url.split("?")[0].split(".")[-1].lower()
                    else:
                        file_type = files_info.get("file_type")
                    if file_type in files_info.get("files_type") and files_info.get("url_key") in file_url:
                        attachment = AttachmentDownloader().fetch_attachment(
                            file_name=file_name, file_type=file_type,
                            download_url=file_url, enable_proxy=False)
                        attachments[len(attachments) + 1] = attachment
                if attachments:
                    list_item.projectinfo = {"attachment": attachments}
        yield list_item

    def detail_json(self, request, response):
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item[key] = items[key]
        exec(request.deal_detail)
        yield list_item

    def detail_post(self, request, response):
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item[key] = items[key]
        exec(request.deal_detail)
        yield list_item

    def failed_request(self, request, response):
        """After the request/parse retry limit is exceeded, write the seed back to mongo and update its failed field."""
        if response is None:
            code = 0
        else:
            code = response.status_code
        if 200 <= code < 300:
            err = 'analysis'
        elif 300 <= code < 400:
            err = 'download'
        elif 400 <= code < 500:
            err = 'download'
        elif 500 <= code:
            err = "servers"
        else:
            err = "timeout"
        mgp = MgpListItem()
        mgp.code = code
        mgp.error = err
        items = request.base_info
        for key in items:
            mgp[key] = items[key]
        mgp.failed += 1
        if mgp.pri is None:
            mgp.pri = 0
        if mgp.pri > 5:
            # alert only at selected failure counts; the send_list count throttles repeat alerts per site
            if mgp.failed in (10, 30, 50, 100, 200) or mgp.failed > 200:
                if self.send_list.count(mgp.item.get("site")) == mgp.pri - 5:
                    # send a WeChat alert according to the spider's priority
                    info = f'''`
Your spider has tasks that failed to request/parse more than {mgp.failed} times.
> **Spider name:** {mgp.item.get("site")}
> **Channel name:** {mgp.item.get("channel")}
> **Spider code:** {mgp.item.get("spidercode")}
> **Spider priority:** {mgp.pri}
> **Maintainer:** {mgp.author}
Please log in to the Jianyu spider management platform for details.
`'''
                    wechat_warning(info)
                    self.send_list.append(mgp.item.get("site"))
        yield mgp

    def end_callback(self):
        print("spider finished")


if __name__ == "__main__":
    Details(redis_key="magp:details1").start()
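
# ---------------------------------------------------------------------------
# Illustrative sketch only: what a seed document in the `mgp_list` collection
# is assumed to look like, based on the fields this spider reads above.
# The field names come from this file; every concrete value below is hypothetical.
#
# {
#     "parser_name": "details",
#     "parse": "self.detail_get",               # resolved via eval() into a callback
#     "parse_url": "http://example.com/notice/1.html",
#     "request_params": {},                     # extra kwargs forwarded to feapder.Request
#     "deal_detail": ["//div[@class='content']"],
#     "proxies": False,
#     "files": {                                # attachment extraction rules (optional)
#         "list_xpath": "//div[@class='attachment']//a",
#         "url_xpath": "./@href",
#         "name_xpath": "./text()",
#         "files_type": ["pdf", "doc", "xls", "zip"],
#         "url_key": "http",
#         "host": "http://example.com",
#     },
#     "item": {"site": "...", "channel": "...", "spidercode": "...", "publishtime": "..."},
#     "failed": 0,
#     "pri": 1,
#     "author": "...",
# }
# ---------------------------------------------------------------------------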