list_spider.py
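
"""元博网 (chinabidding.cn) list-page spider: collects procurement/bidding list
rows into MongoDB and deduplicates them by detail-page URL through a Redis filter."""
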
import math
import random
import time
from collections import namedtuple
from urllib.parse import quote

import requests
from lxml.html import fromstring, HtmlElement

from config.load import crawler_url, region
from crawler.crawl_scheduler import Scheduler
from crawler.login import login, load_login_cookies, login_session_check
from utils.RedisDB import RedisFilter
from utils.databases import mongo_table, int2long, es_query
from utils.execptions import CrawlError, YbwCrawlError
from utils.log import logger
from utils.tools import clean_title
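
# A crawl menu entry: channel display name, spider code, and the URL-encoded
# category filter ('6%2C' decodes to '6,') that is formatted into the list URL.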
CrawlMenu = namedtuple('CrawlMenu', ['channel', 'spidercode', 'table_type'])


class ListSpider:

    def __init__(self, db: str, crawl_tab: str, **kwargs):
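        """
        :param db: MongoDB database name
        :param crawl_tab: MongoDB collection that receives the list items
        :param max_page: maximum number of list pages to crawl per region (default 1)
        :param page_size: number of result rows requested per list page (default 30)
        """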
        self.crawl_menus = [
            # CrawlMenu('企业采购', 'a_ybwcgyzbw_qycg', '7%2C'),
            CrawlMenu('政府采购', 'a_ybwcgyzbw_zfcg', '6%2C'),
            CrawlMenu('招标预告', 'a_ybwcgyzbw_zbyg', '5%2C'),
            CrawlMenu('中标公示', 'a_ybwcgyzbw_zbgs', '4%2C'),
            CrawlMenu('服务招标', 'a_ybwcgyzbw_fwzb', '3%2C'),
            CrawlMenu('货物招标', 'a_ybwcgyzbw_hwzb', '2%2C'),
            CrawlMenu('工程招标', 'a_ybwcgyzbw_gczb', '1%2C'),
        ]
        self.max_page = kwargs.pop('max_page', 1)
        self.page_size = kwargs.pop('page_size', 30)
        self.crawl_tab = mongo_table(db, crawl_tab)
        self.history_user = None
        self.user = None
        self.session = None
        self.dedup = RedisFilter()  # deduplication filter, default expiry: 1 year

    def crawl_request(self, url: str, refer: str, **kwargs):
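        """Fetch one list page.

        Retries up to 5 times; timeouts and failed re-logins count against the
        retry budget. When the page contains a login/registration popup, the
        account session is checked and the account is logged in again before
        retrying. Returns the response when the result table is populated,
        None when the site keeps serving an empty result page, and raises
        CrawlError once the retries are exhausted.
        """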
        menu = kwargs.pop('menu')
        region_ = kwargs.pop('region')
        page = kwargs.pop('page')
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Referer': refer
        }
        request_params = {}
        request_params.setdefault('headers', headers)
        request_params.setdefault('timeout', 60)
        if kwargs.get('cookies') is not None:
            request_params.setdefault('cookies', kwargs.pop('cookies', None))
        retries = 0
        non_empty_retries = 0
        proxy, proxies = None, None
        while retries < 5:
            try:
                response = self.session.get(url, **request_params)
            except requests.exceptions.Timeout:
                time.sleep(10)
                retries += 1
                continue
            else:
                element = fromstring(response.text)
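                # A password input or the renew popup means the request was
                # bounced to a login/registration page; the rows following the
                # pager are the actual search results.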
                pwd_feature = '//*[@type="password"]|//*[@id="password"]'
                popup_feature = '//*[@id="renew_pop"]'
                login_feature = "{}|{}".format(pwd_feature, popup_feature)
                pages_feature = '//*[@id="pages"]'
                tr_feature = f'{pages_feature}/following-sibling::table//tr'
                page_size = len(element.xpath(tr_feature))
                if element.xpath(login_feature):
                    # A login/registration popup appeared: attach the account
                    # identity and check the account's login status.
                    retry_login = login_session_check(self.session, self.user.phone)
                    if retry_login:
                        logger.info(f"[重新登录]{self.user.phone}")
                        self.session, code = login(*self.user, proxies=proxies)
                        if code != 200:
                            # Frequent logins within one hour get the IP rate-limited;
                            # in that case log the account in through a proxy.
                            # if proxy is None:
                            #     proxy = Proxy(True)
                            # else:
                            #     proxy.switch()
                            # proxies = proxy.proxies
                            time.sleep(1800)
                            retries += 1
                    login_cookies = load_login_cookies(self.user.phone)
                    request_params.update({'cookies': login_cookies})
                elif element.xpath(pages_feature) and page_size > 0:
                    logger.info(f'[采集成功]{menu.channel}-{region_}-第{page}页-{page_size}条')
                    return response
                else:
                    # The page contains no search results at all.
                    if non_empty_retries > 3:
                        # Probably rate-limited for requesting too often, so the site
                        # withholds the query results; retry up to 3 times hoping for
                        # a non-empty page.
                        logger.info(f'[采集成功]{menu.channel}-{region_}-第{page}页-{page_size}条')
                        return None
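                    # Back off for a random interval of roughly 8-23 seconds
                    # (log base 1.5 of a random n in [30, 10000]) before retrying.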
                    n = random.randint(30, 10000)
                    interval = math.log(n, 1.5)
                    time.sleep(interval)
                    non_empty_retries += 1
                    continue
        raise CrawlError(code=10020, reason='列表页访问失败')

    def crawl_response(self, response, menu: CrawlMenu, **kwargs):
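        """Parse a list page and store new items in MongoDB.

        Rows whose ``competehref`` was seen before are skipped via the Redis
        deduplication filter; new items get a ``count`` field from es_query()
        before insertion. Returns the number of rows parsed into items.
        """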
        element: HtmlElement = fromstring(response.text)
        results = []
        feature = '//tr[@id="resultPrompt"]/parent::*/tr[not(@id)]'
        for node in element.xpath(feature):
            publish_time = "".join(node.xpath('./td[last()]/text()')).strip()
            if '-' not in publish_time:
                publish_time = "".join(node.xpath('./td[6]/text()')).strip()
            area = "".join("".join(node.xpath('./td[5]/text()')).split())
            title = clean_title("".join("".join(node.xpath('./td[2]/a/text()')).split()))
            competehref = 'https://www.chinabidding.cn{}'.format("".join(node.xpath('./td[2]/a/@href')))
            item = {
                "site": "元博网(采购与招标网)",
                "channel": menu.channel,
                "area": area if area != '跨省' else '全国',
                "_d": "comeintime",
                "comeintime": int2long(int(time.time())),
                "T": "bidding",
                "sendflag": "false",
                "spidercode": menu.spidercode,
                "city": "",
                "infoformat": 1,
                "type": "",
                "publishdept": "",
                "title": title,
                "competehref": competehref,
                "href": "#",
                "publishtime": publish_time,
                "l_np_publishtime": int2long(int(time.mktime(time.strptime(publish_time, "%Y-%m-%d")))),
            }
            if title is None:
                logger.error(f"[标题为空]{competehref}")
                continue
            results.append(item)

        insert_items = []
        for item in results:
            if not self.dedup.get(item['competehref']):
                item['count'] = es_query(item["title"], item["l_np_publishtime"])
                item['crawl'] = False
                # print(f'>>> {title} - {competehref}')
                insert_items.append(item)
                self.dedup.add(item['competehref'])

        if len(insert_items) > 0:
            self.crawl_tab.insert_many(insert_items)

        page, _region = kwargs.pop('page'), kwargs.pop('region')
        logger.info(f'[上传成功]{menu.channel}-{_region}-第{page}页-{len(insert_items)}条')
        return len(results)

    def crawl_spider(self, sc: Scheduler, menu: CrawlMenu):
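        """Crawl all regions for one menu.

        Pages through the list URLs region by region, rotating the crawl
        account every 50 full pages and attaching login cookies from the
        fourth page onward. A region stops when its page list is exhausted,
        a page comes back empty, or it yields fewer items than ``page_size``.
        """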
        self.session = requests.session()
        for rid, region_ in region.items():
            previous_url = None
            crawl_total = 1
            cookies = None
            crawl_pages = list(range(1, self.max_page + 1))
            while len(crawl_pages) > 0:
                page = crawl_pages.pop(0)
                if page == 1:
                    url = crawler_url['home_page'].format(
                        rid,
                        sc.yesterday,
                        sc.yesterday,
                        self.page_size,
                        menu.table_type
                    )
                    previous_url = url
                    refer = crawler_url['refer'].format(quote(region_))
                else:
                    url = crawler_url['list_url'].format(
                        rid,
                        sc.yesterday,
                        sc.yesterday,
                        page,
                        self.page_size,
                        menu.table_type
                    )
                    refer = previous_url
                    previous_url = url
                # print(">>> ", url)
                sc.crawl_url = url
                sc.spider_code = menu.spidercode
                if crawl_total % 50 == 0:
                    # Rotate the account to keep a single account from being
                    # banned for excessive requests.
                    sc.change_account()
                    self.session.close()
                    self.session = requests.session()
                    self.user = sc.user
                if crawl_total >= 4:
                    # From the fourth list page onward, a logged-in session is
                    # required to fetch list data.
                    cookies = load_login_cookies(self.user.phone)
                # Data collection
                try:
                    response = self.crawl_request(
                        url,
                        refer,
                        cookies=cookies,
                        menu=menu,
                        region=region_,
                        page=page
                    )
                    if response is None:
                        break
                    item_size = self.crawl_response(
                        response,
                        menu,
                        region=region_,
                        page=page
                    )
                    sc.crawl_counter(item_size)
                    if item_size < self.page_size:
                        # Fewer items collected than the requested page size,
                        # so there is no need to visit the next page.
                        break
                    else:
                        crawl_total += 1
                except (YbwCrawlError, Exception) as e:
                    sc.crawl_counter(0)
                    logger.error('[采集失败]{}-{}-第{}页, 错误类型:{}'.format(
                        menu.channel,
                        region_,
                        page,
                        e.__class__.__name__,
                    ))
                    sc.err_record(e)
                finally:
                    sc.wait_for_next_task(random.choice(range(2, 6)))
        self.session.close()

    def start(self):
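        """Run the list crawl for every configured menu, each under its own Scheduler."""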
        for menu in self.crawl_menus:
            with Scheduler(site='元博网', crawl_type='list', channel=menu.channel) as scheduler:
                if scheduler.crawl_start:
                    self.user = scheduler.user
                    self.crawl_spider(scheduler, menu)
                    scheduler.finished()


if __name__ == '__main__':
    ListSpider(
        db='py_spider',
        crawl_tab='ybw_list',
        page_size=100,
        max_page=133,
    ).start()