|
- import hashlib
- import random
- import time
- from pathlib import Path
- import pandas as pd
- import redis
- import requests
- from loguru import logger
- from lxml.html import fromstring, tostring
- from pymongo import MongoClient
- from selenium import webdriver
- from selenium.webdriver import Chrome
- from selenium.webdriver.common.by import By
- '''MongoDB'''
- client = MongoClient('192.168.3.182', 27017)
- company_tab = client['national']['company']
- '''redis服务'''
- r = redis.Redis(
- connection_pool=redis.ConnectionPool(
- host='192.168.3.182',
- port=6379,
- password='jianyu@python',
- db=10
- ),
- decode_responses=True
- )
- redis_key = 'jzsc_2022'
- '''日志'''
- log_path = (Path(__file__).absolute().parent / 'logs/log_{time:YYYYMMDD}.log').resolve()
- logger.add(
- log_path,
- format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}',
- level='INFO',
- rotation='00:00',
- retention='1 week',
- encoding='utf-8',
- )
- '''企业资质'''
- COMPANY_QUALITY_MAPS = {
- '资质类别': 'quality_type',
- '资质证书号': 'quality_no',
- '资质名称': 'quality_name',
- '发证日期': 'fzrq',
- '发证有效期': 'fzyxq',
- '发证机关': 'fzjg',
- }
- '''不良行为'''
- BAD_BEHAVIOR_MAPS = {
- '诚信记录主体及编号': 'integrity_no',
- '决定内容': 'decide_content',
- '实施部门': 'ssbm',
- '决定日期与有效期': 'execution_date',
- }
- '''黑名单记录'''
- BLACK_LIST_MAPS = {
- '黑名单记录主体及编号': 'black_list_no',
- '黑名单认定依据': 'black_list_rdyj',
- '认定部门': 'rdbm',
- '决定日期与有效期': 'execution_date',
- }
- '''失信联合惩戒记录'''
- PUNISH_MAPS = {
- '失信记录编号': 'punish_no',
- '失信联合惩戒记录主体': 'punish_subject',
- '法人姓名': 'legal_person',
- '列入名单事由': 'reason',
- '认定部门': 'rdbm',
- '列入日期': 'join_date',
- }
- CRAWL_SITE = 'http://jzsc.mohurd.gov.cn/data/company'
- def sha1(*args):
- """
- 十六进制数字字符串形式摘要值
- @param args: 字符串
- @return: 摘要值
- """
- hash_sha1 = hashlib.sha1()
- for arg in args:
- hash_sha1.update(arg.encode('utf-8'))
- return hash_sha1.hexdigest()
- def get_proxy(scheme=None, default=None, socks5h=False):
- url = 'http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch'
- headers = {'Authorization': 'Basic amlhbnl1MDAxOjEyM3F3ZSFB'}
- try:
- proxy = requests.get(url, headers=headers, timeout=15).json()
- except requests.RequestException:
- return default
- if not proxy:
- logger.debug('暂无代理...')
- return default
- proxies = proxy.get('data')
- if proxies:
- if socks5h:
- proxy_items = proxies.get('http')
- proxy_h = {
- 'http': proxy_items.replace('socks5', 'socks5h'),
- 'https': proxy_items.replace('socks5', 'socks5h')
- }
- proxies = proxy_h
- return proxies if not scheme else proxies.get(scheme, default)
- def html2element(html):
- return fromstring(html)
- def element2html(lxml_element):
- return tostring(lxml_element, encoding='utf-8').decode()
- def display_prompt_popup(html):
- _element = html2element(html)
- node = _element.xpath('//div[@class="el-dialog__wrapper"]')[0]
- _popup_style = node.attrib.get('style')
- if _popup_style is not None:
- _styles = str(_popup_style).split(';')
- res = list(filter(lambda x: len(x) > 0, _styles))[-1].strip().lower()
- if res == 'display: none':
- '''无提示弹框'''
- return False
- '''有提示弹框'''
- return True
- def display_geetest_panel(html):
- _element = html2element(html)
- node = _element.xpath('//div[@class="geetest_panel_next"]')
- if len(node) == 0:
- '''无验证码'''
- return False
- _geetest_panel = node[0]
- geetest_style = _geetest_panel.attrib.get('style')
- if geetest_style is not None and geetest_style == 'display: block;':
- '''有验证码'''
- return True
- else:
- '''无验证码'''
- return False
- def prompt_popup(driver: Chrome, wait_time=None):
- while True:
- if not display_prompt_popup(driver.page_source):
- break
- logger.info(">>> 点击提示弹框")
- driver.find_element_by_xpath('//div[@class="el-dialog captchaDilaog"]/div[3]/div/button[1]').click()
- time.sleep(1)
- '''流程之间的间隔时间'''
- _wait_time = (wait_time or 1)
- time.sleep(_wait_time)
- def geetest_panel(driver: Chrome, wait_time=None, save_img_to_local=False):
- while True:
- if not display_geetest_panel(driver.page_source):
- break
- logger.info(">>> 验证码检测")
- text = input("通过验证后,结束等待。请输入:y")
- if text == 'y':
- continue
- _wait_time = (wait_time or 1)
- time.sleep(wait_time)
- def check_page(driver: Chrome, wait_time=None, **kwargs):
- """检查页面"""
- wait_time = (wait_time or 1)
- prompt_popup(driver, wait_time=wait_time)
- geetest_panel(
- driver,
- wait_time=wait_time,
- save_img_to_local=kwargs.get('save_img_to_local'),
- )
- def click(driver: Chrome, button, wait_time=None, allow_check_page=False, run_js=True):
- if run_js:
- driver.execute_script("arguments[0].click();", button)
- else:
- button.click()
- wait_time = (wait_time or 1)
- time.sleep(wait_time)
- if allow_check_page:
- check_page(driver, wait_time=wait_time)
- def click_query(driver: Chrome, wait_time=None):
- """查询按钮"""
- button = driver.find_element_by_class_name("ssButton")
- wait_time = (wait_time or 1)
- click(driver, button, wait_time=wait_time)
- def next_page(driver: Chrome):
- element = html2element(driver.page_source)
- node = element.xpath('//button[@class="btn-next"]')[0]
- attrib = node.attrib.get('disabled')
- if attrib is not None and attrib == 'disabled':
- '''最大页码'''
- return False
- else:
- '''继续翻页'''
- button = driver.find_element_by_class_name('btn-next')
- click(driver, button)
- return True
- def current_page(html):
- element = html2element(html)
- nodes = element.xpath('//ul[@class="el-pager"]/li')
- for node in nodes:
- if node.attrib.get('class') == 'number active':
- return node.text
- def extract_content(html):
- """抽取页面结构化数据"""
- results = []
- '''字段映射表'''
- _maps = {
- **COMPANY_QUALITY_MAPS,
- **BAD_BEHAVIOR_MAPS,
- **BLACK_LIST_MAPS,
- **PUNISH_MAPS,
- }
- '''转化成dataframe'''
- dfs = pd.read_html(html)
- if len(dfs) == 2:
- columns = list(dfs[0].columns.array)
- values = dfs[1].values
- '''合并内容'''
- panel_container = [dict(zip(columns, val)) for val in values]
- '''转换字段'''
- for item in panel_container:
- _item = {}
- for key, val in item.items():
- if key in _maps:
- _item[_maps[key]] = val
- results.append(_item)
- return results
- def crawl_spider(driver: Chrome, handler):
- """采集爬虫"""
- exception_count = 0
- td_elements = driver.find_elements(By.XPATH, value='//table[@class="el-table__body"]//tr/td[3]')
- for td_element in td_elements:
- if exception_count > 3:
- '''数据异常,停止采集'''
- return False
- title = td_element.text
- '''使用公司名称进行去重'''
- if r.hexists(redis_key, sha1(title)):
- logger.info(f"[重复数据]{title} - 丢弃")
- continue
- button = td_element.find_element_by_class_name("link")
- click(driver, button, wait_time=random.randint(3, 10), run_js=False)
- for current_handler in driver.window_handles:
- if current_handler == handler:
- continue
- '''切换到弹出页面'''
- driver.switch_to.window(current_handler)
- current_url = driver.current_url
- '''首次进入详情页,检查提示弹框和验证码面板'''
- check_page(driver, wait_time=random.randint(2, 6))
- '''企业数据处理'''
- company = {}
- '''企业基础数据'''
- element = html2element(driver.page_source)
- nodes = element.xpath('//div[@class="detaile-header__info--table"]')
- for node in nodes:
- credit_no = "".join(node.xpath('./div[1]/div[1]/div[2]/text()')).strip()
- legal_person = "".join(node.xpath('./div[1]/div[2]/div[2]/text()')).strip()
- company_type = "".join(node.xpath('./div[2]/div[1]/div[2]/text()')).strip()
- address = "".join(node.xpath('./div[2]/div[2]/div[2]/text()')).strip()
- business_address = "".join(node.xpath('./div[3]/div[1]/div[2]/text()')).strip()
- company = {
- 'company_name': title, # 企业名称
- 'credit_no': credit_no, # 统一社会信用代码
- 'legal_person': legal_person, # 企业法定代表人
- 'company_type': company_type, # 企业登记注册类型
- 'address': address, # 企业注册属地
- 'business_address': business_address, # 企业经营地址
- 'industry': '', # 所属行业
- 'register_date': '', # 注册时间
- 'tel_phone': '', # 联系方式
- }
- # logger.info(item)
- '''企业资质'''
- try:
- element = html2element(driver.page_source)
- node = element.xpath('//div[@class="panel-container"]')[0]
- company_quality_html = element2html(node)
- company_quality = extract_content(company_quality_html)
- company['company_quality'] = company_quality
- company['company_quality_html'] = {'html': company_quality_html}
- except IndexError:
- pass
- '''注册人员'''
- try:
- company_staff = driver.find_element_by_id("tab-companyStaff")
- click(driver, company_staff, allow_check_page=True)
- reg_buttons = driver.find_elements(by=By.XPATH, value='//div[contains(@id, "tab-")]/span')
- logger.info(f'>>> 人员注册类别 <<<')
- for element in reg_buttons:
- # TODO 页面需翻页的逻辑未添加
- logger.info(f'[{element.text}]')
- click(driver, element, wait_time=random.randint(1, 3))
- registrar = []
- element = html2element(driver.page_source)
- nodes = element.xpath('//div[@id="pane-companyStaff"]//table[@class="el-table__body"]//tr')
- for node in nodes:
- name = "".join(node.xpath('./td[2]//span/text()')).strip()
- id_no = "".join(node.xpath('./td[3]/div/text()')).strip()
- reg_type = "".join(node.xpath('./td[4]/div/text()')).strip()
- reg_no = "".join(node.xpath('./td[5]/div/text()')).strip()
- reg_major = "".join(node.xpath('./td[6]/div/text()')).strip()
- registrar.append({
- 'name': name, # 姓名
- 'id_no': id_no, # 身份证号
- 'reg_type': reg_type, # 注册类别
- 'reg_no': reg_no, # 注册号(执业印章号)
- 'reg_major': reg_major, # 注册专业
- })
- company['company_staff'] = registrar
- except IndexError:
- pass
- '''不良行为'''
- try:
- bad_behavior = driver.find_element_by_id('tab-badBehavior')
- click(driver, bad_behavior, allow_check_page=True)
- element = html2element(driver.page_source)
- node = element.xpath('//div[@aria-labelledby="tab-badBehavior"]/div')[0]
- bad_behavior_html = element2html(node)
- bad_behaviors = extract_content(bad_behavior_html)
- company['bad_behavior'] = bad_behaviors
- company['bad_behavior_html'] = {'html': bad_behavior_html}
- except IndexError:
- pass
- '''黑名单记录'''
- try:
- black_list = driver.find_element_by_id('tab-blackList')
- click(driver, black_list, allow_check_page=True)
- element = html2element(driver.page_source)
- node = element.xpath('//div[@id="pane-blackList"]/div')[0]
- black_list_html = element2html(node)
- black_list_array = extract_content(black_list_html)
- company['black_list'] = black_list_array
- company['black_list_html'] = {'html': black_list_html}
- except IndexError:
- pass
- '''失信联合惩戒记录'''
- try:
- punish = driver.find_element_by_id('tab-punishLog')
- click(driver, punish, allow_check_page=True)
- element = html2element(driver.page_source)
- node = element.xpath('//div[@id="pane-punishLog"]/div')[0]
- punish_html = element2html(node)
- punish_array = extract_content(punish_html)
- company['punish'] = punish_array
- company['punish_html'] = {'html': punish_html}
- except IndexError:
- pass
- '''保存企业数据'''
- if len(company['credit_no']) > 0:
- company_tab.insert_one(company)
- r.hset(redis_key, sha1(title), title)
- logger.info(f'>>> {title} - {current_url} - 采集成功 - 保存入库')
- else:
- exception_count += 1 # 页面无企业数据
- logger.info(f'>>> {title} - {current_url} - 采集失败 - 无社会信用代码')
- '''关闭详情页标签'''
- driver.close()
- '''返回列表页'''
- driver.switch_to.window(handler)
- '''下一条执行时间'''
- time.sleep(2)
- else:
- return True
- def downloader(driver: Chrome, handler):
- while True:
- logger.info(f">>> 第{current_page(driver.page_source)}页 <<<")
- allow_crawl = crawl_spider(driver, handler)
- '''是否继续采集'''
- if not allow_crawl:
- logger.info("网站数据异常,终止采集")
- return False
- '''翻页'''
- if not next_page(driver):
- logger.info('采集结束')
- break
- return True
- def select_province(driver: Chrome, records):
- """选择注册属地"""
- '''点击省份下拉框'''
- drop_down_button = driver.find_element_by_xpath('//div[@class="region-select"]/div[1]/div[1]/span[1]/span[1]/i[contains(@class,"el-select__caret el-input__icon el-icon-arrow-up")]')
- click(driver, drop_down_button, wait_time=1)
- '''选择省份'''
- li_elements = driver.find_elements(by=By.XPATH, value='/html/body/div[@class="el-select-dropdown el-popper"][1]/div[1]/div[1]/ul/li')
- for element in li_elements:
- province = element.text
- if province not in records:
- logger.info(f'>> 企业注册属地省份:{province} <<')
- click(driver, element, wait_time=1.5)
- records.append(province)
- return False
- else:
- return True
- def select_categories(driver: Chrome, records):
- span_elements = driver.find_elements(by=By.XPATH, value='//div[@class="labelInPut labelInPutRadio"]/span')
- for element in span_elements:
- qualification = element.text
- if qualification not in records:
- logger.info(f'>> 企业资质类别:{qualification} <<')
- records.setdefault(qualification, [])
- provinces = records.get(qualification)
- if provinces is not None:
- if len(provinces) < 32:
- click(driver, element, wait_time=1.5)
- crawl_finished = select_province(driver, provinces)
- if not crawl_finished:
- click_query(driver, wait_time=2)
- return False
- else:
- return True
- def start(enable_remote_driver=False):
- '''
- "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" --remote-debugging-port=9222 --no-first-run --no-default-browser-check --user-data-dir="./data"
- '''
- options = webdriver.ChromeOptions()
- if enable_remote_driver:
- options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
- options.add_argument("--disable-gpu")
- chrome_driver = webdriver.Chrome(
- executable_path="/Users/dongzhaorui/Downloads/chromedriver-mac-x64/chromedriver",
- options=options
- )
- main_handler = chrome_driver.current_window_handle # 获取句柄
- '''清除其余窗口'''
- for handler in chrome_driver.window_handles:
- if handler != main_handler:
- chrome_driver.switch_to.window(handler)
- chrome_driver.close()
- chrome_driver.switch_to.window(main_handler)
- chrome_driver.get(CRAWL_SITE)
- time.sleep(3)
- '''采集记录'''
- records = {
- '全部': None,
- '造价咨询企业': None,
- }
- while True:
- crawl_finished = select_categories(chrome_driver, records)
- if crawl_finished:
- logger.info('任务结束')
- break
- '''下载数据'''
- _continue = downloader(chrome_driver, main_handler)
- if not _continue:
- break
- if not enable_remote_driver:
- chrome_driver.quit()
- if __name__ == '__main__':
- # while True:
- # try:
- # start(enable_remote_driver=True)
- # except:
- # logger.info("等待100秒")
- # time.sleep(100)
- start(enable_remote_driver=True)
|