"""Crawler for company records on the national construction market
supervision platform (jzsc.mohurd.gov.cn)."""
import hashlib
import random
import time
from pathlib import Path

import pandas as pd
import redis
import requests
from loguru import logger
from lxml.html import fromstring, tostring
from pymongo import MongoClient
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

'''MongoDB'''
client = MongoClient('192.168.3.182', 27017)
company_tab = client['national']['company']

'''Redis service'''
# decode_responses must be set on the ConnectionPool; redis-py ignores
# connection keyword arguments passed to Redis() alongside connection_pool.
r = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host='192.168.3.182',
        port=6379,
        password='jianyu@python',
        db=10,
        decode_responses=True,
    )
)
redis_key = 'jzsc_2022'

'''Logging'''
log_path = (Path(__file__).absolute().parent / 'logs/log_{time:YYYYMMDD}.log').resolve()
logger.add(
    log_path,
    format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}',
    level='INFO',
    rotation='00:00',
    retention='1 week',
    encoding='utf-8',
)

'''Enterprise qualifications (keys match the site's table headers)'''
COMPANY_QUALITY_MAPS = {
    '资质类别': 'quality_type',
    '资质证书号': 'quality_no',
    '资质名称': 'quality_name',
    '发证日期': 'fzrq',
    '发证有效期': 'fzyxq',
    '发证机关': 'fzjg',
}
'''Bad behavior records'''
BAD_BEHAVIOR_MAPS = {
    '诚信记录主体及编号': 'integrity_no',
    '决定内容': 'decide_content',
    '实施部门': 'ssbm',
    '决定日期与有效期': 'execution_date',
}
'''Blacklist records'''
BLACK_LIST_MAPS = {
    '黑名单记录主体及编号': 'black_list_no',
    '黑名单认定依据': 'black_list_rdyj',
    '认定部门': 'rdbm',
    '决定日期与有效期': 'execution_date',
}
'''Joint dishonesty punishment records'''
PUNISH_MAPS = {
    '失信记录编号': 'punish_no',
    '失信联合惩戒记录主体': 'punish_subject',
    '法人姓名': 'legal_person',
    '列入名单事由': 'reason',
    '认定部门': 'rdbm',
    '列入日期': 'join_date',
}

CRAWL_SITE = 'http://jzsc.mohurd.gov.cn/data/company'


def sha1(*args):
    """
    Hex digest of the given strings.

    @param args: strings to hash
    @return: hex digest string
    """
    hash_sha1 = hashlib.sha1()
    for arg in args:
        hash_sha1.update(arg.encode('utf-8'))
    return hash_sha1.hexdigest()


def get_proxy(scheme=None, default=None, socks5h=False):
    url = 'http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch'
    headers = {'Authorization': 'Basic amlhbnl1MDAxOjEyM3F3ZSFB'}
    try:
        proxy = requests.get(url, headers=headers, timeout=15).json()
    except requests.RequestException:
        return default
    if not proxy:
        logger.debug('No proxy available...')
        return default
    proxies = proxy.get('data')
    if not proxies:
        return default
    if socks5h:
        proxy_items = proxies.get('http')
        proxies = {
            'http': proxy_items.replace('socks5', 'socks5h'),
            'https': proxy_items.replace('socks5', 'socks5h'),
        }
    return proxies if not scheme else proxies.get(scheme, default)


def html2element(html):
    return fromstring(html)


def element2html(lxml_element):
    return tostring(lxml_element, encoding='utf-8').decode()


def display_prompt_popup(html):
    _element = html2element(html)
    node = _element.xpath('//div[@class="el-dialog__wrapper"]')[0]
    _popup_style = node.attrib.get('style')
    if _popup_style is not None:
        _styles = str(_popup_style).split(';')
        res = list(filter(lambda x: len(x) > 0, _styles))[-1].strip().lower()
        if res == 'display: none':
            '''no prompt popup'''
            return False
    '''prompt popup is displayed'''
    return True


def display_geetest_panel(html):
    _element = html2element(html)
    node = _element.xpath('//div[@class="geetest_panel_next"]')
    if len(node) == 0:
        '''no captcha'''
        return False
    _geetest_panel = node[0]
    geetest_style = _geetest_panel.attrib.get('style')
    if geetest_style is not None and geetest_style == 'display: block;':
        '''captcha is displayed'''
        return True
    '''no captcha'''
    return False


def prompt_popup(driver: Chrome, wait_time=None):
    while True:
        if not display_prompt_popup(driver.page_source):
            break
        logger.info(">>> Dismissing prompt popup")
        driver.find_element(
            By.XPATH, '//div[@class="el-dialog captchaDilaog"]/div[3]/div/button[1]'
        ).click()
        time.sleep(1)
    '''pause between steps'''
    _wait_time = (wait_time or 1)
    time.sleep(_wait_time)
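
# Illustrative sketch (not part of the crawl flow): how sha1() pairs with the
# Redis hash above for dedup. crawl_spider() below does the same thing with
# the scraped company name; the title used here is a made-up placeholder.
def _dedup_example():
    title = '示例建设工程有限公司'  # hypothetical company name
    fingerprint = sha1(title)
    if not r.hexists(redis_key, fingerprint):
        '''first time seen: record it so later runs skip this company'''
        r.hset(redis_key, fingerprint, title)
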
def geetest_panel(driver: Chrome, wait_time=None, save_img_to_local=False):
    # save_img_to_local is accepted for forward compatibility; captcha
    # handling is currently manual.
    while True:
        if not display_geetest_panel(driver.page_source):
            break
        logger.info(">>> Captcha detected")
        text = input("After passing verification, enter 'y' to stop waiting: ")
        if text == 'y':
            continue
    _wait_time = (wait_time or 1)
    time.sleep(_wait_time)


def check_page(driver: Chrome, wait_time=None, **kwargs):
    """Check the page for prompt popups and captcha panels."""
    wait_time = (wait_time or 1)
    prompt_popup(driver, wait_time=wait_time)
    geetest_panel(
        driver,
        wait_time=wait_time,
        save_img_to_local=kwargs.get('save_img_to_local'),
    )


def click(driver: Chrome, button, wait_time=None, allow_check_page=False, run_js=True):
    if run_js:
        driver.execute_script("arguments[0].click();", button)
    else:
        button.click()
    wait_time = (wait_time or 1)
    time.sleep(wait_time)
    if allow_check_page:
        check_page(driver, wait_time=wait_time)


def click_query(driver: Chrome, wait_time=None):
    """Click the query button."""
    button = driver.find_element(By.CLASS_NAME, "ssButton")
    wait_time = (wait_time or 1)
    click(driver, button, wait_time=wait_time)


def next_page(driver: Chrome):
    element = html2element(driver.page_source)
    node = element.xpath('//button[@class="btn-next"]')[0]
    attrib = node.attrib.get('disabled')
    if attrib is not None and attrib == 'disabled':
        '''last page reached'''
        return False
    '''go to the next page'''
    button = driver.find_element(By.CLASS_NAME, 'btn-next')
    click(driver, button)
    return True


def current_page(html):
    element = html2element(html)
    nodes = element.xpath('//ul[@class="el-pager"]/li')
    for node in nodes:
        if node.attrib.get('class') == 'number active':
            return node.text


def extract_content(html):
    """Extract structured data from a panel's HTML."""
    results = []
    '''field mapping table'''
    _maps = {
        **COMPANY_QUALITY_MAPS,
        **BAD_BEHAVIOR_MAPS,
        **BLACK_LIST_MAPS,
        **PUNISH_MAPS,
    }
    '''convert to dataframes'''
    dfs = pd.read_html(html)
    if len(dfs) == 2:
        columns = list(dfs[0].columns.array)
        values = dfs[1].values
        '''merge header and rows'''
        panel_container = [dict(zip(columns, val)) for val in values]
        '''rename fields'''
        for item in panel_container:
            _item = {}
            for key, val in item.items():
                if key in _maps:
                    _item[_maps[key]] = val
            results.append(_item)
    return results
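
# Illustrative sketch: Element UI renders each grid as two <table> elements
# (header and body), which is why extract_content() expects pd.read_html() to
# find exactly two dataframes. The markup below is a made-up minimal example;
# real panels carry the same headers as the *_MAPS keys above.
_SAMPLE_PANEL_HTML = '''
<div>
  <table><tr><th>资质类别</th><th>资质证书号</th></tr></table>
  <table><tr><td>建筑业企业资质</td><td>D1234567</td></tr></table>
</div>
'''
# extract_content(_SAMPLE_PANEL_HTML) should yield roughly:
#   [{'quality_type': '建筑业企业资质', 'quality_no': 'D1234567'}]
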
def crawl_spider(driver: Chrome, handler):
    """Collect company details from the result list."""
    exception_count = 0
    td_elements = driver.find_elements(By.XPATH, '//table[@class="el-table__body"]//tr/td[3]')
    for td_element in td_elements:
        if exception_count > 3:
            '''abnormal data; stop crawling'''
            return False
        title = td_element.text
        '''dedup by company name'''
        if r.hexists(redis_key, sha1(title)):
            logger.info(f"[duplicate] {title} - discarded")
            continue
        button = td_element.find_element(By.CLASS_NAME, "link")
        click(driver, button, wait_time=random.randint(3, 10), run_js=False)
        for current_handler in driver.window_handles:
            if current_handler == handler:
                continue
            '''switch to the popup detail tab'''
            driver.switch_to.window(current_handler)
            current_url = driver.current_url
            '''on first entry, check for prompt popups and captcha panels'''
            check_page(driver, wait_time=random.randint(2, 6))
            '''company data'''
            company = {}
            '''basic company data'''
            element = html2element(driver.page_source)
            nodes = element.xpath('//div[@class="detaile-header__info--table"]')
            for node in nodes:
                credit_no = "".join(node.xpath('./div[1]/div[1]/div[2]/text()')).strip()
                legal_person = "".join(node.xpath('./div[1]/div[2]/div[2]/text()')).strip()
                company_type = "".join(node.xpath('./div[2]/div[1]/div[2]/text()')).strip()
                address = "".join(node.xpath('./div[2]/div[2]/div[2]/text()')).strip()
                business_address = "".join(node.xpath('./div[3]/div[1]/div[2]/text()')).strip()
                company = {
                    'company_name': title,                 # company name
                    'credit_no': credit_no,                # unified social credit code
                    'legal_person': legal_person,          # legal representative
                    'company_type': company_type,          # registration type
                    'address': address,                    # place of registration
                    'business_address': business_address,  # business address
                    'industry': '',                        # industry
                    'register_date': '',                   # registration date
                    'tel_phone': '',                       # contact number
                }
            '''enterprise qualifications'''
            try:
                element = html2element(driver.page_source)
                node = element.xpath('//div[@class="panel-container"]')[0]
                company_quality_html = element2html(node)
                company_quality = extract_content(company_quality_html)
                company['company_quality'] = company_quality
                company['company_quality_html'] = {'html': company_quality_html}
            except IndexError:
                pass
            '''registered personnel'''
            try:
                company_staff = driver.find_element(By.ID, "tab-companyStaff")
                click(driver, company_staff, allow_check_page=True)
                reg_buttons = driver.find_elements(By.XPATH, '//div[contains(@id, "tab-")]/span')
                logger.info('>>> personnel registration categories <<<')
                # accumulate rows across every category tab; initializing the
                # list inside the loop would keep only the last category
                registrar = []
                for reg_button in reg_buttons:
                    # TODO pagination of the personnel table is not handled yet
                    logger.info(f'[{reg_button.text}]')
                    click(driver, reg_button, wait_time=random.randint(1, 3))
                    element = html2element(driver.page_source)
                    nodes = element.xpath('//div[@id="pane-companyStaff"]//table[@class="el-table__body"]//tr')
                    for node in nodes:
                        name = "".join(node.xpath('./td[2]//span/text()')).strip()
                        id_no = "".join(node.xpath('./td[3]/div/text()')).strip()
                        reg_type = "".join(node.xpath('./td[4]/div/text()')).strip()
                        reg_no = "".join(node.xpath('./td[5]/div/text()')).strip()
                        reg_major = "".join(node.xpath('./td[6]/div/text()')).strip()
                        registrar.append({
                            'name': name,            # name
                            'id_no': id_no,          # ID card number
                            'reg_type': reg_type,    # registration category
                            'reg_no': reg_no,        # registration no. (practice seal no.)
                            'reg_major': reg_major,  # registered specialty
                        })
                company['company_staff'] = registrar
            except (IndexError, NoSuchElementException):
                pass
            '''bad behavior records'''
            try:
                bad_behavior = driver.find_element(By.ID, 'tab-badBehavior')
                click(driver, bad_behavior, allow_check_page=True)
                element = html2element(driver.page_source)
                node = element.xpath('//div[@aria-labelledby="tab-badBehavior"]/div')[0]
                bad_behavior_html = element2html(node)
                bad_behaviors = extract_content(bad_behavior_html)
                company['bad_behavior'] = bad_behaviors
                company['bad_behavior_html'] = {'html': bad_behavior_html}
            except (IndexError, NoSuchElementException):
                pass
            '''blacklist records'''
            try:
                black_list = driver.find_element(By.ID, 'tab-blackList')
                click(driver, black_list, allow_check_page=True)
                element = html2element(driver.page_source)
                node = element.xpath('//div[@id="pane-blackList"]/div')[0]
                black_list_html = element2html(node)
                black_list_array = extract_content(black_list_html)
                company['black_list'] = black_list_array
                company['black_list_html'] = {'html': black_list_html}
            except (IndexError, NoSuchElementException):
                pass
            '''joint dishonesty punishment records'''
            try:
                punish = driver.find_element(By.ID, 'tab-punishLog')
                click(driver, punish, allow_check_page=True)
                element = html2element(driver.page_source)
                node = element.xpath('//div[@id="pane-punishLog"]/div')[0]
                punish_html = element2html(node)
                punish_array = extract_content(punish_html)
                company['punish'] = punish_array
                company['punish_html'] = {'html': punish_html}
            except (IndexError, NoSuchElementException):
                pass
            '''save company data; .get() guards against a detail page with no basic info'''
            if len(company.get('credit_no', '')) > 0:
                company_tab.insert_one(company)
                r.hset(redis_key, sha1(title), title)
                logger.info(f'>>> {title} - {current_url} - scraped - saved to database')
            else:
                exception_count += 1  # page has no company data
                logger.info(f'>>> {title} - {current_url} - failed - no social credit code')
            '''close the detail tab'''
            driver.close()
            '''return to the list page'''
            driver.switch_to.window(handler)
            '''delay before the next item'''
            time.sleep(2)
    return True


def downloader(driver: Chrome, handler):
    while True:
        logger.info(f">>> page {current_page(driver.page_source)} <<<")
        allow_crawl = crawl_spider(driver, handler)
        '''continue crawling?'''
        if not allow_crawl:
            logger.info("Abnormal site data; aborting crawl")
            return False
        '''turn the page'''
        if not next_page(driver):
            logger.info('Crawl finished')
            break
    return True
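
# Illustrative sketch of the tab handoff used by crawl_spider()/downloader():
# clicking a company name opens the detail page in a new tab, so the crawler
# walks every handle except the list tab (`handler`), scrapes it, closes it,
# and switches back. Names below are illustrative, not extra code paths:
#
#   main = driver.current_window_handle
#   click(driver, link, run_js=False)        # opens the detail tab
#   for h in driver.window_handles:
#       if h != main:
#           driver.switch_to.window(h)       # scrape the detail page here
#           driver.close()
#   driver.switch_to.window(main)            # back to the list page
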
def select_province(driver: Chrome, records):
    """Select a registration province."""
    '''open the province dropdown'''
    drop_down_button = driver.find_element(
        By.XPATH,
        '//div[@class="region-select"]/div[1]/div[1]/span[1]/span[1]/i[contains(@class,"el-select__caret el-input__icon el-icon-arrow-up")]'
    )
    click(driver, drop_down_button, wait_time=1)
    '''pick the next unvisited province'''
    li_elements = driver.find_elements(
        By.XPATH,
        '/html/body/div[@class="el-select-dropdown el-popper"][1]/div[1]/div[1]/ul/li'
    )
    for element in li_elements:
        province = element.text
        if province not in records:
            logger.info(f'>> registration province: {province} <<')
            click(driver, element, wait_time=1.5)
            records.append(province)
            return False
    '''all provinces of this category are done'''
    return True


def select_categories(driver: Chrome, records):
    span_elements = driver.find_elements(By.XPATH, '//div[@class="labelInPut labelInPutRadio"]/span')
    for element in span_elements:
        qualification = element.text
        if qualification not in records:
            logger.info(f'>> qualification category: {qualification} <<')
        records.setdefault(qualification, [])
        provinces = records.get(qualification)
        if provinces is not None:
            if len(provinces) < 32:
                click(driver, element, wait_time=1.5)
                crawl_finished = select_province(driver, provinces)
                if not crawl_finished:
                    click_query(driver, wait_time=2)
                    return False
    '''all categories are done'''
    return True


def start(enable_remote_driver=False):
    '''
    "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" --remote-debugging-port=9222 --no-first-run --no-default-browser-check --user-data-dir="./data"
    '''
    options = webdriver.ChromeOptions()
    if enable_remote_driver:
        options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
    options.add_argument("--disable-gpu")
    chrome_driver = webdriver.Chrome(
        service=Service("/Users/dongzhaorui/Downloads/chromedriver-mac-x64/chromedriver"),
        options=options,
    )
    main_handler = chrome_driver.current_window_handle  # grab the list-tab handle
    '''close all other windows'''
    for handler in chrome_driver.window_handles:
        if handler != main_handler:
            chrome_driver.switch_to.window(handler)
            chrome_driver.close()
    chrome_driver.switch_to.window(main_handler)
    chrome_driver.get(CRAWL_SITE)
    time.sleep(3)
    '''crawl progress; None marks categories that are skipped'''
    records = {
        '全部': None,
        '造价咨询企业': None,
    }
    while True:
        crawl_finished = select_categories(chrome_driver, records)
        if crawl_finished:
            logger.info('All tasks finished')
            break
        '''download data'''
        _continue = downloader(chrome_driver, main_handler)
        if not _continue:
            break
    if not enable_remote_driver:
        chrome_driver.quit()


if __name__ == '__main__':
    # Retry wrapper, kept for reference:
    # while True:
    #     try:
    #         start(enable_remote_driver=True)
    #     except Exception:
    #         logger.info("Waiting 100 seconds")
    #         time.sleep(100)
    start(enable_remote_driver=True)
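
# Usage note (assumption: Chrome is already running with remote debugging):
#   1. Launch Chrome with the command shown in start()'s docstring.
#   2. Run this script with Python.
# With enable_remote_driver=True the script attaches to that browser and
# leaves it open on exit; set it to False to let Selenium manage its own
# browser instance and quit it when the crawl ends.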