import math
import random
import time
from collections import namedtuple
from urllib.parse import quote

import requests
from lxml.html import fromstring, HtmlElement

from config.load import crawler_url, region
from crawler.crawl_scheduler import Scheduler
from crawler.login import login, load_login_cookies, login_session_check
from utils.RedisDB import RedisFilter
from utils.databases import mongo_table, int2long, es_query
from utils.execptions import CrawlError, YbwCrawlError
from utils.log import logger
from utils.tools import clean_title

CrawlMenu = namedtuple('CrawlMenu', ['channel', 'spidercode', 'table_type'])
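# CrawlMenu fields: channel - list channel label used in logs and items;
# spidercode - spider code recorded on each item; table_type - URL-encoded
# value substituted into the list-page URL template.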


class ListSpider:
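    """List-page spider for 元博网 (chinabidding.cn).

    Iterates over the crawl menus and regions, fetches list pages with a
    logged-in requests session, parses the result rows and stores
    de-duplicated items in MongoDB.
    """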

    def __init__(self, db: str, crawl_tab: str, **kwargs):
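        """
        :param db: MongoDB database name
        :param crawl_tab: MongoDB collection that receives the list items
        :param kwargs: optional ``max_page`` (default 1) and ``page_size`` (default 30)
        """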
        self.crawl_menus = [
            # CrawlMenu('企业采购', 'a_ybwcgyzbw_qycg', '7%2C'),
            CrawlMenu('政府采购', 'a_ybwcgyzbw_zfcg', '6%2C'),
            CrawlMenu('招标预告', 'a_ybwcgyzbw_zbyg', '5%2C'),
            CrawlMenu('中标公示', 'a_ybwcgyzbw_zbgs', '4%2C'),
            CrawlMenu('服务招标', 'a_ybwcgyzbw_fwzb', '3%2C'),
            CrawlMenu('货物招标', 'a_ybwcgyzbw_hwzb', '2%2C'),
            CrawlMenu('工程招标', 'a_ybwcgyzbw_gczb', '1%2C'),
        ]
        self.max_page = kwargs.pop('max_page', 1)
        self.page_size = kwargs.pop('page_size', 30)
        self.crawl_tab = mongo_table(db, crawl_tab)
        self.history_user = None
        self.user = None
        self.session = None
        self.dedup = RedisFilter()  # default expiry: 1 year

    def crawl_request(self, url: str, refer: str, **kwargs):
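        """Fetch one list page and return the response.

        Retries on timeouts, re-logins when a login/renew popup is detected,
        and backs off on empty result pages. Returns ``None`` when the page is
        still empty after several retries; raises ``CrawlError`` after 5
        failed attempts.
        """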
        menu = kwargs.pop('menu')
        region_ = kwargs.pop('region')
        page = kwargs.pop('page')
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Referer': refer
        }
        request_params = {}
        request_params.setdefault('headers', headers)
        request_params.setdefault('timeout', 60)
        if kwargs.get('cookies') is not None:
            request_params.setdefault('cookies', kwargs.pop('cookies', None))

        retries = 0
        empty_page_retries = 0
        proxy, proxies = None, None
        while retries < 5:
            try:
                response = self.session.get(url, **request_params)
            except requests.exceptions.Timeout:
                time.sleep(10)
                retries += 1
                continue
            else:
                element = fromstring(response.text)
                pwd_feature = '//*[@type="password"]|//*[@id="password"]'
                popup_feature = '//*[@id="renew_pop"]'
                login_feature = "{}|{}".format(pwd_feature, popup_feature)
                pages_feature = '//*[@id="pages"]'
                tr_feature = f'{pages_feature}/following-sibling::table//tr'
                page_size = len(element.xpath(tr_feature))
                if element.xpath(login_feature):
                    # A login or renewal popup appeared: attach the account
                    # credentials and check the account state.
                    retry_login = login_session_check(self.session, self.user.phone)
                    if retry_login:
                        logger.info(f"[重新登录]{self.user.phone}")
                        self.session, code = login(*self.user, proxies=proxies)
                        if code != 200:
                            # Frequent logins within one hour get the IP
                            # rate-limited; a proxy could be used to log the
                            # account in again.
                            # if proxy is None:
                            #     proxy = Proxy(True)
                            # else:
                            #     proxy.switch()
                            # proxies = proxy.proxies
                            time.sleep(1800)
                            retries += 1
                    login_cookies = load_login_cookies(self.user.phone)
                    request_params.update({'cookies': login_cookies})
                elif element.xpath(pages_feature) and page_size > 0:
                    logger.info(f'[采集成功]{menu.channel}-{region_}-第{page}页-{page_size}条')
                    return response
                else:
                    # The search returned no results at all.
                    if empty_page_retries > 3:
                        # The site may withhold results when it is queried too
                        # frequently; give up after retrying the empty page 3 times.
                        logger.info(f'[采集成功]{menu.channel}-{region_}-第{page}页-{page_size}条')
                        return None
                    n = random.randint(30, 10000)
                    interval = math.log(n, 1.5)
                    time.sleep(interval)
                    empty_page_retries += 1
                    continue
        raise CrawlError(code=10020, reason='列表页访问失败')

    def crawl_response(self, response, menu: CrawlMenu, **kwargs):
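        """Parse the rows of a list page and store new items.

        Items are de-duplicated by ``competehref`` through the Redis filter and
        inserted into the MongoDB crawl table. Returns the number of items
        parsed from the page, whether or not they were newly inserted.
        """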
        element: HtmlElement = fromstring(response.text)
        results = []
        feature = '//tr[@id="resultPrompt"]/parent::*/tr[not(@id)]'
        for node in element.xpath(feature):
            publish_time = "".join(node.xpath('./td[last()]/text()')).strip()
            if '-' not in publish_time:
                publish_time = "".join(node.xpath('./td[6]/text()')).strip()
            area = "".join("".join(node.xpath('./td[5]/text()')).split())
            title = clean_title("".join("".join(node.xpath('./td[2]/a/text()')).split()))
            competehref = 'https://www.chinabidding.cn{}'.format("".join(node.xpath('./td[2]/a/@href')))
            if not title:
                logger.error(f"[标题为空]{competehref}")
                continue
            item = {
                "site": "元博网(采购与招标网)",
                "channel": menu.channel,
                "area": area if area != '跨省' else '全国',
                "_d": "comeintime",
                "comeintime": int2long(int(time.time())),
                "T": "bidding",
                "sendflag": "false",
                "spidercode": menu.spidercode,
                "city": "",
                "infoformat": 1,
                "type": "",
                "publishdept": "",
                "title": title,
                "competehref": competehref,
                "href": "#",
                "publishtime": publish_time,
                "l_np_publishtime": int2long(int(time.mktime(time.strptime(publish_time, "%Y-%m-%d")))),
            }
            results.append(item)

        insert_items = []
        for item in results:
            if not self.dedup.get(item['competehref']):
                item['count'] = es_query(item["title"], item["l_np_publishtime"])
                item['crawl'] = False
                # print(f">>> {item['title']} - {item['competehref']}")
                insert_items.append(item)
                self.dedup.add(item['competehref'])
        if len(insert_items) > 0:
            self.crawl_tab.insert_many(insert_items)

        page, _region = kwargs.pop('page'), kwargs.pop('region')
        logger.info(f'[上传成功]{menu.channel}-{_region}-第{page}页-{len(insert_items)}条')
        return len(results)

    def crawl_spider(self, sc: Scheduler, menu: CrawlMenu):
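        """Crawl every region for one menu, page by page.

        The account is rotated every 50 requests to avoid getting a single
        account banned, and login cookies are attached from the 4th page
        onwards because deeper list pages require a logged-in session.
        """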
        self.session = requests.session()
        for rid, region_ in region.items():
            previous_url = None
            crawl_total = 1
            cookies = None
            crawl_pages = list(range(1, self.max_page + 1))
            while len(crawl_pages) > 0:
                page = crawl_pages.pop(0)
                if page == 1:
                    url = crawler_url['home_page'].format(
                        rid,
                        sc.yesterday,
                        sc.yesterday,
                        self.page_size,
                        menu.table_type
                    )
                    previous_url = url
                    refer = crawler_url['refer'].format(quote(region_))
                else:
                    url = crawler_url['list_url'].format(
                        rid,
                        sc.yesterday,
                        sc.yesterday,
                        page,
                        self.page_size,
                        menu.table_type
                    )
                    refer = previous_url
                    previous_url = url
                # print(">>> ", url)
                sc.crawl_url = url
                sc.spider_code = menu.spidercode
                if crawl_total % 50 == 0:
                    # Rotate the account so a single user is not banned for
                    # sending too many requests.
                    sc.change_account()
                    self.session.close()
                    self.session = requests.session()
                    self.user = sc.user
                if crawl_total >= 4:
                    # From the 4th list page onwards the site only returns
                    # data to a logged-in session.
                    cookies = load_login_cookies(self.user.phone)
                # Fetch and parse the list page.
                try:
                    response = self.crawl_request(
                        url,
                        refer,
                        cookies=cookies,
                        menu=menu,
                        region=region_,
                        page=page
                    )
                    if response is None:
                        break
                    item_size = self.crawl_response(
                        response,
                        menu,
                        region=region_,
                        page=page
                    )
                    sc.crawl_counter(item_size)
                    if item_size < self.page_size:
                        # Fewer rows than the page size: no further pages to visit.
                        break
                    else:
                        crawl_total += 1
                except (YbwCrawlError, Exception) as e:
                    sc.crawl_counter(0)
                    logger.error('[采集失败]{}-{}-第{}页, 错误类型:{}'.format(
                        menu.channel,
                        region_,
                        page,
                        e.__class__.__name__,
                    ))
                    sc.err_record(e)
                finally:
                    sc.wait_for_next_task(random.choice(range(2, 6)))

        self.session.close()

    def start(self):
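        """Run every configured crawl menu inside its own Scheduler context."""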
        for menu in self.crawl_menus:
            with Scheduler(site='元博网', crawl_type='list', channel=menu.channel) as scheduler:
                if scheduler.crawl_start:
                    self.user = scheduler.user
                    self.crawl_spider(scheduler, menu)
                    scheduler.finished()


if __name__ == '__main__':
    ListSpider(
        db='py_spider',
        crawl_tab='ybw_list',
        page_size=100,
        max_page=133,
    ).start()