import time

import requests
from lxml.html import fromstring, HtmlElement, tostring
from lxml.html.clean import Cleaner
from pymongo.errors import DuplicateKeyError

from crawler.check_utils import CheckText, CheckTask
from crawler.clean_html import clean_html
from crawler.crawl_record import update_records, load_records
from crawler.crawl_scheduler import Scheduler
from crawler.login import login, load_login_cookies, login_check
from utils.databases import MongoDBS
from utils.execptions import VoidCrawlError, JyBasicException
from utils.log import logger
from utils.socks5 import Proxy
from utils.tools import int2long


def iter_node(element: HtmlElement):
    """Depth-first traversal over an element and all of its sub-elements."""
    yield element
    for sub_element in element:
        if isinstance(sub_element, HtmlElement):
            yield from iter_node(sub_element)


def pre_parse(element: HtmlElement):
    """Pre-process the HTML.

    Pre-processing may break the original structure of the HTML, so XPath
    expressions written against the raw HTML may no longer match afterwards.
    """
    # id/class/style values of boilerplate nodes (login widgets, toolbars,
    # footers, share/print buttons, etc.) that are stripped before extraction
    pre_remove = {
        'log_col2', 'log_col1', 'cz', 'iconfont closei', 'p2 p1', 'cnxh_b',
        'msg_error', 'r_gg TB-focus', 'april', 'cont2', 'to_login', 'regtxt',
        'shouchang an_n sc', 'april_title red', 'cn_lt', 'dayin an_n',
        'dl_zc vip_t free_member', 'rmbq', 'login-form cl', 'dian_g fr', 'di_n',
        'd_fx', 'd_tub', 'd_dy', 'anniu1', 'cnxh_list', 'btns cl', 'active',
        'close', 'd_an fr', 'avatar', 'toolbar', 'deng_l', 'cen_right fr',
        'log_col5', 'agreement', 'log_col3', 'shouchang_af an_n sc_after',
        'fast_box', 'di_nr fl', 'xgfj', 'dianh', 'cnxh_list tab_b2 city_list',
        'contract cl', 'zb_cen_r fr', 'd_zsms', 'sc_after active', 'dl_k',
        'ewm_b', 'fl', 'wypj', 'rukou', 'p1', 'dl_zc', 'success', 'daoh h_30',
        'bd', 'april_content', 'print', 'foot', 'cnxh zbgg', 'april_first',
        'fastlog', 'tx_mc user_name', 'tab_h2', 'fanding an_n', 'toux',
        'log_col4 cl', 'hangy rem_1', 'red', 'regshadow', 'bottom',
        'dl_zc vip_t fee_member', 'xszn fl', 'no-print', 'cnxh_b zbgg_b',
        'rem rem_1', 'logshadowz', 'd_pj fl', 'tjgjc', 'spdujaiwlohh',
        'di_ewm fr', 'dian_h fl', 'tab_h2 zbgg_b_gray', 'fanshou an_n fs',
        'login-btn', 'fl gjc', 'agreeshadow', 'guang_db', 'footer_1', 'log_p',
        'cnxh_list tab_b2', 'd_sw', 'april_close', 'd_sc', 'erweima no-print',
        'qgzx', 'p2', 'sc', 'hd', 'log_col6', 'dh_b', 'dian_guang', 'zhu_c',
        'ck cai_k', 'april_box', 'display:none',
    }
    for node in iter_node(element):
        id_attr = node.attrib.get('id')
        class_attr = node.attrib.get('class')
        style_attr = node.attrib.get('style')
        if any([id_attr in pre_remove,
                class_attr in pre_remove,
                style_attr in pre_remove]):
            node.drop_tree()
    return element


def page_source(element: HtmlElement):
    """Serialize the element back to HTML, dropping <style> blocks but keeping forms."""
    clear = Cleaner(forms=False, style=True)
    return clear.clean_html(tostring(element, encoding="utf-8").decode())


