123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- import copy
- import functools
- import hashlib
- import re
- from collections import namedtuple
- from string import whitespace
- import bson
- import requests
- from untils.clean_html import cleaner
- SearchText = namedtuple('SearchText', ['total'])
def substitute(html_str, special=None, completely=False):
    """Clean an HTML string by delegating to ``cleaner``.

    :param str html_str: raw HTML to clean
    :param special: extra cleaning rules forwarded to ``cleaner``
    :param bool completely: forwarded to ``cleaner`` (strip more aggressively)
    :return: the cleaned HTML string
    """
    return cleaner(html=html_str, special=special, completely=completely)
def merge_files(*files):
    """Merge several attachment dicts into one, renumbering keys "1", "2", ...

    Non-dict arguments are skipped; original keys are discarded and values
    are re-indexed in encounter order.
    """
    merged = {}
    for bundle in files:
        if not isinstance(bundle, dict):
            continue
        for attachment in bundle.values():
            merged[str(len(merged) + 1)] = attachment
    return merged
def is_all_chinese(strs):
    """Return True iff every character lies in the CJK range U+4E00..U+9FA5.

    An empty string vacuously returns True.
    """
    return all('\u4e00' <= ch <= '\u9fa5' for ch in strs)
def clean_document(*fields):
    """Decorator factory that strips bookkeeping fields from a mongo document
    before the wrapped function runs.

    :param fields: extra field names to strip on top of the defaults
    # Usage:
    # >>> clean_document('dzr')(lambda *args, **kw: None)(document)

    The document is taken from the first positional argument (or from the
    kwargs dict when keyword arguments are used) and is mutated in place.
    """
    def clean(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            defaults = {
                "_id",
                "parser_name", "parser_url", "request_params",
                "failed", "error"
            }
            removes = defaults if not fields else {*defaults, *fields}
            item = args[0] if not kwargs else kwargs
            # assumes non-dict items expose a `to_dict` attribute holding the
            # backing dict — TODO confirm against the project's Item class
            data_dict = item if isinstance(item, dict) else item.to_dict
            # FIX: snapshot only the keys instead of copy.deepcopy(data_dict).
            # The deep copy was wasted work (only keys were read) and raised
            # on documents containing non-copyable values.
            for k in list(data_dict.keys()):
                if k in removes:
                    del data_dict[k]
                    try:
                        delattr(item, k)  # also drop the Item instance attribute
                    except AttributeError:
                        pass
            return func(*args, **kwargs)
        return wrapper
    return clean
def clean_chars(text, charsets=whitespace):
    """Delete every character of *charsets* from *text*.

    :param str text: the text to clean; ``None`` passes through unchanged
    :param charsets: iterable of substrings to remove (defaults to whitespace)
    :return: the cleaned text (or ``None``)
    """
    if text is None:
        return text
    # str.replace is already a no-op when the char is absent,
    # so no membership pre-check is needed.
    for ch in charsets:
        text = text.replace(ch, '')
    return text
def get_signature(content: str) -> str:
    """SHA-1 digest of *content* as a hex string.

    @param content: text to fingerprint (UTF-8 encoded before hashing)
    @return: 40-char hexadecimal digest
    """
    return hashlib.sha1(content.encode("utf-8")).hexdigest()
def get_md5(val):
    """Hex MD5 digest of *val*.

    :param val: ``str`` (UTF-8 encoded) or ``bytes`` (hashed as-is);
        any other type yields the digest of the empty input.
    :return: 32-char hexadecimal digest
    """
    md5 = hashlib.md5()
    if isinstance(val, bytes):
        # FIX: previously hashed str(val), i.e. the repr "b'...'",
        # instead of the actual byte content.
        md5.update(val)
    elif isinstance(val, str):
        md5.update(val.encode("utf-8"))
    return md5.hexdigest()
def text_search(content: str) -> SearchText:
    """Count Chinese characters in *content*.

    :param content: text to scan; falsy input counts as zero
    :return: SearchText whose ``total`` is the number of chars in U+4E00..U+9FA5
    """
    if not content:
        return SearchText(0)
    matches = re.findall('[\u4e00-\u9fa5]', content, re.S)
    # the list length is exactly the Chinese character count
    return SearchText(len(matches))
def int2long(param: int):
    """Coerce a Python int into a BSON Int64 (mongo ``long``)."""
    long_type = bson.int64.Int64
    return long_type(param)
def njpc_hpsj_filt_keywords(text: str, special_kw=None):
    """Decide whether *text* should be collected, by keyword match.

    :param str text: text to test
    :param special_kw: optional extra keywords merged with the defaults
    :return: True (collect) if any keyword occurs in text, False (discard)

    FIX: the original returned True/False after testing only the FIRST
    keyword drawn from an unordered set, so the result was arbitrary;
    now every keyword is tried.
    """
    if special_kw is None:
        special_kw = set()
    keywords = {'项目', '工程', '验收', '评价', *special_kw}
    # re.match with a leading '.*' and DOTALL matches the keyword anywhere
    return any(re.match(f'.*{keyword}', text, re.S) for keyword in keywords)
# Regex field extraction for the proposed-construction (拟建) crawler.
def njpc_fields_extract(html, data_item, is_clean=False):
    """
    Extract project fields from a detail page with regexes.

    :param str html: page source
    :param Items data_item: detail-page item; mutated in place and returned.
        NOTE(review): it must support both attribute writes (``.title``) and
        item assignment (``data_item[k] = v``) — presumably the project's
        Item class; confirm against its definition.
    :param bool is_clean: clean the source with substitute() first
    :return: data_item with every matched field filled in
    """
    if is_clean:
        html = substitute(html)
    data_item.title = data_item.projectname
    # Each pattern is "<label>(optional tag)(full/half-width colon)(optional
    # tag)(value)" capturing lazily up to the next '<'. findall returns a
    # list; only the first hit is used below.
    projectname = re.findall('项目名称(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvecode = re.findall('项目代码(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvecontent = re.findall('(?:事项名称|审批事项)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    owner = re.findall('建设(?:单位|单位名称)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    projectaddr = re.findall('建设(?:地点|地址)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    total_investment = re.findall('总投资(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    project_person = re.findall('联系人(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    project_phone = re.findall('联系(?:电话|方式)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvedept = re.findall('审批部门(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvenumber = re.findall('(?:审批|批准)文号(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    approvetime = re.findall('审批时间(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    # project_scale joins ALL hits, unlike the single-value fields above
    project_scale = "".join(re.findall('建设(?:内容|内容[及|与|和]规模|规模|规模[及|与|和]内容)(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S))
    project_completedate = re.findall('竣工日期(?:<[^>]+>|)[:|:](?:<[^>]+>|)(.*?)<', html, re.S)
    if project_scale:
        # `search` (defined later in this module) returns the first capture
        # group or None; areas are pulled out of the scale text
        construction_area = search('[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        floor_area = search('[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
        if not construction_area:
            construction_area = ""
        else:
            construction_area = re.sub(":|:", "", construction_area)
        if not floor_area:
            floor_area = ""
        else:
            floor_area = re.sub(":|:", "", floor_area)
        data_item.project_scale = project_scale
        data_item.project_scale_info = {
            "construction_area": construction_area,
            "floor_area": floor_area,
        }  # construction scale and main content
    fields_dict = {
        "projectname": projectname,
        "owner": owner,
        "total_investment": total_investment,
        "project_person": project_person,
        "project_phone": project_phone,
        "approvedept": approvedept,
        "approvetime": approvetime,
        "project_completedate": project_completedate,
        "projectaddr": projectaddr,
        "approvecode": approvecode,
        "approvecontent": approvecontent,
        "approvenumber": approvenumber
    }
    for fields_k, fields_v in fields_dict.items():
        if fields_v:
            # strip whitespace-class chars; skip the field when nothing is left
            fields_v[0] = clean_chars(fields_v[0])
            if not fields_v[0]:
                continue
            # drop trailing punctuation/symbol runs before storing
            data_item[fields_k] = re.sub(
                r'([,|.|。|)|)|,|;|;|?|&|$|#|@|!|!|%|*|\'|"|‘|’|“|¥|?| ]*?)$',
                "", fields_v[0])
    return data_item
def get_proxy():
    """Fetch a fresh socks5 proxy from the proxy service.

    :return: the "http" proxy URL from the service response
    :raises requests.RequestException: on connection failure or timeout
    """
    headers = {
        # NOTE(review): hard-coded basic-auth credential — move to config/env
        "Authorization": "Basic amlhbnl1MDAxOjEyM3F3ZSFB"
    }
    # FIX: added a timeout — requests.get without one can block forever
    # if the proxy service hangs.
    proxy = requests.get(
        "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch",
        headers=headers,
        timeout=10,
    ).json()
    print(f"切换代理:{proxy.get('data')}")
    return proxy.get("data").get("http")
def search(pattern, string):
    """Return the first capture group of *pattern* in *string*, or None."""
    match = re.search(pattern, string)
    return match.groups()[0] if match else None
def search_construction(string):
    """Search *string* for the literal text 'pattern'; return its first group.

    NOTE(review): the regex here is the literal placeholder string 'pattern' —
    this looks like unfinished code (compare get_construction_area, which
    uses a real construction-area regex). Also 'pattern' contains no capture
    groups, so ``result.groups()[0]`` would raise IndexError if it ever
    matched. Returns None implicitly when there is no match. Confirm intent
    before relying on this function.
    """
    result = re.search('pattern', string)
    if result:
        return result.groups()[0]
def search_floor(string):
    """Search *string* for the literal text 'pattern'; return its first group.

    NOTE(review): same placeholder defect as search_construction — the regex
    is the literal 'pattern' (compare get_floor_area's real regex), and it has
    no capture groups, so ``result.groups()[0]`` would raise IndexError on a
    match. Returns None implicitly when there is no match. Confirm intent
    before relying on this function.
    """
    result = re.search('pattern', string)
    if result:
        return result.groups()[0]
def get_floor_area(project_scale):
    """Pull the land/floor area (占地面积) out of a project-scale blurb.

    :param str project_scale: scale description text
    :return: the area value with colons stripped, or "" when absent
    """
    # `search` helper inlined: first capture group or None
    match = re.search('[总]*\S*占地[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
    area = match.group(1) if match else None
    if not area:
        return ""
    return area.replace(':', '').replace(':', '')
def get_construction_area(project_scale):
    """Pull the construction area (建筑面积) out of a project-scale blurb.

    :param str project_scale: scale description text
    :return: the area value with colons stripped, or "" when absent
    """
    # `search` helper inlined: first capture group or None
    match = re.search('[总]*\S*建筑[面|面积]*[约|为]*(.*?)[。|,|,|;]', project_scale)
    area = match.group(1) if match else None
    if not area:
        return ""
    return area.replace(':', '').replace(':', '')
|