# -*- coding: utf-8 -*-
"""
Created on 2024-06-17
---------
@summary: 元博网 detail page crawler
---------
@author: Lzz
"""
import random
import re
import time

import requests
from lxml.html import fromstring, HtmlElement, tostring
from lxml.html.clean import Cleaner
from pymongo import MongoClient

import setting
import utils.tools as tool
from dbs.RedisDB import RedisFilter
from log import logger
from utils.check_utils import CheckText, CheckTask
from utils.clean_html import cleaner
from utils.login import User, load_login_cookies, login, login_check

_proxies = setting.PROXIES


def iter_node(element: HtmlElement):
    """Depth-first traversal over an element and all of its descendants."""
    yield element
    for sub_element in element:
        if isinstance(sub_element, HtmlElement):
            yield from iter_node(sub_element)


def pre_parse(element: HtmlElement):
    """Strip boilerplate nodes (login boxes, toolbars, ads, ...).

    Note: this pre-processing may break the original HTML structure, so XPath
    expressions written against the raw HTML may no longer match.
    """
    pre_remove = {
        'log_col2', 'log_col1', 'cz', 'iconfont closei', 'p2 p1', 'cnxh_b',
        'msg_error', 'r_gg TB-focus', 'april', 'cont2', 'to_login', 'regtxt',
        'shouchang an_n sc', 'april_title red', 'cn_lt', 'dayin an_n',
        'dl_zc vip_t free_member', 'rmbq', 'login-form cl', 'dian_g fr',
        'di_n', 'd_fx', 'd_tub', 'd_dy', 'anniu1', 'cnxh_list', 'btns cl',
        'active', 'close', 'd_an fr', 'avatar', 'toolbar', 'deng_l',
        'cen_right fr', 'log_col5', 'agreement', 'log_col3',
        'shouchang_af an_n sc_after', 'fast_box', 'di_nr fl', 'xgfj', 'dianh',
        'cnxh_list tab_b2 city_list', 'contract cl', 'zb_cen_r fr', 'd_zsms',
        'sc_after active', 'dl_k', 'ewm_b', 'fl', 'wypj', 'rukou', 'p1',
        'dl_zc', 'success', 'daoh h_30', 'bd', 'april_content', 'print',
        'foot', 'cnxh zbgg', 'april_first', 'fastlog', 'tx_mc user_name',
        'tab_h2', 'fanding an_n', 'toux', 'log_col4 cl', 'hangy rem_1', 'red',
        'regshadow', 'bottom', 'dl_zc vip_t fee_member', 'xszn fl',
        'no-print', 'cnxh_b zbgg_b', 'rem rem_1', 'logshadowz', 'd_pj fl',
        'tjgjc', 'spdujaiwlohh', 'di_ewm fr', 'dian_h fl',
        'tab_h2 zbgg_b_gray', 'fanshou an_n fs', 'login-btn', 'fl gjc',
        'agreeshadow', 'guang_db', 'footer_1', 'log_p',
        'cnxh_list tab_b2', 'd_sw', 'april_close', 'd_sc',
        'erweima no-print', 'qgzx', 'p2', 'sc', 'hd', 'log_col6', 'dh_b',
        'dian_guang', 'zhu_c', 'ck cai_k', 'april_box', 'display:none'
    }
    for node in iter_node(element):
        id_attr = node.attrib.get('id')
        class_attr = node.attrib.get('class')
        style_attr = node.attrib.get('style')
        if any(attr in pre_remove for attr in (id_attr, class_attr, style_attr)):
            node.drop_tree()
    return element


def page_source(element: HtmlElement):
    """Serialise an element back to HTML, keeping forms but stripping styles."""
    clear = Cleaner(
        forms=False,
        style=True
    )
    return clear.clean_html(tostring(element, encoding="utf-8").decode())


