import copy
import functools
import hashlib
import re
from collections import namedtuple
from string import whitespace

import bson
import requests

from untils.clean_html import cleaner

SearchText = namedtuple('SearchText', ['total'])

# Regexes that pull an area figure out of a "project scale" description.
# Raw strings are required: ``\S`` is not a valid Python string escape.
_CONSTRUCTION_AREA_PATTERN = r'[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]'
_FLOOR_AREA_PATTERN = r'[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]'

# Appended to a field label: optional tag, colon, optional tag, then the
# captured value up to the next "<".
_LABEL_VALUE_SUFFIX = '(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<'

# Trailing punctuation stripped from every extracted field value.
_TRAILING_PUNCTUATION_RE = re.compile(
    r'([,|.|。|)|)|,|;|;|?|&|$|#|@|!|!|%|*|\'|"|‘|’|“|¥|?| ]*?)$'
)

# (item key, label regex) pairs, in the order the values are written to the
# item by njpc_fields_extract.
_NJPC_FIELD_LABELS = (
    ("projectname", '项目名称'),
    ("owner", '建设(?:单位|单位名称)'),
    ("total_investment", '总投资'),
    ("project_person", '联系人'),
    ("project_phone", '联系(?:电话|方式)'),
    ("approvedept", '审批部门'),
    ("approvetime", '审批时间'),
    ("project_completedate", '竣工日期'),
    ("projectaddr", '建设(?:地点|地址)'),
    ("approvecode", '项目代码'),
    ("approvecontent", '(?:事项名称|审批事项)'),
    ("approvenumber", '(?:审批|批准)文号'),
)
_PROJECT_SCALE_LABEL = '建设(?:内容|内容[及|与|和]规模|规模|规模[及|与|和]内容)'

# Keys removed from every document by clean_document, before any extras.
_DEFAULT_REMOVED_KEYS = frozenset({
    "_id",
    "parser_name",
    "parser_url",
    "request_params",
    "failed",
    "error",
})


def substitute(html_str, special=None, completely=False):
    """Run ``html_str`` through the project's HTML cleaner and return the result.

    ``special`` and ``completely`` are forwarded unchanged to ``cleaner``.
    """
    return cleaner(html=html_str, special=special, completely=completely)


def merge_files(*files):
    """Merge attachment dicts into one dict keyed "1", "2", ... in order.

    Arguments that are not dicts are ignored; original keys are discarded.
    """
    merged = {}
    for file_ in files:
        if not isinstance(file_, dict):
            continue
        for attachment in file_.values():
            merged[str(len(merged) + 1)] = attachment
    return merged


def is_all_chinese(strs):
    """Return True when every character lies in U+4E00..U+9FA5.

    An empty input yields True because there is no offending character.
    """
    return all('\u4e00' <= _char <= '\u9fa5' for _char in strs)


