import copy
import functools
import hashlib
import re
from collections import namedtuple
from string import whitespace

import bson
from bs4 import BeautifulSoup
from feapder.network.proxy_pool import DirectProxyPool
from feapder.utils.clean_html import cleaner

SearchText = namedtuple('SearchText', ['total'])

def substitute(html_str, special=None, completely=False):
    """Clean an HTML string via the shared cleaner (tag/attribute substitution)."""
    html_str = cleaner(html=html_str, special=special, completely=completely)
    return html_str

def merge_files(*files):
    """Merge attachment dicts into one dict keyed by consecutive 1-based string indexes."""
    res = {}
    for file_ in files:
        if isinstance(file_, dict):
            for _, attachment in file_.items():
                res[str(len(res) + 1)] = attachment
    return res

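# Illustrative usage (hypothetical attachment dicts): values are re-keyed with
# consecutive string indexes, so identical keys from different sources do not collide.
# >>> merge_files({'1': {'filename': 'a.pdf'}}, {'1': {'filename': 'b.pdf'}})
# {'1': {'filename': 'a.pdf'}, '2': {'filename': 'b.pdf'}}
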
def is_all_chinese(strs):
    """Return True if every character in the string is a Chinese character."""
    for _char in strs:
        if not '\u4e00' <= _char <= '\u9fa5':
            return False
    return True

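# Illustrative usage:
# >>> is_all_chinese('项目')
# True
# >>> is_all_chinese('项目A')
# False
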
def clean_document(*fields):
    """
    Strip bookkeeping fields from a mongo document before it is processed.

    :param fields: extra field names to strip in addition to the defaults

    # Usage:
    # >>> clean_document('dzr')(lambda *args, **kw: None)(document)
    """
    def clean(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            defaults = {
                "_id",
                "parser_name", "parser_url", "request_params",
                "failed", "error"
            }
            removes = defaults if not fields else {*defaults, *fields}
            item = args[0] if not kwargs else kwargs
            data_dict = item if isinstance(item, dict) else item.to_dict
            copy_data_dict = copy.deepcopy(data_dict)
            for k in copy_data_dict.keys():
                if k in removes:
                    del data_dict[k]
                    try:
                        delattr(item, k)  # also drop the attribute from an Item instance
                    except AttributeError:
                        pass
            return func(*args, **kwargs)
        return wrapper
    return clean

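# Illustrative decorator usage (the document dict and the 'comeintime' field are hypothetical):
# >>> @clean_document('comeintime')
# ... def save(document):
# ...     pass
# >>> save({'_id': 1, 'title': 't', 'comeintime': 123})  # save() receives {'title': 't'}
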
def clean_chars(text, charsets=whitespace):
    """
    Remove every character contained in the given character set.

    :param str text: text
    :param charsets: characters to strip (defaults to ASCII whitespace)
    :return: cleaned text
    """
    if text is not None:
        for char in charsets:
            if char in text:
                text = text.replace(char, '')
    return text

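# Illustrative usage: all whitespace characters are removed by default.
# >>> clean_chars(' 中标 公告\n')
# '中标公告'
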
def get_signature(content: str) -> str:
    """
    SHA-1 digest of the text as a hexadecimal string.

    @param content: text
    @return: digest
    """
    sha1 = hashlib.sha1()
    sha1.update(content.encode("utf-8"))
    return sha1.hexdigest()

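# Illustrative usage (standard SHA-1 test vector):
# >>> get_signature('abc')
# 'a9993e364706816aba3e25717850c26c9cd0d89d'
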
def get_md5(val):
    """MD5 digest of a str or bytes value as a hexadecimal string."""
    md5 = hashlib.md5()
    if isinstance(val, bytes):
        md5.update(val)  # hash the raw bytes, not the repr of the bytes object
    elif isinstance(val, str):
        md5.update(val.encode("utf-8"))
    return md5.hexdigest()

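# Illustrative usage (standard MD5 test vector; a str and its utf-8 bytes hash identically):
# >>> get_md5('abc')
# '900150983cd24fb0d6963f7d28e17f72'
# >>> get_md5('abc') == get_md5(b'abc')
# True
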
def text_search(content: str) -> SearchText:
    """
    Count Chinese characters in the text.

    :param content: text
    :return: SearchText whose ``total`` is the number of Chinese characters
    """
    if not content:
        return SearchText(0)
    results = re.findall('[\u4e00-\u9fa5]', content, re.S)
    # the length of the list is the number of Chinese characters
    return SearchText(len(results))

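# Illustrative usage:
# >>> text_search('中标公告 No.1').total
# 4
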
def int2long(param: int):
    """Convert an int to a BSON 64-bit integer (long)."""
    return bson.int64.Int64(param)

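# Illustrative usage:
# >>> isinstance(int2long(1688), bson.int64.Int64)
# True
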
def njpc_hpsj_filt_keywords(text: str, special_kw=None):
    """Return True (collect) when the text contains any keyword, otherwise False (discard)."""
    if special_kw is None:
        special_kw = {}
    keywords = {'项目', '工程', '验收', '评价', *special_kw}
    for keyword in keywords:
        result = re.match(f'.*{keyword}', text, re.S)
        if result is not None:
            return True  # collect
    # only discard after every keyword has failed to match
    return False  # discard

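# Illustrative usage (hypothetical titles):
# >>> njpc_hpsj_filt_keywords('某科技园建设工程备案公示')
# True
# >>> njpc_hpsj_filt_keywords('机构人事任免通知')
# False
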
# Regex-based field extraction for the proposed-project (拟建) crawler
def njpc_fields_extract(html, data_item, is_clean=False):
    """
    Regex-based field extraction for the proposed-project crawler.

    :param str html: page source
    :param Items data_item: detail-page item
    :param bool is_clean: whether to clean the page source first
    :return: the item with the extracted fields filled in
    """
    if is_clean:
        html = substitute(html)
    data_item.title = data_item.projectname
    projectname = re.findall('项目名称(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvecode = re.findall('项目(?:代码|编码)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvecontent = re.findall('(?:事项名称|审批事项)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    owner = re.findall('[建设|项目](?:单位|单位名称|业主)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    projectaddr = re.findall('建设(?:地点|地址)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    total_investment = re.findall('总投资(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    project_person = re.findall('联系人(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    project_phone = re.findall('联系(?:电话|方式)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvedept = re.findall('审批部门(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvenumber = re.findall('(?:审批|批准)文号(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvetime = re.findall('审批时间(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    project_scale = "".join(re.findall('建设(?:内容|内容[及|与|和]规模|规模|规模[及|与|和]内容)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S))
    project_completedate = re.findall('竣工日期(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    if project_scale:
        construction_area = search(r'[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        floor_area = search(r'[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        if not construction_area:
            construction_area = ""
        else:
            construction_area = re.sub(":|:", "", construction_area)
        if not floor_area:
            floor_area = ""
        else:
            floor_area = re.sub(":|:", "", floor_area)
        data_item.project_scale = project_scale
        data_item.project_scale_info = {
            "construction_area": construction_area,
            "floor_area": floor_area,
        }  # construction scale and main content
    fields_dict = {
        "projectname": projectname,
        "owner": owner,
        "total_investment": total_investment,
        "project_person": project_person,
        "project_phone": project_phone,
        "approvedept": approvedept,
        "approvetime": approvetime,
        "project_completedate": project_completedate,
        "projectaddr": projectaddr,
        "approvecode": approvecode,
        "approvecontent": approvecontent,
        "approvenumber": approvenumber
    }
    for fields_k, fields_v in fields_dict.items():
        if fields_v:
            fields_v[0] = clean_chars(fields_v[0])
            if not fields_v[0]:
                continue
            # strip trailing punctuation from the captured value
            data_item[fields_k] = re.sub(
                r'([,|.|。|)|)|,|;|;|?|&|$|#|@|!|!|%|*|\'|"|‘|’|“|¥|?| ]*?)$',
                "", fields_v[0])
    return data_item

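# Illustrative usage (hypothetical snippet and Item; the patterns expect "字段名:值<" runs in the
# raw page source, and ``data_item`` must already carry ``projectname`` from the list page):
# >>> html = '<p>项目名称:某某产业园一期</p><p>总投资:5000万元</p>'
# >>> item = njpc_fields_extract(html, data_item)
# >>> item.projectname, item.total_investment
# ('某某产业园一期', '5000万元')
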
# Regex field extraction for the proposed-project crawler (each extracted value must be
# terminated by a delimiter such as [,,.。;;一二三四五六七八九十、::])
def njpc_fields_extract_special(html, data_item):
    """
    Field extraction where each extracted value must be terminated by a delimiter
    such as [,,.。;;一二三四五六七八九十、::].

    :param str html: page source
    :param Items data_item: detail-page item
    :return: the item with the extracted fields filled in
    """
    # strip all tags first
    soup = BeautifulSoup(html, 'html.parser')
    html = "".join(soup.get_text().split()).strip()
    # extract fields
    data_item.title = data_item.projectname
    projectname = re.findall(r'项目名称(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]+[,,.。;;、::]', html, re.S)
    approvecode = re.findall(r'项目(?:代码|编码)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    approvecontent = re.findall(r'(?:事项名称|审批事项)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]+[,,.。;;、::]', html, re.S)
    owner = re.findall(r'[建设|项目](?:单位|单位名称|业主)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    projectaddr = re.findall(r'建设(?:地点|地址)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]+[,,.。;;、::]', html, re.S)
    total_investment = re.findall(r'总投资(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[。;;、::]', html, re.S)
    project_person = re.findall(r'联系人(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    project_phone = re.findall(r'联系(?:电话|方式)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    approvedept = re.findall(r'审批部门(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    approvenumber = re.findall(r'(?:审批|批准)文号(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,.。;;、::]', html, re.S)
    approvetime = re.findall(r'审批时间(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,。;;、]', html, re.S)
    project_scale = "".join(re.findall(r'建设(?:内容|内容[及|与|和]规模|规模|规模[及|与|和]内容)(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]+[、::]', html, re.S))
    project_completedate = re.findall(r'竣工日期(?:[\u4e00-\u9fa5]+|)[:|:](.*?)[一二三四五六七八九十\d]?[,,。;;、]', html, re.S)
    if project_scale:
        construction_area = search(r'[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        floor_area = search(r'[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        if not construction_area:
            construction_area = ""
        else:
            construction_area = re.sub(":|:", "", construction_area)
        if not floor_area:
            floor_area = ""
        else:
            floor_area = re.sub(":|:", "", floor_area)
        data_item.project_scale = project_scale
        data_item.project_scale_info = {
            "construction_area": construction_area,
            "floor_area": floor_area,
        }  # construction scale and main content
    fields_dict = {
        "projectname": projectname,
        "owner": owner,
        "total_investment": total_investment,
        "project_person": project_person,
        "project_phone": project_phone,
        "approvedept": approvedept,
        "approvetime": approvetime,
        "project_completedate": project_completedate,
        "projectaddr": projectaddr,
        "approvecode": approvecode,
        "approvecontent": approvecontent,
        "approvenumber": approvenumber
    }
    for fields_k, fields_v in fields_dict.items():
        if fields_v:
            fields_v[0] = clean_chars(fields_v[0])
            if not fields_v[0]:
                continue
            # strip trailing punctuation from the captured value
            data_item[fields_k] = re.sub(
                r'([,|.|。|)|)|,|;|;|?|&|$|#|@|!|!|%|*|\'|"|‘|’|“|¥|?| ]*?)$',
                "", fields_v[0])
    return data_item

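# Illustrative difference from njpc_fields_extract (hypothetical snippet and Item): tags are
# stripped first, so values are read from the flattened text and must run up to a delimiter.
# >>> html = '<div>一、项目名称:某某学校迁建项目;二、建设单位:某某教育局;</div>'
# >>> item = njpc_fields_extract_special(html, data_item)
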
def get_proxy(scheme=None, default=None, socks5h=False):
    """Fetch a proxy mapping from the direct proxy pool, falling back to ``default`` on failure."""
    proxy = DirectProxyPool(
        "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch",
        "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
    )
    try:
        proxies = proxy.get_proxy()
    except Exception:
        proxies = default
    print(f"Switching proxy: {proxies}")
    if proxies is not None:
        if socks5h:
            proxyh = {
                "http": proxies.get("http").replace("socks5", "socks5h"),
                "https": proxies.get("http").replace("socks5", "socks5h")
            }
            proxies = proxyh
        if not scheme:
            return proxies
        else:
            return proxies.get(scheme, default)

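# Illustrative usage (assumes the pool returns a requests-style proxy mapping):
# >>> proxies = get_proxy(socks5h=True)       # e.g. {'http': 'socks5h://...', 'https': 'socks5h://...'}
# >>> http_proxy = get_proxy(scheme="http")   # just the 'http' entry, or ``default`` if missing
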
def search(pattern, string):
    """Return the first capture group of ``pattern`` in ``string``, or None if there is no match."""
    result = re.search(pattern, string)
    if result:
        return result.groups()[0]

def search_construction(string):
    """Extract the construction-area (建筑面积) fragment from a project-scale description."""
    # same pattern as get_construction_area below
    result = re.search(r'[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', string)
    if result:
        return result.groups()[0]


def search_floor(string):
    """Extract the floor-area (占地面积) fragment from a project-scale description."""
    # same pattern as get_floor_area below
    result = re.search(r'[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', string)
    if result:
        return result.groups()[0]

def get_floor_area(project_scale):
    """Extract the floor area (占地面积) from a project-scale description, or '' if absent."""
    floor_area = search(r'[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
    if not floor_area:
        floor_area = ""
    else:
        floor_area = floor_area.replace(':', '').replace(':', '')
    return floor_area


def get_construction_area(project_scale):
    """Extract the construction area (建筑面积) from a project-scale description, or '' if absent."""
    construction_area = search(r'[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
    if not construction_area:
        construction_area = ""
    else:
        construction_area = construction_area.replace(':', '').replace(':', '')
    return construction_area

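# Illustrative usage (hypothetical scale text):
# >>> get_construction_area('总建筑面积约12000平方米,总投资5000万元。')
# '12000平方米'
# >>> get_floor_area('项目总占地面积为30亩,分两期建设。')
# '30亩'
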
def remove_htmldata(remove_info_list: list, html: str, response):
    """
    Filter invalid data out of a detail page.

    Args:
        remove_info_list: xpath expressions, regexes or literal strings to remove
                          -> list [xpath, re, str], e.g. ['<re>data:image/(.*?)"']
        html: the text to clean
        response: the original response
    Returns: the cleaned text
    """
    if html and remove_info_list:
        for extra_item in remove_info_list:
            if re.search('^//.*', extra_item):
                # xpath rule: remove every node extracted from the original response
                extra_html_list = response.xpath(extra_item).extract()
                for extra_html in extra_html_list:
                    if extra_html:
                        html = html.replace(extra_html, '')
            elif re.search('^<re>.*', extra_item):
                # regex rule: drop the '<re>' marker, then remove every match
                extra_item = extra_item.replace('<re>', '')
                extra_html_list = re.findall(f'{extra_item}', html, re.S | re.I | re.M)
                if extra_html_list:
                    for exhtml in extra_html_list:
                        html = html.replace(exhtml, '')
            else:
                # literal string rule: remove it verbatim
                extra_html = extra_item
                if extra_html:
                    html = html.replace(extra_html, '')
    return html

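# Illustrative usage (``response`` is the hypothetical original Response the xpath rules run against):
# >>> rules = ['//div[@class="share"]', '<re>data:image/(.*?)"', '扫一扫关注公众号']
# >>> html = remove_htmldata(rules, html, response)
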
def extract_file_type(file_name="附件名", file_url="附件地址", file_type_list=None):
    """
    Extract the attachment type.

    Args:
        file_name: attachment name
        file_url: attachment url
        file_type_list: extra attachment suffixes
    Returns: attachment type, or None if it cannot be determined
    """
    if file_name and file_url:
        file_name = file_name.strip()
        file_types = ['zip', 'docx', 'ftp', 'pdf', 'doc', 'rar', 'gzzb', 'hzzbs',
                      'jpg', 'png', 'zbid', 'xls', 'xlsx', 'swp', 'dwg']
        if file_type_list:
            ftp_list = list(map(lambda x: x.lower(), file_type_list))
            file_types.extend(ftp_list)
        file_type = file_url.split('?')[0].split('.')[-1].lower()
        if file_type not in file_types:
            file_type = file_url.split('?')[-1].split('.')[-1].lower()
            if file_type in file_types:
                return file_type
            else:
                # fall back to looking for a known suffix in the name or the url
                for ftp in file_types:
                    file_type = re.search(ftp, file_name) or re.search(r"\." + ftp, file_url)
                    if file_type:
                        return file_type.group(0).replace('.', '')
        else:
            return file_type
    return None

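# Illustrative usage:
# >>> extract_file_type('招标文件.pdf', 'http://example.com/files/notice.pdf?id=1')
# 'pdf'
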