# -*- coding: utf-8 -*-
"""
Created on 2021-12-13 13:25:15
---------
@summary: detail-page spider: pulls queued detail tasks from MongoDB,
    replays the requests, and extracts page content and attachments
---------
@author: 马国鹏
"""
import sys

sys.path.append('/app/spiders/sword_feapder/FworkSpider')

import json    # noqa: F401 -- json/time/execjs are kept available for task
import time    # noqa: F401    snippets executed below via eval()/exec()
from urllib.parse import urljoin

import execjs  # noqa: F401
import feapder
from feapder.db.mongodb import MongoDB
from feapder.utils.tools import wechat_warning

from items.spider_item import DataBakItem, MgpListItem
from untils.attachment import AttachmentDownloader


class Details(feapder.Spider):
    _to_db = None
    db_name = 'mgp_list'  # MongoDB collection holding the queued detail tasks
    send_list = []        # sites that have already triggered an alert

    # Lazy MongoDB connection
    @property
    def to_db(self):
        if not self._to_db:
            self._to_db = MongoDB()
        return self._to_db
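
    # A queued task document looks roughly like this. This is a sketch only:
    # the field names are inferred from the reads in start_requests and
    # failed_request, and every value shown is hypothetical.
    #
    #   {
    #       "parser_name": "details",
    #       "parse_url": "http://example.com/detail/1.html",
    #       "parse": "self.detail_get",          # callback, resolved via eval()
    #       "deal_detail": ["//div[@id='content']"],
    #       "item": {"site": "...", "channel": "...", "spidercode": "...",
    #                "publishtime": "..."},
    #       "request_params": {},
    #       "proxies": False,
    #       "files": None,                       # or the attachment config dict
    #       "js": None, "ex_python": None,       # optional code snippets
    #       "failed": 0, "pri": 0, "author": "...",
    #   }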

    def start_requests(self):
        while True:
            # Pull up to 50 queued detail tasks, newest publish time first
            data_list = self.to_db.find(self.db_name, {"parser_name": "details"},
                                        sort={"item.publishtime": -1}, limit=50)
            for item in data_list:
                request_params = item.get("request_params")
                # Optional per-task code snippets, run before the request is built
                if item.get("js"):
                    eval(item.get("js"))
                if item.get("ex_python"):
                    exec(item.get("ex_python"))
                if item.get("proxies"):
                    yield feapder.Request(url=item.get("parse_url"), item=item.get("item"),
                                          files=item.get("files"),
                                          deal_detail=item.get("deal_detail"),
                                          callback=eval(item.get("parse")),
                                          base_info=item, **request_params)
                else:
                    yield feapder.Request(url=item.get("parse_url"), item=item.get("item"),
                                          files=item.get("files"),
                                          deal_detail=item.get("deal_detail"),
                                          callback=eval(item.get("parse")),
                                          base_info=item, proxies=False, **request_params)
                # Drop the task from the queue once it has been dispatched
                self.to_db.delete(self.db_name, item)
            break  # a single batch per run
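
    # The "js" / "ex_python" fields above carry source strings that are run
    # with eval()/exec(). A hypothetical snippet, for illustration only, that
    # pre-computes a signed header with execjs before the request goes out:
    #
    #   request_params["headers"] = {
    #       "sign": execjs.compile(open("sign.js").read()).call("get_sign")
    #   }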

    def detail_get(self, request, response):
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item.__setitem__(key, items[key])
        html = ''
        # Try each configured xpath in turn until one yields the detail content
        for xpath in request.deal_detail:
            html = response.xpath(xpath).extract_first()  # detailed bid content
            if html is not None:
                break
        list_item.contenthtml = html
        # Optional attachment extraction, driven by the task's "files" config
        if request.files:
            files_info = request.files
            files = response.xpath(files_info.get("list_xpath"))
            if len(files) > 0:
                attachments = {}
                for index, info in enumerate(files):
                    file_url = info.xpath(files_info.get("url_xpath")).extract_first()
                    file_name = info.xpath(files_info.get("name_xpath")).extract_first()
                    if files_info.get("host"):
                        file_url = urljoin(files_info.get("host"), file_url)
                    if not files_info.get("file_type"):
                        # Derive the extension from the URL, query string stripped
                        file_type = file_url.split("?")[0].split(".")[-1].lower()
                    else:
                        file_type = files_info.get("file_type")
                    if file_type in files_info.get("files_type") and files_info.get("url_key") in file_url:
                        attachment = AttachmentDownloader().fetch_attachment(
                            file_name=file_name, file_type=file_type,
                            download_url=file_url, enable_proxy=False)
                        attachments[len(attachments) + 1] = attachment
                if attachments:
                    list_item.projectinfo = {"attachment": attachments}
        yield list_item
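
    # Example "files" config consumed by detail_get. A sketch only: the keys
    # come from the .get() calls above, the values are hypothetical.
    #
    #   {
    #       "list_xpath": "//div[@class='attachments']//a",
    #       "url_xpath": "./@href",
    #       "name_xpath": "./text()",
    #       "host": "http://example.com",
    #       "file_type": None,        # None means: derive it from the URL
    #       "files_type": ["pdf", "doc", "docx", "xls", "xlsx", "zip", "rar"],
    #       "url_key": "http",        # substring the file URL must contain
    #   }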

    def detail_json(self, request, response):
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item.__setitem__(key, items[key])
        # Here deal_detail is a Python snippet that parses the JSON response
        exec(request.deal_detail)
        yield list_item

    def detail_post(self, request, response):
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item.__setitem__(key, items[key])
        # Same contract as detail_json, for POST responses
        exec(request.deal_detail)
        yield list_item
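
    # A hypothetical deal_detail snippet for detail_json/detail_post; it runs
    # via exec() with list_item and response in scope, e.g.:
    #
    #   list_item.contenthtml = json.loads(response.text)["data"]["content"]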

    def failed_request(self, request, response):
        """Once a task exceeds the request/parse retry limit, write it back to
        Mongo with an error label and an incremented failed counter."""
        if response is None:
            code = 0
        else:
            code = response.status_code
        # Map the status code to a coarse error class
        if 200 <= code < 300:
            err = 'analysis'   # page fetched, but parsing failed
        elif 300 <= code < 500:
            err = 'download'
        elif 500 <= code:
            err = 'servers'
        else:
            err = 'timeout'    # code == 0: no response at all
        mgp = MgpListItem()
        mgp.code = code
        mgp.error = err
        items = request.base_info
        for key in items:
            mgp.__setitem__(key, items[key])
        mgp.failed += 1
        if mgp.pri is None:
            mgp.pri = 0
        if mgp.pri > 5:
            if mgp.failed in (10, 30, 50, 100, 200) or mgp.failed > 200:
                # Alert according to spider priority
                if self.send_list.count(mgp.item.get("site")) == mgp.pri - 5:
                    info = f'''`
Your spider has tasks that failed to request or parse more than <font color="#FF0000">{mgp.failed}</font> times.
> **Site:** {mgp.item.get("site")}
> **Channel:** {mgp.item.get("channel")}
> **Spider code:** {mgp.item.get("spidercode")}
> **Priority:** {mgp.pri}
> **Owner:** {mgp.author}
Please log in to the Jianyu spider management platform for details.
`'''
                    wechat_warning(info)
                    self.send_list.append(mgp.item.get("site"))
        yield mgp

    def end_callback(self):
        print("Spider finished")


if __name__ == "__main__":
    Details(redis_key="magp:details1").start()