def clean_document(*fields):
    """Decorator factory that strips bookkeeping keys from a mongo document.

    Before the wrapped function runs, the default bookkeeping keys plus any
    extra ``fields`` are deleted from the document (the first positional
    argument, or the keyword-argument dict when keyword arguments are given).

    :param fields: additional keys to remove

    Example::

        clean_document('dzr')(lambda *args, **kw: None)(document)
    """
    removes = _DEFAULT_REMOVED_KEYS.union(fields)

    def clean(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            item = args[0] if not kwargs else kwargs
            # NOTE(review): ``to_dict`` is accessed without being called, so
            # it is presumably a property on the Item class - confirm.
            data_dict = item if isinstance(item, dict) else item.to_dict
            # Snapshot the keys so entries can be deleted while looping; a
            # deep copy of the whole document is not needed for that.
            for key in tuple(data_dict.keys()):
                if key not in removes:
                    continue
                del data_dict[key]
                try:
                    # Also drop the matching attribute on Item instances.
                    delattr(item, key)
                except AttributeError:
                    pass
            return func(*args, **kwargs)

        return wrapper

    return clean


def clean_chars(text, charsets=whitespace):
    """Delete every occurrence of each entry of ``charsets`` from ``text``.

    :param str text: text to clean; ``None`` is returned unchanged
    :param charsets: iterable of substrings to remove (default: whitespace)
    :return: the cleaned text
    """
    if text is None:
        return text
    for char in charsets:
        text = text.replace(char, '')
    return text


def get_signature(content: str) -> str:
    """Return the hexadecimal SHA-1 digest of ``content`` encoded as UTF-8.

    :param content: text to hash
    :return: hex digest string
    """
    return hashlib.sha1(content.encode("utf-8")).hexdigest()


def get_md5(val):
    """Return the hexadecimal MD5 digest of ``val``.

    ``str`` input is hashed as UTF-8. Any other type leaves the hash empty,
    so the digest of the empty input is returned.
    """
    md5 = hashlib.md5()
    if isinstance(val, bytes):
        # NOTE(review): this hashes the textual repr ("b'...'"), not the raw
        # bytes. Left unchanged because existing digests may depend on it.
        md5.update(str(val).encode("utf-8"))
    elif isinstance(val, str):
        md5.update(val.encode("utf-8"))
    return md5.hexdigest()


def text_search(content: str) -> SearchText:
    """Count the Chinese characters (U+4E00..U+9FA5) in ``content``.

    :param content: text to scan
    :return: ``SearchText`` whose ``total`` is the number of characters found
    """
    if not content:
        return SearchText(0)
    # Each match is a single character, so the list length is the count.
    return SearchText(len(re.findall('[\u4e00-\u9fa5]', content, re.S)))


def int2long(param: int):
    """Wrap ``param`` in a BSON ``Int64``."""
    return bson.int64.Int64(param)


def njpc_hpsj_filt_keywords(text: str, special_kw=None):
    """Return True when ``text`` contains any of the filter keywords.

    The built-in keywords are always checked; entries of ``special_kw`` are
    added to them and, like the built-ins, are interpreted as regex fragments.

    :return: True to keep the record, False to discard it
    """
    if special_kw is None:
        special_kw = {}
    keywords = {'项目', '工程', '验收', '评价', *special_kw}
    return any(
        re.match(f'.*{keyword}', text, re.S) is not None
        for keyword in keywords
    )


def _find_labelled(label, html):
    """Return every value that follows ``label`` and a colon in ``html``."""
    return re.findall(label + _LABEL_VALUE_SUFFIX, html, re.S)


def njpc_fields_extract(html, data_item, is_clean=False):
    """Extract proposed-construction project fields from a detail page.

    Sets ``data_item.title`` from ``data_item.projectname``, fills
    ``project_scale`` / ``project_scale_info`` when a scale description is
    found, and writes the first non-empty match of each labelled field into
    ``data_item`` after removing whitespace and trailing punctuation.

    :param str html: page source
    :param data_item: detail-page item; must support both attribute and
        item assignment
    :param bool is_clean: clean the source with ``substitute`` first
    :return: the same ``data_item``
    """
    if is_clean:
        html = substitute(html)

    data_item.title = data_item.projectname

    project_scale = "".join(_find_labelled(_PROJECT_SCALE_LABEL, html))
    if project_scale:
        data_item.project_scale = project_scale
        # Construction scale and main content.
        data_item.project_scale_info = {
            "construction_area": get_construction_area(project_scale),
            "floor_area": get_floor_area(project_scale),
        }

    for field, label in _NJPC_FIELD_LABELS:
        matches = _find_labelled(label, html)
        if not matches:
            continue
        value = clean_chars(matches[0])
        if not value:
            continue
        data_item[field] = _TRAILING_PUNCTUATION_RE.sub("", value)
    return data_item


def get_proxy():
    """Fetch a proxy from the proxy service and return its ``http`` entry.

    Raises whatever ``requests`` raises on network failure or timeout.
    """
    # NOTE(review): the credential is hard-coded in source; consider moving
    # it to configuration.
    headers = {
        "Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
    }
    # A timeout keeps an unresponsive proxy service from blocking forever.
    proxy = requests.get(
        "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch",
        headers=headers,
        timeout=30,
    ).json()
    print(f"切换代理:{proxy.get('data')}")
    return proxy.get("data").get("http")


def search(pattern, string):
    """Return the first capture group of the first match, or None.

    ``pattern`` must contain at least one capture group.
    """
    result = re.search(pattern, string)
    if result:
        return result.groups()[0]


def search_construction(string):
    """Return the raw construction-area text found in ``string``, or None."""
    # Previously searched for the literal placeholder 'pattern', which has no
    # capture group and therefore could never return a value.
    return search(_CONSTRUCTION_AREA_PATTERN, string)


def search_floor(string):
    """Return the raw floor-area text found in ``string``, or None."""
    # Previously searched for the literal placeholder 'pattern', which has no
    # capture group and therefore could never return a value.
    return search(_FLOOR_AREA_PATTERN, string)


def get_floor_area(project_scale):
    """Return the floor area from ``project_scale`` without colons, or ""."""
    floor_area = search(_FLOOR_AREA_PATTERN, project_scale)
    if not floor_area:
        return ""
    return floor_area.replace(':', '').replace(':', '')


def get_construction_area(project_scale):
    """Return the construction area from ``project_scale`` without colons, or ""."""
    construction_area = search(_CONSTRUCTION_AREA_PATTERN, project_scale)
    if not construction_area:
        return ""
    return construction_area.replace(':', '').replace(':', '')