import random
import re
import time

import requests
from lxml.html import fromstring, HtmlElement, tostring
from lxml.html.clean import Cleaner

from crawler.check_utils import CheckText, CheckTask
from crawler.clean_html import cleaner
from crawler.crawl_scheduler import Scheduler
from crawler.login import login, load_login_cookies, login_check
from utils.databases import mongo_table, int2long
from utils.execptions import YbwCrawlError
from utils.log import logger


def iter_node(element: HtmlElement):
    """Depth-first traversal over an element and all of its HtmlElement descendants."""
    yield element
    for sub_element in element:
        if isinstance(sub_element, HtmlElement):
            yield from iter_node(sub_element)


def pre_parse(element: HtmlElement):
    """Strip boilerplate nodes from the page.

    Note: this pre-processing may alter the original HTML structure, so XPath
    expressions written against the raw HTML may no longer match afterwards.
    """
    pre_remove = {
        'log_col2', 'log_col1', 'cz', 'iconfont closei', 'p2 p1', 'cnxh_b', 'msg_error',
        'r_gg TB-focus', 'april', 'cont2', 'to_login', 'regtxt', 'shouchang an_n sc',
        'april_title red', 'cn_lt', 'dayin an_n', 'dl_zc vip_t free_member', 'rmbq',
        'login-form cl', 'dian_g fr', 'di_n', 'd_fx', 'd_tub', 'd_dy', 'anniu1',
        'cnxh_list', 'btns cl', 'active', 'close', 'd_an fr', 'avatar', 'toolbar',
        'deng_l', 'cen_right fr', 'log_col5', 'agreement', 'log_col3',
        'shouchang_af an_n sc_after', 'fast_box', 'di_nr fl', 'xgfj', 'dianh',
        'cnxh_list tab_b2 city_list', 'contract cl', 'zb_cen_r fr', 'd_zsms',
        'sc_after active', 'dl_k', 'ewm_b', 'fl', 'wypj', 'rukou', 'p1', 'dl_zc',
        'success', 'daoh h_30', 'bd', 'april_content', 'print', 'foot', 'cnxh zbgg',
        'april_first', 'fastlog', 'tx_mc user_name', 'tab_h2', 'fanding an_n', 'toux',
        'log_col4 cl', 'hangy rem_1', 'red', 'regshadow', 'bottom',
        'dl_zc vip_t fee_member', 'xszn fl', 'no-print', 'cnxh_b zbgg_b', 'rem rem_1',
        'logshadowz', 'd_pj fl', 'tjgjc', 'spdujaiwlohh', 'di_ewm fr', 'dian_h fl',
        'tab_h2 zbgg_b_gray', 'fanshou an_n fs', 'login-btn', 'fl gjc', 'agreeshadow',
        'guang_db', 'footer_1', 'log_p', 'cnxh_list tab_b2', 'd_sw', 'april_close',
        'd_sc', 'erweima no-print', 'qgzx', 'p2', 'sc', 'hd', 'log_col6', 'dh_b',
        'dian_guang', 'zhu_c', 'ck cai_k', 'april_box', 'display:none'
    }
    for node in iter_node(element):
        id_attr = node.attrib.get('id')
        class_attr = node.attrib.get('class')
        style_attr = node.attrib.get('style')
        if any([id_attr in pre_remove, class_attr in pre_remove, style_attr in pre_remove]):
            node.drop_tree()
    return element


def page_source(element: HtmlElement):
    """Serialize an element back to HTML with <style> content removed."""
    clear = Cleaner(forms=False, style=True)
    return clear.clean_html(tostring(element, encoding="utf-8").decode())


