"""List-page spider for bidizhaobiao.com (比地招标网): iterates the configured channels and
regions, fetches the search result lists, deduplicates via Redis, and stores new items in MongoDB."""
import json
import random
import re
import time
from collections import namedtuple

import requests
from lxml.html import fromstring, HtmlElement

from config.load import region
from crawler.crawl_scheduler import Scheduler
from login import get_cookies
from utils.databases import mongo_table, int2long, redis_client, es_query
from utils.execptions import JyBasicException, CustomCheckError
from utils.log import logger
from utils.tools import sha1, check_crawl_title, get_proxy

CrawlMenu = namedtuple('CrawlMenu', ['channel', 'spidercode', 'table_type'])


class ListSpider:

    def __init__(self, db: str, crawl_tab: str, crawl_max_page=None, enable_proxy=False, allow_show_exception=False):
        self.crawl_menus = [
            CrawlMenu('招标预告', 'a_bdzbw_zbyg', 'retrieval_list.do?single=true&ChannelIds=102'),
            CrawlMenu('招标公告', 'a_bdzbw_zbgg', 'retrieval_list.do?single=true&ChannelIds=52'),
            CrawlMenu('公告变更', 'a_bdzbw_ggbg', 'retrieval_list.do?single=true&ChannelIds=51'),
            CrawlMenu('招标答疑', 'a_bdzbw_zbdy', 'retrieval_list.do?single=true&ChannelIds=103'),
            CrawlMenu('资审结果', 'a_bdzbw_zsjg', 'retrieval_list.do?single=true&ChannelIds=105'),
            CrawlMenu('招标文件', 'a_bdzbw_zbwj', 'retrieval_list.do?single=true&ChannelIds=104'),
            CrawlMenu('中标公告', 'a_bdzbw_zhbgg', 'retrieval_list.do?single=true&ChannelIds=101'),
            CrawlMenu('采购意向', 'a_bdzbw_cgyx', 'retrieval_list.do?single=true&ChannelIds=114'),
            CrawlMenu('审批项目', 'a_bdzbw_spxm', 'spxm_list.do'),
            CrawlMenu('拍卖出让', 'a_bdzbw_pmcr', 'retrieval_list.do?single=true&ChannelIds=115'),
            CrawlMenu('土地矿产', 'a_bdzbw_tdkc', 'retrieval_list.do?single=true&ChannelIds=116'),
            CrawlMenu('产权交易', 'a_bdzbw_cqjy', 'retrieval_list.do?single=true&ChannelIds=117'),
        ]
        self.total = 0
        self.crawl_max_page = crawl_max_page or 1
        self.crawl_tab = mongo_table(db, crawl_tab)
        self.r = redis_client()
        self.session = requests.session()
        self.proxy = get_proxy()
        self.redis_key = 'bdzbw_2024'
        self.allow_show_exception = allow_show_exception
        self.cookies = None

    def read_cookies(self):
        # Load the login cookies previously saved by the login module.
        with open('./login_cookie.json', 'r') as f:
            cookies = f.read()
        return json.loads(cookies)

    def crawl_request(self, url: str, data):
        headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "application/x-www-form-urlencoded",
            "Origin": "http://www.bidizhaobiao.com",
            "Pragma": "no-cache",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36"
        }
        request_params = {}
        request_params.setdefault('headers', headers)
        request_params.setdefault('timeout', 120)

        retries = 0
        while retries < 2:
            try:
                self.cookies = self.read_cookies()
                response = self.session.post(url,
                                             data=data,
                                             cookies=self.cookies,
                                             proxies=self.proxy.proxies,
                                             **request_params)
            except Exception:
                # Request failed: switch to another proxy and retry after a pause.
                self.proxy.switch()
                retries += 1
                time.sleep(20)
                continue

            if response.status_code == 403:
                # Access denied: fetch a fresh proxy and re-login to refresh the session cookies.
                self.proxy = get_proxy()
                get_cookies(self.session, self.proxy.proxies)
                retries += 1
            elif response.status_code == 200:
                element = fromstring(response.text)
                time.sleep(2)
                if element.xpath('//*[@id="searchResultList"]') or element.xpath('//*[@id="ulList"]'):
                    return response
                else:
                    # The search returned no result list page.
                    return None
            else:
                self.proxy = get_proxy()
                retries += 1
        return None

    def crawl_response(self, response, menu: CrawlMenu, pro_area):
        results = []
        last_page = []
        increase = []
        element: HtmlElement = fromstring(response.text)
        feature = ('//div[@id="searchResultList"]/div[2]/div'
                   '|//div[@id="ulList"]/table[@class="tableList"]/tbody/tr')
        for node in element.xpath(feature):
            try:
                if node.xpath('./div[1]/div[1]/p/a[1]'):
                    # Regular retrieval-list layout.
                    competehref = "".join(node.xpath('./div[1]/div[1]/p/a[1]/@href')).strip()
                    title = "".join("".join(node.xpath('./div[1]/div[1]/p/a[1]//text()')).split())
                    area = "".join("".join(node.xpath('./div[1]/div[2]/div[2]/a/span/text()')).split())
                    publish_time = "".join("".join(node.xpath('./div[1]/div[2]/div[4]/p/text()')).split())
                else:
                    # Approval-project (spxm) table layout.
                    href_info = "".join(node.xpath('./td[@class="projectName"]/a/@onclick')).strip()
                    href_params = "".join(re.findall(r"spxmInfo\('(.*?)'", href_info, re.S)).strip()
                    competehref = f"http://www.bidizhaobiao.com/spxm-{href_params}.html"
                    title = "".join(node.xpath('./td[@class="projectName"]/a/text()')).strip()
                    area = "".join(node.xpath('./td[@class="address"]/span/text()')).strip()
                    publish_time = "".join(node.xpath('./td[@class="time"]/span/text()')).strip()
            except Exception:
                # Skip rows whose markup matches neither layout.
                continue

            if not title or not publish_time:
                raise CustomCheckError(code=10107, reason='发布标题或时间为空')

            item = {
                "site": "比地招标网",
                "channel": menu.channel,
                "area": pro_area,
                "_d": "comeintime",
                "comeintime": int2long(int(time.time())),
                "T": "bidding",
                "sendflag": "false",
                "spidercode": menu.spidercode,
                "city": area,
                "type": "",
                "publishdept": "",
                "title": title,
                "competehref": competehref,
                "href": "#",
                "publishtime": publish_time,
                "l_np_publishtime": int2long(int(time.mktime(time.strptime(publish_time, "%Y-%m-%d")))),
            }

            # Items published since yesterday 00:00 count towards the recent-page total.
            present_time = time.strftime("%Y-%m-%d 00:00:00", time.localtime(int(round(time.time()))))
            timeArray = time.strptime(present_time, "%Y-%m-%d %H:%M:%S")
            start_date = round(time.mktime(timeArray)) - 86400
            if item.get('l_np_publishtime') >= start_date:
                last_page.append(item)

            # logger.debug(item)
            item['crawl'] = False
            sign = sha1(item['competehref'])
            if not self.r.hexists(self.redis_key, sign):
                increase.append(item)
                if check_crawl_title(title):
                    # item["count"] = 0
                    item["count"] = es_query(item["title"], item["l_np_publishtime"])
                    results.append(item)
                self.r.hset(self.redis_key, sign, '')

        if len(results) > 0:
            self.crawl_tab.insert_many(results)
        return len(results), len(last_page), len(increase)

    def crawl_spider(self, sc: Scheduler, menu: CrawlMenu):
        for region_id, region_name in region.items():
            page_size = 22
            for page in range(1, self.crawl_max_page + 1):
                url = f'http://www.bidizhaobiao.com/advsearch/{menu.table_type}'
                data = {
                    "pageNum": f"{page}",
                    "province_id": f"{region_id}",
                    "provinceCityJson": '{' + f'{region_name}' + ":[]}",
                    "searchCondition.dtype": "50",
                    "searchCondition.SearchType": "any",
                    "searchCondition.infosources": "",
                    "searchCondition.regionId": "",
                    "provinceState": f"{region_name}",
                    "searchCondition.Pattern": "30",
                    "searchCondition.isOr": "false",
                    "isSelectDtype": "0",
                    "isSelectPattern": "0",
                }
                sc.crawl_url = url
                sc.spider_code = menu.spidercode
                try:
                    response = self.crawl_request(url, data)
                    if response is None:
                        logger.info(f'[没有搜索到列表页]{menu.channel}-{region_name}-第{page}页-0条')
                        break
                    item_size = self.crawl_response(response, menu, region_name)
                    self.total += item_size[0]
                    logger.info(f'[采集成功]{menu.channel}-{region_name}-第{page}页-{item_size[0]}条')
                    # Full crawl: stop once the page holds fewer recent items than a full page.
                    if item_size[1] < page_size:
                        # No more items beyond the current page.
                        break
                    # Incremental crawl: stop once a page yields nothing new.
                    if item_size[2] == 0:
                        # This page and the following ones were already collected.
                        break
                except (JyBasicException, Exception) as e:
                    logger.error('[采集失败]{}-{}-第{}页, 错误类型:{}'.format(
                        menu.channel,
                        region_name,
                        page,
                        e.__class__.__name__,
                    ))
                finally:
                    sc.wait_for_next_task(random.choice(range(2, 6)))
            logger.debug(f'[{menu.channel}]-[采集地区]-{region_name}-已采集{self.total}条数据')
        self.session.close()

    def start(self):
        with Scheduler(site='比地招标网', crawl_type='list') as scheduler:
            for menu in self.crawl_menus:
                if scheduler.crawl_start:
                    self.crawl_spider(scheduler, menu)
                    scheduler.finished(5)
        logger.info(f'本次共采集{self.total}条数据')


if __name__ == '__main__':
    ListSpider(
        db='py_spider',
        crawl_tab='bdzbw_list',
        crawl_max_page=1,
    ).start()