# DetailPageSpider.py

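"""Detail-page spider for www.zbytb.com (中国招标与采购网).

Pulls pending detail tasks from MongoDB, fetches each announcement page with a
logged-in account, extracts the page content and any attachments, and writes
the cleaned result back to MongoDB. Failed requests are recorded in a separate
error collection.
"""
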
import random
import re
import time
from urllib.parse import urlencode, urlparse

from bs4 import BeautifulSoup
from parsel import Selector

from crawler.check_utils import CheckText, CheckTask
from crawler.clean_html import cleaner, clean_js
from crawler.crawl_scheduler import Scheduler
from crawler.defaults import http_request_get
from crawler.login import load_login_cookies, login, User, login_status_check
from utils.attachment import AttachmentDownloader
from utils.clean_file import (
    extract_file_type,
    extract_file_name_by_href
)
from utils.databases import mongo_table, int2long
from utils.execptions import (
    AccountError,
    AttachmentError,
    CheckError,
    ZbYTbCrawlError
)
from utils.log import logger


class CrawlDetailPageSpider:

    def __init__(self, db: str, crawl_tab: str, save_tab: str, error_tab: str):
        self.crawl_tab = mongo_table(db, crawl_tab)
        self.save_tab = mongo_table(db, save_tab)
        self.crawl_error_tab = mongo_table(db, error_tab)
        self.attachment_downloader = AttachmentDownloader()
        self.senior_account = 'runhekeji'
        self.account = None
        self.cookies = None
        self.spider_code = None

    @staticmethod
    def prepare_url(rows: dict):
        host = "https://www.zbytb.com/api/task.js.php"
        params = {
            "moduleid": rows["type_code"],
            "html": "show",
            # itemid is the number embedded in the detail url; the regex match
            # carries the trailing '.' of '.html', which the slice strips off
            "itemid": re.findall(r"\d+\.?\d*", rows["competehref"])[0][:-1],
            "page": "1",
            "es": "",
            "refresh": "{}.js".format(random.random())
        }
        url = host + '?' + urlencode(params)
        return url

    def _update_crawl_task(self, tid, **kwargs):
        self.crawl_tab.update_one({'_id': tid}, {'$set': kwargs})

    def _lock_task(self, task: dict):
        update = {'crawl': True}
        self._update_crawl_task(task['_id'], **update)

    def _release_task(self, task: dict):
        update = {'crawl': False}
        self._update_crawl_task(task['_id'], **update)

    def switch_senior_user(self, tid):
        """
        Switch the task to the senior (premium) account.

        :param tid: ObjectId of the crawl task item
        """
        # Some bidding notices are only visible to premium members, so the task
        # is reassigned to the designated senior account.
        self._update_crawl_task(tid, account=self.senior_account)

    def crawl_error(
            self,
            *,
            spider_code: str,
            account: str,
            err_msg='采集失败',
            response=None,
            rows=None
    ):
        items = {
            'account': account,
            'spidercode': spider_code,
            'crawl_time': int2long(int(time.time())),
            'crawl_type': 'detail'
        }
        if response is not None:
            items.update({
                'url': response.request.url,
                'status_code': response.status_code,
                'reason': response.reason,
                'params': getattr(response.request, 'params', None),
            })
        elif rows is not None:
            items.update({
                'url': rows['url'],
                'status_code': rows['status_code'],
                'reason': rows['reason'],
                'params': rows['params'],
            })
        self.crawl_error_tab.insert_one(items)
        logger.error(err_msg)

    def download_attachment(self, content: str, rows: dict):
        soup = BeautifulSoup(content, "lxml")
        attachments = {}
        nums = 0
        nodes = soup.findAll("a") or soup.findAll("iframe")
        for node in nodes:
            file_name, file_type = (node.string or node.text), None
            file_path = node.attrs.get("href", "") or node.attrs.get("src", "")
            # The attachment may be embedded in an iframe
            _id = node.attrs.get('id')
            if _id == 'pdfContainer':
                file_type = 'pdf'
            # Extract the file type
            if file_type is None:
                file_type = (extract_file_type(file_name) or extract_file_type(file_path))
            # Extract the file name
            try:
                parser = urlparse(file_path)
                if parser.scheme in ['https', 'http'] and file_type is not None:
                    if not file_name:
                        name = extract_file_name_by_href(file_path, file_type)
                        if name is not None:
                            file_name = name
                        else:
                            file_name = f"{rows['title']}_{nums}"
                    attachment = self.attachment_downloader.download(
                        file_name=file_name,
                        file_type=file_type,
                        download_url=file_path,
                    )
                    if len(attachment) > 0:
                        attachments[str(len(attachments) + 1)] = attachment
                    nums += 1
            except ValueError:
                pass

        # Some pages expose the attachment through a <pdfpath> element instead
        file_url = soup.findAll('pdfpath')
        if file_url:
            file_url = list(file_url[0].stripped_strings)[0]
            file_type = extract_file_type(file_url)
            file_name = rows['title']
            if file_type:
                attachment = self.attachment_downloader.download(
                    file_name=file_name,
                    file_type=file_type,
                    download_url=file_url,
                )
                if len(attachment) > 0:
                    attachments[str(len(attachments) + 1)] = attachment

        if len(attachments) > 0:
            rows["projectinfo"] = {"attachments": attachments}

    def save_data(self, content, rows: dict):
        rows["contenthtml"] = clean_js(content)
        special = {
            r'<iframe[^<>]*>[\s\S]*?</iframe>': ''
        }
        rows["detail"] = cleaner(content, special=special)
        try:
            CheckText(rows["detail"])
        except CheckError:
            # The page is a pdf reader, e.g. https://www.zbytb.com/s-zhongbiao-10119392.html
            rows["detail"] = "<br/>详细内容请访问原网页!"
        rows["comeintime"] = int2long(int(time.time()))
        insert = {}
        for key, val in rows.items():
            if key not in ['crawl_status', 'account', 'crawl', 'count', '_id']:
                insert[key] = val
        self.save_tab.insert_one(insert)
        logger.info("[采集成功]{}-{}".format(rows['title'], rows['publishtime']))

    def crawl_response(self, response, rows: dict):
        # The api responds with a js snippet; extract the embedded html fragment
        source = re.findall(r'Inner(.*?);Inner', response.text)
        if len(source) > 0:
            content = source[0][13:-1]
        else:
            # Fall back to parsing the page body directly
            root = Selector(text=response.text)
            content = root.xpath('//div[@class="conent-box"]').extract_first()
            if content:
                clean_features = [
                    '//div[@class="conent-box"]/div[@class="xgxm"]',
                    '//div[@class="content-user"]'
                ]
                for feature in clean_features:
                    clean_html = root.xpath(feature).extract_first()
                    if clean_html is not None:
                        content = content.replace(clean_html, '')
            else:
                content = ''

        counter = 0
        try:
            CheckText(content)
            self.download_attachment(content, rows)
            self.save_data(content, rows)
            self._update_crawl_task(rows['_id'], crawl_status='finished')
            counter = 1
        except (AttachmentError, CheckError) as e:
            if e.code == 10104 and self.account != self.senior_account:
                self.switch_senior_user(rows['_id'])
            else:
                self._update_crawl_task(rows['_id'], crawl_status='response_err')
            response.status_code = e.code
            err_msg = response.reason = e.reason
            response.request.url = rows['competehref']
            self.crawl_error(
                response=response,
                spider_code=self.spider_code,
                account=self.account,
                err_msg=err_msg
            )
        except AccountError:
            self.switch_senior_user(rows['_id'])

        return counter

    def crawl_request(self, url: str, referer: str, user: User):
        headers = {
            'Host': 'www.zbytb.com',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
            'Accept': '*/*',
            'Referer': 'https://www.zbytb.com/s-zb-20147673.html',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        headers.update({'Referer': referer})

        retries, max_retries = 0, 3
        while True:
            success, response = http_request_get(
                url,
                login=True,
                headers=headers,
                cookies=self.cookies,
                verify=False,
            )
            if not success and response.status_code == 10000 and retries < max_retries:
                retries += 1
            else:
                retry_login = login_status_check(response)
                if retry_login:
                    logger.info(f"重新登录:{user.username}")
                    self.cookies = login(*user)
                else:
                    break

        if success:
            return response

        # Record the failed request
        self.crawl_error(
            spider_code=self.spider_code,
            account=self.account,
            response=response
        )
        return None

    def crawl_spider(self, sc: Scheduler):
        while True:
            next_task_interval = None
            item = sc.crawl_task
            if len(item) == 0:
                return False
            logger.info(f">>> {item['title']} - {item['competehref']}")
            self._lock_task(item)
            sc.spider_code = self.spider_code = item['spidercode']
            sc.crawl_url = item['competehref']
            # Pick the account and its login cookies for this task
            self.account = item.get('account', sc.user.username)
            self.cookies = load_login_cookies(self.account)
            user = sc.query_user(self.account)
            if user is None:
                return False
            try:
                CheckTask(item)
                url = self.prepare_url(item)
                referer = item['competehref']
                response = self.crawl_request(url, referer, user)
                if response is not None:
                    num = self.crawl_response(response, item)
                    sc.crawl_counter(num)
                next_task_interval = 10
            except (ZbYTbCrawlError, Exception) as e:
                if getattr(e, 'code', None) is None:
                    err = ZbYTbCrawlError(unknown_err=e)
                    sc.err_record(err)
                elif e.code == 10105:
                    # On this error, refresh the count taken from the es query statistics
                    self._update_crawl_task(item["_id"], count=item['count'])
                else:
                    sc.err_record(e)
                self._update_crawl_task(item["_id"], crawl_status='error')
                sc.crawl_counter(0)
                next_task_interval = 0.1
            finally:
                self._release_task(item)
                sc.wait_for_next_task(next_task_interval)

    def start(self):
        while True:
            with Scheduler(site='中国招标与采购网', crawl_type='detail') as scheduler:
                if scheduler.crawl_start:
                    self.crawl_spider(scheduler)
                    scheduler.finished(10)
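

# Minimal usage sketch (assumption: the database and collection names below are
# illustrative placeholders, not the project's real configuration).
if __name__ == '__main__':
    spider = CrawlDetailPageSpider(
        db='py_spider',           # hypothetical database name
        crawl_tab='zbytb_list',   # hypothetical task collection
        save_tab='data_bak',      # hypothetical results collection
        error_tab='crawl_error',  # hypothetical error-log collection
    )
    spider.start()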