# -*- coding: utf-8 -*-
"""
Created on 2021-12-13 13:25:15
---------
@summary: Detail-page handler that generates cookies with a limited validity
          period and uses them; by default there is no IP restriction.
---------
@author: 马国鹏
"""
import sys

sys.path.append('/app/spiders/sword_feapder/FworkSpider')

import copy

import execjs  # may be referenced by task snippets executed via exec() below
import feapder
from feapder.db.mongodb import MongoDB
from feapder.utils.tools import wechat_warning
from items.spider_item import DataBakItem, MgpListItem
from untils.cookie_pool import PageCookiePool


class Details(feapder.Spider):
    _to_db = None
    db_name = 'mgp_list'
    send_list = []

    # Lazily created MongoDB connection
    @property
    def to_db(self):
        if not self._to_db:
            self._to_db = MongoDB()
        return self._to_db

    def start_requests(self):
        while True:
            data_list = self.to_db.find(self.db_name, {"parser_name": "details_cookie"}, sort={"date": -1})
            for item in data_list:
                request_params = item.get("request_params")
                if item.get("ex_python"):
                    # Run any extra Python snippet attached to the task
                    exec(item.get("ex_python"))
                yield feapder.Request(url=item.get("parse_url"), item=item.get("item"),
                                      deal_detail=item.get("deal_detail"), **request_params,
                                      callback=eval(item.get("parse")), base_info=item,
                                      down_mid=item.get("down_mid"))
                self.to_db.delete(self.db_name, item)
            break  # process a single batch per run
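
    # A hedged sketch of the task document this spider consumes from
    # `mgp_list` (field names are taken from the reads above; the concrete
    # values are assumptions for illustration only):
    #
    #   {
    #       "parser_name": "details_cookie",
    #       "parse_url": "https://example.com/notice/123.html",
    #       "item": {"site": "example", "channel": "notices", "spidercode": "a_example_notices"},
    #       "deal_detail": ['//div[@class="detail-content"]'],
    #       "parse": "self.detail_get",        # eval'd into the callback
    #       "request_params": {},              # extra feapder.Request kwargs
    #       "ex_python": None,                 # optional snippet run via exec()
    #       "down_mid": {
    #           "key": "example:cookie_pool",  # redis key of the cookie pool
    #           "page_url": "https://example.com/login",
    #           "text": "请开启JavaScript",     # failure marker in response.text
    #           "code": [403, 521],            # status codes that invalidate a cookie
    #       },
    #   }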

    def detail_get(self, request, response):
        """Parse HTML responses."""
        if request.down_mid.get("text") and request.down_mid.get("text") in response.text:
            # Failure handling: when `text` is set and appears in
            # response.text, drop the current cookie so a new one is
            # generated, then requeue the request.
            down_mid = copy.copy(request.down_mid)
            key = down_mid.get("key")
            page_url = down_mid.get("page_url")
            cookie_pool = PageCookiePool(redis_key=key, page_url=page_url, selenium=False)
            cookie_pool.del_cookie(request.cookies)
            yield request
            return  # do not parse a failed response
        if response.status_code in (request.down_mid.get("code") or []):
            # Failure handling: the status code is one of the configured
            # failure codes, so drop the current cookie and requeue.
            down_mid = copy.copy(request.down_mid)
            key = down_mid.get("key")
            page_url = down_mid.get("page_url")
            cookie_pool = PageCookiePool(redis_key=key, page_url=page_url, selenium=False)
            cookie_pool.del_cookie(request.cookies)
            yield request
            return
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item[key] = items[key]
        html = ''
        for xpath in request.deal_detail:
            html = response.xpath(xpath).extract_first()  # tender detail content
            if html is not None:
                break
        list_item.contenthtml = html
        yield list_item
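
    # A hedged example of `deal_detail` for detail_get (assumed to be a list
    # of candidate XPaths, tried in order until one yields content):
    #
    #   deal_detail = ['//div[@class="detail-content"]', '//div[@id="content"]']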

    def detail_json(self, request, response):
        """Parse JSON (and other non-HTML) responses."""
        if request.down_mid.get("text") and request.down_mid.get("text") in response.text:
            # Failure handling: when `text` is set and appears in
            # response.text, drop the current cookie so a new one is
            # generated, then requeue the request.
            down_mid = copy.copy(request.down_mid)
            key = down_mid.get("key")
            page_url = down_mid.get("page_url")
            cookie_pool = PageCookiePool(redis_key=key, page_url=page_url, selenium=False)
            cookie_pool.del_cookie(request.cookies)
            yield request
            return  # do not parse a failed response
        if response.status_code in (request.down_mid.get("code") or []):
            # Failure handling: the status code is one of the configured
            # failure codes, so drop the current cookie and requeue.
            down_mid = copy.copy(request.down_mid)
            key = down_mid.get("key")
            page_url = down_mid.get("page_url")
            cookie_pool = PageCookiePool(redis_key=key, page_url=page_url, selenium=False)
            cookie_pool.del_cookie(request.cookies)
            yield request
            return
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item[key] = items[key]
        # Run the task's extraction snippet in an explicit namespace: in
        # Python 3, exec() cannot rebind a function-local variable, so the
        # snippet writes `html` into the namespace and we read it back.
        scope = {"request": request, "response": response, "html": ""}
        exec(request.deal_detail, scope)
        list_item.contenthtml = scope["html"]
        yield list_item
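
    # A hedged example of a `deal_detail` snippet for detail_json (an
    # assumption; any code that leaves the extracted content in `html`
    # inside the exec namespace works, and `response.json` as a parsed
    # dict is itself an assumption about feapder's Response):
    #
    #   deal_detail = "html = response.json['data']['contenthtml']"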

    def failed_request(self, request, response):
        """After the request/parse retry limit is exceeded, save the original
        task back to Mongo with an updated `failed` counter."""
        if response is None:
            code = 0
        else:
            code = response.status_code
        if 200 <= code < 300:
            err = 'analysis'
        elif 300 <= code < 400:
            err = 'download'
        elif 400 <= code < 500:
            err = 'download'
        elif 500 <= code:
            err = 'servers'
        else:
            err = 'timeout'
        mgp = MgpListItem()
        mgp.code = code
        mgp.error = err
        items = request.base_info
        for key in items:
            mgp[key] = items[key]
        mgp.failed += 1
        if mgp.pri is None:
            mgp.pri = 0
        if mgp.pri > 5:
            if mgp.failed in (10, 30, 50, 100, 200) or mgp.failed > 200:
                # Alert by spider priority: fire only when the number of
                # alerts already sent for this site equals pri - 5
                if self.send_list.count(mgp.item.get("site")) == mgp.pri - 5:
                    info = f'''`
Your spider has tasks that failed to request or parse more than <font color="#FF0000">{mgp.failed}</font> times.
> **Spider name:** {mgp.item.get("site")}
> **Channel name:** {mgp.item.get("channel")}
> **Spider code:** {mgp.item.get("spidercode")}
> **Spider priority:** {mgp.pri}
> **Maintainer:** {mgp.author}
Please log in to the 剑鱼 spider management platform for details.
`'''
                    wechat_warning(info)
                    self.send_list.append(mgp.item.get("site"))
        yield mgp

    def end_callback(self):
        print("Spider finished")

    def download_midware(self, request):
        # Attach a pooled cookie to every outgoing request
        down_mid = request.down_mid
        key = down_mid.get("key")
        page_url = down_mid.get("page_url")
        cookie_pool = PageCookiePool(redis_key=key, page_url=page_url, selenium=False)
        request.cookies = cookie_pool.get_cookie()
        return request
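
    # PageCookiePool usage as assumed throughout this spider (signatures
    # inferred from the calls above, not from the library's documentation):
    #
    #   pool = PageCookiePool(redis_key="example:cookie_pool",
    #                         page_url="https://example.com/login", selenium=False)
    #   cookies = pool.get_cookie()   # take a pooled cookie, created on demand
    #   pool.del_cookie(cookies)      # evict it once the site rejects it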


if __name__ == "__main__":
    Details(redis_key="magp:details1").start()