class DetailSpider:

    def __init__(
            self,
            db: str,
            crawl_tab: str,
            save_tab: str,
            crawl_total=None,
    ):
        self.crawl_tab = MongoDBS(db, crawl_tab).coll
        self.save_tab = MongoDBS(db, save_tab).coll
        self.crawl_total = crawl_total or 6000
        self.user = None

    def crawl_request(self, url):
        headers = {
            'Host': 'www.chinabidding.cn',
            'sec-ch-ua': '" Not;A Brand";v="99", "Google Chrome";v="97", "Chromium";v="97"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-User': '?1',
            'Sec-Fetch-Dest': 'document',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        request_params = {'headers': headers, 'timeout': 60}

        retries = 0
        proxy, proxies = None, None
        while retries < 3:
            login_cookies = load_login_cookies(self.user.phone)
            if login_cookies is None:
                # no cached cookies: log in first, then retry with fresh cookies;
                # count this as an attempt so a failing login cannot loop forever
                login(*self.user)
                retries += 1
                continue
            request_params['cookies'] = login_cookies

            try:
                r = requests.get(url, **request_params)
                # check the account's login status
                retry_login = login_check(self.user.phone, url, False)
                if retry_login:
                    logger.info(f"[重新登录]{self.user.phone}")
                    _, code = login(*self.user, proxies=proxies)
                    if code != 200:
                        # login failed: switch to (another) proxy before the next attempt
                        if proxy is None:
                            proxy = Proxy(True)
                        else:
                            proxy.switch()
                        proxies = proxy.proxies
                    retries += 1
                    continue

                element = fromstring(r.text)
                nodes = element.xpath('//*[@id="main_dom"]/div[1]')
                if len(nodes) != 1:
                    raise VoidCrawlError
                node = nodes[0]
                logger.info(f'[采集正文] id={node.attrib.get("id")}')
                return r
            except requests.RequestException:
                retries += 1
                continue
        return None

    def crawl_response(self, response, item):
        element: HtmlElement = fromstring(response.text)
        node = element.xpath('//*[@id="infoDescription"]')[0]
        node = pre_parse(node)
        # candidate containers of the announcement body; fall back to the whole node
        features = {
            './div[@class="ckgys_cont"]',
            './/div[@class="detail-title ng-scope"]',
            './/table[@class="detail_Table"]',
        }
        for feature in features:
            extract_node = node.xpath(feature)
            if len(extract_node) > 0:
                valid_node = extract_node[0]
                break
        else:
            valid_node = node
        html = page_source(valid_node)
        # check the text content
        CheckText(html)
        item["contenthtml"] = html
        item["detail"] = clean_html(html)
        item["comeintime"] = int2long(int(time.time()))
        del item['count'], item['crawl']
        if 'crawl_status' in item:
            del item['crawl_status']
        try:
            self.save_tab.insert_one(item)
        except DuplicateKeyError:
            pass
        logger.info('[采集成功]{}-{}'.format(item['title'], item['publishtime']))

    def set_crawl_status(self, item: dict, status: bool):
        self.crawl_tab.update_one(
            {'_id': item['_id']},
            {'$set': {'crawl': status}}
        )

    def crawl_spider(self, sc: Scheduler):
        while True:
            if load_records(self.user.phone, sc.today) >= self.crawl_total:
                # daily quota reached for this account
                return True
            item = sc.crawl_task
            if len(item) == 0:
                return False

            self.set_crawl_status(item, True)
            # record the crawl context on the scheduler so errors can be written to the database
            sc.spider_code = item['spidercode']
            sc.crawl_url = item['competehref']
            try:
                # validate the crawl task before requesting it
                CheckTask(item)
                url = item['competehref']
                response = self.crawl_request(url)
                if response is not None:
                    self.crawl_response(response, item)
                    self.crawl_tab.update_one(
                        {"_id": item["_id"]},
                        {'$set': {'crawl_status': 'finished'}}
                    )
                    update_records(self.user.phone, 1)
            except JyBasicException as e:
                if e.code == 10105:
                    # for this error code, write the ES query result back to the crawl table
                    self.crawl_tab.update_one(
                        {"_id": item["_id"]},
                        {'$set': {'count': item['count']}}
                    )
                else:
                    sc.err_record(e)
                    self.crawl_tab.update_one(
                        {"_id": item["_id"]},
                        {'$set': {'crawl_status': 'error'}}
                    )
            finally:
                self.set_crawl_status(item, False)
                sc.wait_for_next_task()

    def start(self):
        query = {'used': False, 'site': '元博网', 'class': 'detail'}
        while True:
            with Scheduler(query) as scheduler:
                scheduler.crawl_type = 'detail'
                if scheduler.crawl_start:
                    self.user = scheduler.user
                    finished = self.crawl_spider(scheduler)
                    if finished:
                        # the crawl task is finished
                        scheduler.finished()
                    else:
                        # no crawl task available at the moment
                        scheduler.wait_for_next_task()


if __name__ == '__main__':
    DetailSpider(
        db='py_spider',
        crawl_tab='ybw_list',
        save_tab='data_bak',
        crawl_total=6000,
    ).start()