class DetailSpider:

    def __init__(self):
        _mgo = MongoClient(setting.MONGO_IP, setting.MONGO_PORT)
        self.ybw_list = _mgo[setting.MONGO_DB]["ybw_list"]
        self.ybw_info = _mgo[setting.MONGO_DB]["ybw_info"]
        self.save_tab = _mgo[setting.MONGO_DB]["data_bak"]
        self.dedup = RedisFilter()
        self.user = User(phone=setting.ACCOUNT, passwd=setting.PASSWORD)
        self.login_times = 0

    def json_request(self, fid, request_params):
        """Fetch the detail content through the JSON API, keyed by fid."""
        headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Connection": "keep-alive",
            "Referer": "https://www.chinabidding.cn/public/bidagency/index.html",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
        }
        url = "https://www.chinabidding.cn/agency.info.Detail/show"
        params = {
            "fid": f"{fid}"
        }
        return requests.get(url, headers=headers, params=params, **request_params)

    def crawl_request(self, item: dict):
        """Download the detail page, re-logging in whenever the session has expired."""
        url = item["competehref"]
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Host': 'www.chinabidding.cn',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        request_params = {}
        request_params.setdefault('headers', headers)
        request_params.setdefault('timeout', 30)
        request_params.setdefault('proxies', _proxies)

        # proxy / timeout settings used for the login helpers
        proxy_params = dict(proxies=_proxies, timeout=180)

        retries = 0
        retries_502, max_retries_502 = 0, 3
        while retries < 3:
            if retries_502 > max_retries_502:
                # the site has removed this record
                self.ybw_list.update_one({'_id': item["_id"]}, {'$set': {"crawl_status": "remove"}})
                break

            login_cookies = load_login_cookies(self.user.phone)
            if login_cookies is None:
                login(*self.user, **proxy_params)
                self.login_times += 1
                self.update_account_login_times()
                continue

            if 'cookies' not in request_params:
                request_params.setdefault('cookies', login_cookies)
            else:
                request_params.update({'cookies': login_cookies})

            fid = "".join(re.findall(r'\?fid=(.*)', url)).split('&')[0]
            if fid:
                try:
                    # json_request supplies its own headers
                    request_params.pop('headers', None)
                    r = self.json_request(fid, request_params)
                    # check whether the account is still logged in
                    retry_login = login_check(self.user.phone, url, False, **proxy_params)
                    if retry_login:
                        logger.info(f"[重新登录]{self.user.phone}")
                        _, code = login(*self.user, **proxy_params)
                        self.login_times += 1
                        retries += 1
                        if code != 200:
                            time.sleep(600)
                        continue

                    logger.info(f'[采集正文] fid_{fid}')
                    return r
                except Exception:
                    retries += 1
                finally:
                    self.update_account_login_times()
            else:
                try:
                    r = requests.get(url, **request_params)
                    # check whether the account is still logged in
                    retry_login = login_check(self.user.phone, url, False, **proxy_params)
                    if retry_login:
                        logger.info(f"[重新登录]{self.user.phone}")
                        _, code = login(*self.user, **proxy_params)
                        self.login_times += 1
                        retries += 1
                        if code != 200:
                            time.sleep(1800)
                        continue

                    element = fromstring(r.content.decode())
                    nodes = element.xpath('//*[@id="main_dom"]/div[1]')
                    if len(nodes) != 1:
                        retries_502 += 1
                        logger.debug(f'"main_dom"属性匹配个数:{len(nodes)}, {r.status_code} - {url}')
                    else:
                        node = nodes[0]  # exactly one match, safe to index
                        logger.info(f'[采集正文] id={node.attrib.get("id")}')
                        return r
                except requests.RequestException:
                    retries += 1
                finally:
                    self.update_account_login_times()

        return None

    def crawl_response(self, response, item):
        """Extract the detail content, clean it and store the result in MongoDB."""
        if re.match(r'^\{', response.text):
            # JSON API response
            html = response.json().get('c_info').get('content')
        else:
            element: HtmlElement = fromstring(response.text)
            node = element.xpath('//*[@id="infoDescription"]')[0]
            node = pre_parse(node)
            # candidate content nodes, checked in priority order
            features = (
                './div[@class="ckgys_cont"]',
                './/div[@class="detail-title ng-scope"]',
                './/table[@class="detail_Table"]',
            )
            for feature in features:
                extract_node = node.xpath(feature)
                if len(extract_node) > 0:
                    valid_node = extract_node[0]
                    break
            else:
                valid_node = node
            html = page_source(valid_node)

        # check the raw page content
        CheckText(html)
        item["contenthtml"] = html
        special = {
            '若附件无法下载,你可以尝试使用360极速浏览器进行下载!': '',
            # 'DD000E;|EE000F;|FF000E;': '',
            r'[(]?[)]?[A-Z]{2}000[A-Z]{1};[(]?[\d{1,4}]*[;]?[)]?[;]?': '',
        }
        item["detail"] = cleaner(html, special)
        item["comeintime"] = tool.int2long(int(time.time()))
        # check the cleaned detail text
        CheckText(item["detail"])

        insert = {}
        for key, val in item.items():
            if key not in ['crawl_status', 'crawl', 'count', '_id']:
                insert[key] = val
        self.save_tab.insert_one(insert)
        logger.info('[采集成功]{}-{}'.format(item['title'], item['publishtime']))

    def update_account_login_times(self):
        self.ybw_info.update_one(
{"account": self.user.phone}, {"$set": { "login_times": self.login_times, "update_time": tool.get_current_date() }} ) def crawl_spider(self, account, item): _id = item["_id"] err = "unknown error" try: CheckTask(item) # 检查请求采集任务 response = self.crawl_request(item) if response is not None: self.crawl_response(response, item) self.ybw_list.update_one({"_id": _id}, {"$set": {"crawl_status": "finished"}}) self.ybw_info.update_one( {"account": self.user.phone}, {"$set": { "count": account["count"] + 1, "update_time": tool.get_current_date(), }} ) return True except Exception as e: err = e logger.error(f"请求错误:{err}") self.ybw_list.update_one({'_id': _id}, {'$set': {'crawl_status': f'{err}'}}) return False def start(self): logger.info(" *** start ***") query = {"crawl_status": {"$exists": False}, "es_count": 0} sort = [('publishtime', -1)] limit = 100 with self.ybw_list.find(query, sort=sort).limit(limit) as cursor: tasks = [doc for doc in cursor] download_count = 0 for item in tasks: # 检查账号 account = self.ybw_info.find_one({"account": self.user.phone}) if account is None: logger.error(f"数据库无此账号信息|{self.user.phone}") return # 登录次数检查 self.login_times = account["login_times"] if self.login_times >= 3: logger.warning(f"账号限制|{self.user.phone}") return # 数据发布时间延迟采集 publish_ts = tool.date_to_timestamp(item["publishtime"]) if publish_ts > int(time.time()) - 43200: logger.warning("未到采集时间") continue fp = "detail_" + item.get("competehref") if not self.dedup.get(fp): self.dedup.add(fp) download_count += 1 rst = self.crawl_spider(account, item) if not rst: self.dedup.delete(fp) if download_count >= account["total"]: logger.warning("当日采集数量已达上限") break time.sleep(random.randint(80, 180)) logger.info(" *** end ***") if __name__ == '__main__': DetailSpider().start()