details.py

# -*- coding: utf-8 -*-
"""
Created on 2021-12-13 13:25:15
---------
@summary: Generic detail-page spider: consumes pending tasks from the mgp_list
          MongoDB collection, fetches each detail page and extracts the content
          and attachments.
---------
@author: 马国鹏
"""
import json
import sys

sys.path.append('/app/spiders/sword_feapder/FworkSpider')

import time
from urllib.parse import urljoin

import feapder
from feapder.utils.tools import wechat_warning
import execjs
from items.spider_item import DataBakItem, MgpListItem
from feapder.db.mongodb import MongoDB
from untils.attachment import AttachmentDownloader

# Note: json, time and execjs are not referenced directly in this module; they are
# presumably kept in scope for the task-supplied "js" / "ex_python" snippets that
# are executed via eval()/exec() in start_requests().


class Details(feapder.Spider):
    _to_db = None
    db_name = 'mgp_list'   # MongoDB collection holding the pending detail tasks
    send_list = []         # sites for which a WeChat alert has already been sent

    # Lazily-created MongoDB connection
    @property
    def to_db(self):
        if not self._to_db:
            self._to_db = MongoDB()
        return self._to_db

    def start_requests(self):
        while True:
            # Pull up to 50 pending detail tasks, newest publish time first
            data_list = self.to_db.find(self.db_name, {"parser_name": "details"},
                                        sort={"item.publishtime": -1}, limit=50)
            for item in data_list:
                request_params = item.get("request_params")
                # Optional task-supplied hooks executed before the request is built
                if item.get("js"):
                    eval(item.get("js"))
                if item.get("ex_python"):
                    exec(item.get("ex_python"))
                if item.get("proxies"):
                    yield feapder.Request(url=item.get("parse_url"), item=item.get("item"),
                                          files=item.get("files"), deal_detail=item.get("deal_detail"),
                                          callback=eval(item.get("parse")), base_info=item,
                                          **request_params)
                else:
                    yield feapder.Request(url=item.get("parse_url"), item=item.get("item"),
                                          files=item.get("files"), deal_detail=item.get("deal_detail"),
                                          callback=eval(item.get("parse")), base_info=item,
                                          proxies=False, **request_params)
                # Remove the task from Mongo once it has been queued
                self.to_db.delete(self.db_name, item)
            break
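
    # For reference, a task document in the "mgp_list" collection is read roughly as
    # sketched below. Field values are illustrative assumptions inferred from how
    # start_requests()/detail_get() consume the task, not copied from real data:
    #
    #   {
    #       "parser_name": "details",
    #       "parse": "self.detail_get",        # resolved to a callback via eval()
    #       "parse_url": "http://example.com/notice/1.html",
    #       "item": {"site": "Example Site", "channel": "Announcements",
    #                "publishtime": "2021-12-13"},
    #       "deal_detail": ["//div[@class='content']"],
    #       "files": {...},                     # optional attachment config, see detail_get()
    #       "request_params": {},               # extra kwargs forwarded to feapder.Request
    #       "proxies": False,                   # plus optional "js" / "ex_python" hooks
    #   }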

    def detail_get(self, request, response):
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item[key] = items[key]
        # Try each detail xpath in turn until one matches
        html = ''
        for xpath in request.deal_detail:
            html = response.xpath(xpath).extract_first()  # detail content of the notice
            if html is not None:
                break
        list_item.contenthtml = html
        # Attachment config: "files_info" on the request (when present) overrides "files";
        # getattr is used because tasks built in start_requests() only set "files"
        files = []
        files_info = None
        if request.files:
            files_info = request.files
            files = response.xpath(files_info.get("list_xpath"))
        if getattr(request, "files_info", None):
            files_info = request.files_info
            files = response.xpath(files_info.get("list_xpath"))
        if len(files) > 0:
            attachments = {}
            for index, info in enumerate(files):
                file_url = info.xpath(files_info.get("url_xpath")).extract_first()
                file_name = info.xpath(files_info.get("name_xpath")).extract_first()
                if files_info.get("host"):
                    file_url = urljoin(files_info.get("host"), file_url)
                if not files_info.get("file_type"):
                    file_type = file_url.split("?")[0].split(".")[-1].lower()
                else:
                    file_type = files_info.get("file_type")
                if file_type in files_info.get("files_type") and files_info.get("url_key") in file_url:
                    attachment = AttachmentDownloader().fetch_attachment(
                        file_name=file_name, file_type=file_type,
                        download_url=file_url, enable_proxy=False)
                    attachments[len(attachments) + 1] = attachment
            if attachments:
                list_item.projectinfo = {"attachment": attachments}
        yield list_item
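
    # For reference, the attachment config consumed above is a dict shaped roughly as
    # follows. The keys come from the .get() calls in detail_get(); the values are
    # illustrative assumptions only:
    #
    #   files = {
    #       "list_xpath": "//div[@class='attachments']//a",
    #       "url_xpath": "./@href",
    #       "name_xpath": "./text()",
    #       "files_type": ("pdf", "doc", "docx", "xls", "xlsx", "zip", "rar"),
    #       "file_type": None,               # force a fixed extension; otherwise derived from the URL
    #       "url_key": "http",               # substring the attachment URL must contain
    #       "host": "http://example.com",    # base used to absolutize relative URLs
    #   }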

    def detail_json(self, request, response):
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item[key] = items[key]
        # Here deal_detail is a task-supplied Python snippet that fills list_item
        exec(request.deal_detail)
        yield list_item

    def detail_post(self, request, response):
        items = request.item
        list_item = DataBakItem()
        for key in items:
            list_item[key] = items[key]
        exec(request.deal_detail)
        yield list_item
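
    # For detail_json / detail_post, deal_detail is expected to be a string of Python
    # code rather than a list of xpaths. A hypothetical snippet (illustrative only)
    # could look like:
    #
    #   deal_detail = (
    #       "data = json.loads(response.text)\n"
    #       "list_item.contenthtml = data.get('content')"
    #   )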

    def failed_request(self, request, response):
        '''After the request/parse retry limit is exceeded, save the original task back
        to Mongo with an error type and an incremented "failed" counter.'''
        if response is None:
            code = 0
        else:
            code = response.status_code
        # note: unused; the range checks below determine the error type
        err_dic = {"200": "analysis", "400": "download", "500": "servers", "300": "download"}
        if 200 <= code < 300:
            err = 'analysis'
        elif 300 <= code < 400:
            err = 'download'
        elif 400 <= code < 500:
            err = 'download'
        elif 500 <= code:
            err = "servers"
        else:
            err = "timeout"
        mgp = MgpListItem()
        mgp.code = code
        mgp.error = err
        items = request.base_info
        for key in items:
            mgp[key] = items[key]
        mgp.failed += 1
        if mgp.pri is None:
            mgp.pri = 0
        if mgp.pri > 5:
            if mgp.failed in (10, 30, 50, 100, 200) or mgp.failed > 200:
                if self.send_list.count(mgp.item.get("site")) == mgp.pri - 5:
                    # Alert according to the spider's priority (pri)
                    info = f'''`
您的爬虫出现超<font color="#FF0000">{mgp.failed}</font>次请求、解析失败的任务。
> **爬虫名称:** {mgp.item.get("site")}
> **栏目名称:** {mgp.item.get("channel")}
> **爬虫代码:** {mgp.item.get("spidercode")}
> **爬虫等级:** {mgp.pri}
> **所属管理人员:** {mgp.author}
请登录剑鱼爬虫管理平台查看详情。
`'''
                    wechat_warning(info)
                    self.send_list.append(mgp.item.get("site"))
        yield mgp

    def end_callback(self):
        print("爬虫结束")


if __name__ == "__main__":
    Details(redis_key="magp:details1").start()