class DetailSpider:

    def __init__(self, db: str, crawl_tab: str, save_tab: str):
        self.crawl_tab = mongo_table(db, crawl_tab)
        self.save_tab = mongo_table(db, save_tab)
        self.save_url = mongo_table("editor", "source_url")
        self.user = None

    def _update_crawl_task(self, tid, **kwargs):
        self.crawl_tab.update_one({'_id': tid}, {'$set': kwargs})

    def _lock_task(self, task: dict):
        update = {'crawl': True}
        self._update_crawl_task(task['_id'], **update)

    def _release_task(self, task: dict):
        update = {'crawl': False}
        self._update_crawl_task(task['_id'], **update)

    def json_request(self, fid, request_params):
        url = "https://www.chinabidding.cn/agency.info.Detail/show"
        params = {"fid": f"{fid}"}
        res = requests.get(url, params=params, **request_params)
        return res

    def crawl_request(self, item: dict):
        url = item['competehref']
        headers = {
            'Host': 'www.chinabidding.cn',
            'sec-ch-ua': '" Not;A Brand";v="99", "Google Chrome";v="97", "Chromium";v="97"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-User': '?1',
            'Sec-Fetch-Dest': 'document',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        request_params = {}
        request_params.setdefault('headers', headers)
        request_params.setdefault('timeout', 60)

        retries = 0
        retries_502, max_retries_502 = 0, 15
        proxy, proxies = None, None
        while retries < 3:
            if retries_502 > max_retries_502:
                # The site has removed this record
                self._update_crawl_task(item['_id'], crawl_status='remove')
                break

            login_cookies = load_login_cookies(self.user.phone)
            if login_cookies is None:
                login(*self.user)
                continue
            elif 'cookies' not in request_params:
                request_params.setdefault('cookies', login_cookies)
            else:
                request_params.update({'cookies': login_cookies})

            fid = "".join(re.findall(r'\?fid=(.*)', url))
            if fid:
                try:
                    r = self.json_request(fid, request_params)
                    # Check whether the account is still logged in
                    retry_login = login_check(self.user.phone, url, False)
                    if retry_login:
                        logger.info(f"[重新登录]{self.user.phone}")
                        _, code = login(*self.user, proxies=proxies)
                        if code == 200:
                            retries += 1
                        else:
                            time.sleep(1800)
                            retries += 1
                        continue
                    logger.info(f'[采集正文] fid_{fid}')
                    return r
                except Exception:
                    retries += 1
                    continue
            else:
                try:
                    r = requests.get(url, **request_params)
                    # Check whether the account is still logged in
                    retry_login = login_check(self.user.phone, url, False)
                    if retry_login:
                        logger.info(f"[重新登录]{self.user.phone}")
                        _, code = login(*self.user, proxies=proxies)
                        if code == 200:
                            retries += 1
                        else:
                            time.sleep(1800)
                            retries += 1
                        continue
                    element = fromstring(r.text)
                    nodes = element.xpath('//*[@id="main_dom"]/div[1]')
                    if len(nodes) != 1:
                        retries_502 += 1
                        logger.debug(f'"main_dom"属性匹配个数:{len(nodes)}, {r.status_code} - {url}')
                        continue
                    else:
                        node = nodes[0]
                        logger.info(f'[采集正文] id={node.attrib.get("id")}')
                        return r
                except requests.RequestException:
                    retries += 1
                    continue
        return None

    def crawl_response(self, response, item):
        if re.match(r'^\{', response.text):
            html = response.json().get('c_info').get('content')
        else:
            element: HtmlElement = fromstring(response.text)
            node = element.xpath('//*[@id="infoDescription"]')[0]
            node = pre_parse(node)
            # Candidate content containers, checked in order; fall back to the whole node
            features = (
                './div[@class="ckgys_cont"]',
                './/div[@class="detail-title ng-scope"]',
                './/table[@class="detail_Table"]',
            )
            for feature in features:
                extract_node = node.xpath(feature)
                if len(extract_node) > 0:
                    valid_node = extract_node[0]
                    break
            else:
                valid_node = node
            html = page_source(valid_node)

        # Check the raw page content: capture the text before the "点击查看内容" marker
        source_url = re.search('(.*)点击查看内容', html)
        if source_url:
            self.save_url.insert_one({
                "site": "元博网",
                "title": item['title'],
                "source_url": source_url.group(1),
                "comeintime": int2long(int(time.time()))
            })

        CheckText(html)
        item["contenthtml"] = html
        # Text and patterns to strip from the cleaned detail
        special = {
            '若附件无法下载,你可以尝试使用360极速浏览器进行下载!': '',
            # 'DD000E;|EE000F;|FF000E;': '',
            r'[(]?[)]?[A-Z]{2}000[A-Z]{1};[(]?[\d{1,4}]*[;]?[)]?[;]?': '',
        }
        item["detail"] = cleaner(html, special)
        item["comeintime"] = int2long(int(time.time()))

        # Check the cleaned detail for the same marker
        source_url = re.search('(.*)点击查看内容', item["detail"])
        if source_url:
            self.save_url.insert_one({
                "site": "元博网",
                "title": item['title'],
                "source_url": source_url.group(1),
                "comeintime": int2long(int(time.time()))
            })

        CheckText(item["detail"])
        insert = {}
        for key, val in item.items():
            if key not in ['crawl_status', 'crawl', 'count', '_id']:
                insert[key] = val
        self.save_tab.insert_one(insert)
        logger.info('[采集成功]{}-{}'.format(item['title'], item['publishtime']))
    def crawl_spider(self, sc: Scheduler):
        while True:
            next_task_interval = None
            logger.info(f"[count:]{str(sc.count)}")
            if sc.count >= sc.total:
                return True

            item = sc.crawl_task
            if len(item) == 0:
                return False

            self._lock_task(item)
            # Record the spider code and source url for error reporting
            sc.spider_code = item['spidercode']
            sc.crawl_url = item['competehref']
            try:
                # Validate the crawl task before requesting it
                CheckTask(item)
                response = self.crawl_request(item)
                if response is not None:
                    self.crawl_response(response, item)
                    self._update_crawl_task(item["_id"], crawl_status='finished')
                sc.crawl_counter(1)
                next_task_interval = random.choice(range(3, 9))
            except (YbwCrawlError, Exception) as e:
                if getattr(e, 'code', None) is None:
                    err = YbwCrawlError(unknown_err=e)
                    sc.err_record(err)
                elif e.code == 10105:
                    # Duplicate data: refresh the count from the ES query result
                    self._update_crawl_task(item["_id"], count=item['count'])
                    logger.info('[重复数据]{}-{}'.format(item['title'], item['publishtime']))
                else:
                    sc.err_record(e)
                    self._update_crawl_task(item["_id"], crawl_status='error')
                    logger.info('[问题数据]{}-{}'.format(item['title'], item['publishtime']))
                sc.crawl_counter(0)
                next_task_interval = 0.1
            finally:
                self._release_task(item)
                sc.wait_for_next_task(next_task_interval)

    def start(self):
        while True:
            with Scheduler(site='元博网', crawl_type='detail') as scheduler:
                if scheduler.crawl_start:
                    self.user = scheduler.user
                    self.crawl_spider(scheduler)
                    scheduler.finished(10)


if __name__ == '__main__':
    DetailSpider(
        db='py_spider',
        crawl_tab='ybw_list',
        save_tab='data_bak',
    ).start()