import copy
import functools
import hashlib
import re
from collections import namedtuple
from string import whitespace

import bson
from bs4 import BeautifulSoup
from feapder.network.proxy_pool import DirectProxyPool
from feapder.utils.clean_html import cleaner

SearchText = namedtuple('SearchText', ['total'])


def substitute(html_str, special=None, completely=False):
    """HTML 替换"""
    html_str = cleaner(html=html_str, special=special, completely=completely)
    return html_str


def merge_files(*files):
    """合并文件"""
    res = {}
    for file_ in files:
        if isinstance(file_, dict):
            for _, attachment in file_.items():
                res[str(len(res) + 1)] = attachment
    return res


def is_all_chinese(strs):
    """检验是否全是中文字符"""
    for _char in strs:
        if not '\u4e00' <= _char <= '\u9fa5':
            return False
    return True


def clean_document(*fields):
    """
    清洗 mongo 文档

    :param fields: 清洗字段

    # 用例:
    # >>> clean_document('dzr')(lambda *args, **kw: None)(document)
    """

    def clean(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            defaults = {
                "_id",
                "parser_name",
                "parser_url",
                "request_params",
                "failed",
                "error"
            }
            removes = defaults if not fields else {*defaults, *fields}
            item = args[0] if not kwargs else kwargs
            data_dict = item if isinstance(item, dict) else item.to_dict
            copy_data_dict = copy.deepcopy(data_dict)
            for k in copy_data_dict.keys():
                if k in removes:
                    del data_dict[k]
                    try:
                        delattr(item, k)  # 删除 Item 类实例属性
                    except AttributeError:
                        pass
            return func(*args, **kwargs)

        return wrapper

    return clean


def clean_chars(text, charsets=whitespace):
    """
    按照字符集,删除字符

    :param str text: 文本
    :param charsets: 字符集
    :return: 干净的文本
    """
    if text is not None:
        for char in charsets:
            if char in text:
                text = text.replace(char, '')
    return text


def get_signature(content: str) -> str:
    """
    十六进制数字字符串形式摘要值

    @param content: 字符串文本
    @return: 摘要值
    """
    sha1 = hashlib.sha1()
    sha1.update(content.encode("utf-8"))
    return sha1.hexdigest()


def get_md5(val):
    md5 = hashlib.md5()
    if isinstance(val, bytes):
        md5.update(val)  # 字节串直接参与摘要计算,避免对其 repr 字符串取摘要
    elif isinstance(val, str):
        md5.update(val.encode("utf-8"))
    return md5.hexdigest()


def text_search(content: str) -> SearchText:
    """
    中文检索

    :param content: 文本
    :return: 中文数量
    """
    if not content:
        return SearchText(0)

    results = re.findall('[\u4e00-\u9fa5]', content, re.S)
    # 列表长度即是中文的字数
    return SearchText(len(results))


def int2long(param: int):
    """int 转换成 long"""
    return bson.int64.Int64(param)


def njpc_hpsj_filt_keywords(text: str, special_kw=None):
    """按关键词过滤文本,命中任一关键词则采集"""
    if special_kw is None:
        special_kw = {}

    keywords = {'项目', '工程', '验收', '评价', *special_kw}
    for keyword in keywords:
        result = re.match(f'.*{keyword}', text, re.S)
        if result is not None:
            return True  # 需要采集
    else:
        return False  # 丢弃
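
# 用法示意(document 为待入库的 mongo 文档,字段名沿用上方 docstring 中的 'dzr',仅作演示):
# >>> @clean_document('dzr')
# ... def insert(document):
# ...     pass
# >>> insert(document)              # 调用前会剔除 _id、parser_name、dzr 等字段
# >>> get_signature('招标公告')      # 40 位十六进制 sha1 摘要
# >>> text_search('公告Notice').total  # 中文字符数 -> 2
# >>> int2long(12)                  # bson.int64.Int64(12)
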
# 拟建爬虫字段正则抽取
def njpc_fields_extract(html, data_item, is_clean=False):
    """
    拟建爬虫字段正则抽取

    :param str html: 页面源码
    :param Items data_item: 详情页item
    :param bool is_clean: 是否对源码进行清洗
    :return:
    """
    if is_clean:
        html = substitute(html)

    data_item.title = data_item.projectname
    projectname = re.findall('项目名称(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvecode = re.findall('项目(?:代码|编码)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvecontent = re.findall('(?:事项名称|审批事项)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    owner = re.findall('[建设|项目](?:单位|单位名称|业主)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    projectaddr = re.findall('建设(?:地点|地址)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    total_investment = re.findall('总投资(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    project_person = re.findall('联系人(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    project_phone = re.findall('联系(?:电话|方式)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvedept = re.findall('审批部门(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvenumber = re.findall('(?:审批|批准)文号(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvetime = re.findall('审批时间(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    project_scale = "".join(re.findall(
        '建设(?:内容|内容[及|与|和]规模|规模|规模[及|与|和]内容)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S))
    project_completedate = re.findall('竣工日期(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)

    if project_scale:
        construction_area = search(r'[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        floor_area = search(r'[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        if not construction_area:
            construction_area = ""
        else:
            construction_area = re.sub(":|:", "", construction_area)

        if not floor_area:
            floor_area = ""
        else:
            floor_area = re.sub(":|:", "", floor_area)

        data_item.project_scale = project_scale
        data_item.project_scale_info = {
            "construction_area": construction_area,
            "floor_area": floor_area,
        }  # 建设规模及主要内容

    fields_dict = {
        "projectname": projectname,
        "owner": owner,
        "total_investment": total_investment,
        "project_person": project_person,
        "project_phone": project_phone,
        "approvedept": approvedept,
        "approvetime": approvetime,
        "project_completedate": project_completedate,
        "projectaddr": projectaddr,
        "approvecode": approvecode,
        "approvecontent": approvecontent,
        "approvenumber": approvenumber
    }
    for fields_k, fields_v in fields_dict.items():
        if fields_v:
            fields_v[0] = clean_chars(fields_v[0])
            if not fields_v[0]:
                continue
            data_item[fields_k] = re.sub(
                r'([,|.|。|)|)|,|;|;|?|&|$|#|@|!|!|%|*|\'|"|‘|’|“|¥|?| ]*?)$',
                "", fields_v[0])
    return data_item
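
# 用法示意(假设 resp.text 为拟建详情页源码,NjpcItem 为详情页 Item,名称仅作演示):
# >>> item = NjpcItem(projectname='xx产业园建设项目')
# >>> item = njpc_fields_extract(resp.text, item, is_clean=True)
# >>> item.project_scale_info      # {'construction_area': '...', 'floor_area': '...'}
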
# 拟建爬虫字段正则抽取(抽取信息结尾必须含有[,,.。;;一二三四五六七八九十、::]等标识符)
def njpc_fields_extract_special(html, data_item):
    """
    抽取信息结尾必须含有[,,.。;;一二三四五六七八九十、::]等标识符

    :param str html: 页面源码
    :param Items data_item: 详情页item
    :return: 抽取完成字段表
    """
    # 清洗掉所有标签
    soup = BeautifulSoup(html, 'html.parser')
    html = "".join(soup.get_text().split()).strip()

    # 抽取字段
    data_item.title = data_item.projectname
    projectname = re.findall('项目名称(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]+[,,.。;;、::]', html, re.S)
    approvecode = re.findall('项目(?:代码|编码)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    approvecontent = re.findall('(?:事项名称|审批事项)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]+[,,.。;;、::]', html, re.S)
    owner = re.findall('[建设|项目](?:单位|单位名称|业主)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    projectaddr = re.findall('建设(?:地点|地址)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]+[,,.。;;、::]', html, re.S)
    total_investment = re.findall('总投资(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[。;;、::]', html, re.S)
    project_person = re.findall('联系人(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    project_phone = re.findall('联系(?:电话|方式)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    approvedept = re.findall('审批部门(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    approvenumber = re.findall('(?:审批|批准)文号(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    approvetime = re.findall('审批时间(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,。;;、]', html, re.S)
    project_scale = "".join(re.findall(
        '建设(?:内容|内容[及|与|和]规模|规模|规模[及|与|和]内容)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]+[、::]', html, re.S))
    project_completedate = re.findall('竣工日期(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,。;;、]', html, re.S)

    if project_scale:
        construction_area = search(r'[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        floor_area = search(r'[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        if not construction_area:
            construction_area = ""
        else:
            construction_area = re.sub(":|:", "", construction_area)

        if not floor_area:
            floor_area = ""
        else:
            floor_area = re.sub(":|:", "", floor_area)

        data_item.project_scale = project_scale
        data_item.project_scale_info = {
            "construction_area": construction_area,
            "floor_area": floor_area,
        }  # 建设规模及主要内容

    fields_dict = {
        "projectname": projectname,
        "owner": owner,
        "total_investment": total_investment,
        "project_person": project_person,
        "project_phone": project_phone,
        "approvedept": approvedept,
        "approvetime": approvetime,
        "project_completedate": project_completedate,
        "projectaddr": projectaddr,
        "approvecode": approvecode,
        "approvecontent": approvecontent,
        "approvenumber": approvenumber
    }
    for fields_k, fields_v in fields_dict.items():
        if fields_v:
            fields_v[0] = clean_chars(fields_v[0])
            if not fields_v[0]:
                continue
            data_item[fields_k] = re.sub(
                r'([,|.|。|)|)|,|;|;|?|&|$|#|@|!|!|%|*|\'|"|‘|’|“|¥|?| ]*?)$',
                "", fields_v[0])
    return data_item


def get_proxy(scheme=None, default=None, socks5h=False):
    proxy = DirectProxyPool(
        "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch",
        "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
    )
    try:
        proxies = proxy.get_proxy()
    except Exception:
        proxies = default

    print(f"切换代理:{proxies}")
    if proxies is not None:
        if socks5h:
            proxyh = {
                "http": proxies.get("http").replace("socks5", "socks5h"),
                "https": proxies.get("http").replace("socks5", "socks5h")
            }
            proxies = proxyh
        if not scheme:
            return proxies
        else:
            return proxies.get(scheme, default)


def search(pattern, string):
    result = re.search(pattern, string)
    if result:
        return result.groups()[0]


def search_construction(string):
    # 原实现中的 'pattern' 为占位字符串;此处沿用“建筑面积”抽取正则(假设意图与 get_construction_area 一致)
    result = re.search(r'[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', string)
    if result:
        return result.groups()[0]


def search_floor(string):
    # 原实现中的 'pattern' 为占位字符串;此处沿用“占地面积”抽取正则(假设意图与 get_floor_area 一致)
    result = re.search(r'[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', string)
    if result:
        return result.groups()[0]


def get_floor_area(project_scale):
    floor_area = search(r'[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
    if not floor_area:
        floor_area = ""
    else:
        floor_area = floor_area.replace(':', '').replace(':', '')
    return floor_area


def get_construction_area(project_scale):
    construction_area = search(r'[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
    if not construction_area:
        construction_area = ""
    else:
        construction_area = construction_area.replace(':', '').replace(':', '')
    return construction_area


def remove_htmldata(remove_info_list: list, html: str, response):
    """
    过滤详情页无效数据

    Args:
        remove_info_list: 需删除内容的xpath或文本 -> list [xpath, re, str] eg: ['data:image/(.*?)"',]
        html: 待清洗文本
        response: 原文响应体

    Returns:
        清洗后的文本
    """
    if html and remove_info_list:
        for extra_item in remove_info_list:
            if re.search('^//.*', extra_item):
                # 以 // 开头的规则按 xpath 处理:从响应体提取节点后剔除
                extra_html_list = response.xpath(extra_item).extract()
                for extra_html in extra_html_list:
                    if extra_html:
                        html = html.replace(extra_html, '')
            elif re.search('^.*', extra_item):
                # 其余规则按正则处理(该条件恒成立,普通文本同样会走正则匹配)
                extra_item = extra_item.replace('', '')
                extra_html_list = re.findall(f'{extra_item}', html, re.S | re.I | re.M)
                if extra_html_list:
                    for exhtml in extra_html_list:
                        html = html.replace(exhtml, '')
            else:
                extra_html = extra_item
                if extra_html:
                    html = html.replace(extra_html, '')
    return html
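
# 用法示意(假设 response 为 feapder 的响应对象,规则列表混用 xpath 与正则,仅作演示):
# >>> rules = ['//div[@class="share"]', 'data:image/(.*?)"']
# >>> detail_html = remove_htmldata(rules, detail_html, response)
# >>> proxies = get_proxy(socks5h=True)   # 代理池可用时返回 {'http': 'socks5h://...', 'https': 'socks5h://...'}
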

def extract_file_type(file_name="附件名", file_url="附件地址", file_type_list=None):
    """
    抽取附件类型

    Args:
        file_name: 附件名
        file_url: 附件地址
        file_type_list: 其他附件后缀

    Returns:
        附件类型
    """
    if file_name and file_url:
        file_name = file_name.strip()
        file_types = [
            'zip', 'docx', 'ftp', 'pdf', 'doc', 'rar', 'gzzb', 'hzzbs',
            'jpg', 'png', 'zbid', 'xls', 'xlsx', 'swp', 'dwg'
        ]
        if file_type_list:
            ftp_list = list(map(lambda x: x.lower(), file_type_list))
            file_types.extend(ftp_list)

        # 优先取 url 中 ? 之前的后缀,其次取 ? 之后的后缀
        file_type = file_url.split('?')[0].split('.')[-1].lower()
        if file_type not in file_types:
            file_type = file_url.split('?')[-1].split('.')[-1].lower()
        if file_type in file_types:
            return file_type
        else:
            # 后缀未命中时,退回到在附件名或附件地址中检索后缀关键字
            for ftp in file_types:
                file_type = re.search(ftp, file_name) or re.search(r"\." + ftp, file_url)
                if file_type:
                    return file_type.group(0).replace('.', '')
            else:
                return file_type
    return None
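
# 用法示意(附件名与地址均为虚构示例):
# >>> extract_file_type('招标文件.pdf', 'http://example.com/file/123.pdf')   # 'pdf'
# >>> extract_file_type('招标文件', 'http://example.com/att?id=1', ['wps'])  # 未命中任何后缀时返回 None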