"""Helpers for recognising, naming, validating and downloading attachments."""
import hashlib
import os
import re
from urllib.parse import urlparse, unquote

import requests

from utils.log import logger

# Document file types
DOCTYPE = {
    'txt', 'rtf', 'dps', 'et', 'ett', 'xls', 'xlsx', 'xlsb', 'xlsm', 'xlt',
    'ods', 'pmd', 'pmdx', 'doc', 'docm', 'docx', 'dot', 'dotm', 'dotx',
    'odt', 'wps', 'csv', 'xml', 'xps'
}
# Archive / compression types
COMPRESSION_TYPE = {
    'rar', 'zip', 'gzzb', '7z', 'tar', 'gz', 'bz2', 'jar', 'iso', 'cab',
    'arj', 'lzh', 'ace', 'uue', 'edxz',
}
# Image types
IMAGE_TYPE = {
    'jpg', 'png', 'jpeg', 'tiff', 'gif', 'psd', 'raw', 'eps', 'svg', 'bmp',
    'pdf'
}
# Other types
OTHER_TYPE = {
    'swf', 'nxzf', 'xezf', 'nxcf'
}

# Numeric part of a human-readable size string such as "1.5M" or "300KB".
# Raw string: the original non-raw literal relied on invalid escape sequences.
_SIZE_NUMBER_PATTERN = re.compile(r'^[0-9]\d*\.\d*|[1-9]\d*')


def sha1(val):
    """Return the hex SHA-1 digest of ``val``.

    ``str`` input is hashed as its UTF-8 encoding. Any value that is neither
    ``bytes`` nor ``str`` yields the digest of empty input.
    """
    _sha1 = hashlib.sha1()
    if isinstance(val, bytes):
        # NOTE(review): this hashes the *repr* of the bytes object ("b'...'"),
        # not the raw bytes. Left unchanged because previously produced
        # digests may depend on it -- confirm before changing.
        _sha1.update(str(val).encode("utf-8"))
    elif isinstance(val, str):
        _sha1.update(val.encode("utf-8"))
    return _sha1.hexdigest()


def remove(file_path: str):
    """Delete ``file_path``, ignoring failures (best effort)."""
    try:
        os.remove(file_path)
    except (OSError, TypeError, ValueError):
        # Missing file, permission problem or unusable path argument: nothing
        # to clean up. KeyboardInterrupt/SystemExit are no longer swallowed.
        pass


def getsize(file):
    """Return the size of ``file`` in bytes, or 0 when it does not exist."""
    try:
        return os.path.getsize(file)
    except FileNotFoundError:
        return 0


def discern_file_format(text):
    """Return the known file type that ``text`` ends with, or ``None``.

    The type is matched as a plain suffix (no leading dot required) in either
    all-lowercase or all-uppercase form, and is returned in the form that
    matched. When nothing matches, a warning is logged and ``None`` returned.
    """
    text = text.strip()
    file_types = DOCTYPE | COMPRESSION_TYPE | IMAGE_TYPE | OTHER_TYPE
    # Longest candidates first so the outcome never depends on set ordering.
    for file_type in sorted(file_types, key=lambda t: (-len(t), t)):
        for candidate in (file_type, file_type.upper()):
            # Same result as re.match(f'.*{candidate}$', text, re.S) on the
            # stripped text, without running a regex per candidate.
            if text.endswith(candidate):
                return candidate

    unknown_type = re.findall('[^.\\/:*?"<>|\r\n]+$', text, re.S)
    logger.warning(f'[附件类型识别]未定义的文件类型{unknown_type}')
    return None


def extract_file_type(text):
    """Return the file type found at the end of ``text``; ``None`` for ``None``."""
    if text is None:
        return None
    return discern_file_format(text)


def extract_file_name_by_href(href: str, file_type: str):
    """Extract a (Chinese) file name from a URL.

    Looks at the URL's query string (or its path when there is no query) for
    text ending in ``.<file_type>``, URL-decodes it and returns the first run
    of Chinese characters / Chinese punctuation. Returns ``None`` when
    nothing is found or when either argument is missing.
    """
    if not href or file_type is None:
        return None

    # Chinese punctuation: [\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]
    # Chinese characters: [\u4e00-\u9fa5]
    zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+'
    parser = urlparse(href)
    query = (parser.query or parser.path)
    # Escape the type so it is always matched literally.
    result = re.search(f'.*\\.{re.escape(file_type)}', query, re.S)
    if result is None:
        return None

    decoded = unquote(result.group())
    name = re.search(zh_char_pattern, decoded)
    if name is None:
        return None
    return name.group()


def extract_file_name(text):
    """Return ``text`` with every ``.<type>`` occurrence of its detected type removed."""
    file_type = discern_file_format(text)
    if file_type is not None:
        repl = '.{}'.format(file_type)
        text = text.replace(repl, '')
    return text


def verify_file_name(name):
    """Raise ``ValueError`` when ``name`` does not end with a known file type."""
    if extract_file_type(name) is None:
        raise ValueError(f'unsupported attachment file type: {name!r}')


def clean_file_name(file_name: str, file_type: str):
    """Strip surrounding whitespace and remove ``.<file_type>`` suffixes.

    Every occurrence of ``.<file_type>`` is removed, which also cleans names
    that carry the extension twice.
    """
    file_name = file_name.strip()
    if file_type in file_name:
        file_name = file_name.replace(f'.{file_type}', '')
    return file_name


def limit_file_size(file_size: str):
    """Return ``False`` when the attachment is too small to be stored.

    ``file_size`` is a human-readable size string. A value containing "M" or
    "m" is treated as megabytes and multiplied by 1000; anything else is used
    as-is. Sizes below 5 (KB) are rejected -- such attachments are not saved
    to the database.

    Raises:
        ValueError: if ``file_size`` contains no parsable number.
    """
    number = "".join(_SIZE_NUMBER_PATTERN.findall(file_size))
    if "M" in file_size or "m" in file_size:
        size_kb = float(number) * 1000
    else:
        size_kb = float(number)
    return size_kb >= 5


def judge_file_url(file_url: str):
    """Return the attachment URL trimmed to the part before the first space."""
    return file_url.strip().split(" ")[0]


def add_appUrlFlag_param(file_url):
    """Rewrite download URLs that need a second request.

    When the URL contains ``appUrlFlag``, ``downloadztbattach`` and
    ``attachGuid``, the download endpoint is swapped for
    ``ztbAttachDownloadAction.action`` and ``&cmd=getContent`` is appended.
    Other URLs are returned unchanged.
    """
    markers = ("appUrlFlag", "downloadztbattach", "attachGuid")
    if all(marker in file_url for marker in markers):
        file_url = file_url.replace('downloadztbattach', 'ztbAttachDownloadAction.action') + "&cmd=getContent"
    return file_url


# Attachment downloads that require a captcha
session = requests.session()

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"
}


def get_code(file_path: str) -> dict:
    """Upload the captcha image at ``file_path`` to the captcha-solving
    service and return the decoded JSON reply."""
    upload_address = "http://123.57.163.80:2119/v1/images/verify"
    with open(file_path, 'rb') as f:
        image_bytes = f.read()
    content = {'file': image_bytes}
    # json_resp = get_verify_code(upload_address, content)
    # Named differently from the module-level ``headers`` to avoid shadowing.
    request_headers = {'accept': 'application/json'}
    response = session.post(upload_address, headers=request_headers, files=content, stream=True)
    return response.json()


def get_dealcode(img_url):
    """Download the captcha image at ``img_url`` and return the recognised
    code, or ``None`` when recognition did not succeed.

    The image is saved to ``image/zgzbycgw.jpg`` relative to the current
    working directory before being sent to ``get_code``.
    """
    response = session.get(img_url, headers=headers)
    img_path = 'image'
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(img_path, exist_ok=True)
    img_file = img_path + '/zgzbycgw.jpg'
    with open(img_file, 'wb') as f:
        f.write(response.content)

    result = get_code(img_file)
    if result.get("msg") == "success":
        # Tolerate a success reply that carries no "r" payload.
        return (result.get("r") or {}).get("code")
    return None


# Tianjin government procurement site (disabled)
# NOTE(review): the disabled code below runs eval() on captcha OCR output;
# replace that with a safe arithmetic parser before re-enabling it.
# def tjzfcgw_file_yzm(file_url):
#     img_url = 'http://www.ccgp-tianjin.gov.cn/commons/image.jsp'
#     session.get(file_url, headers=headers, verify=False)
#
#     # Download address
#     file_url_yzm = "http://www.ccgp-tianjin.gov.cn/portal/documentView.do"
#
#     Yzm_result = get_dealcode(img_url).replace("=", "").replace("?", "")
#     if "x" in Yzm_result:
#         Yzm_result = Yzm_result.replace("x", "*")
#     try:
#         yzm = eval(Yzm_result)
#     except:
#         yzm = ""
#
#     params_yzm = {
#         "imageString": f"{yzm}",
#         "method": "downNewFiles"
#     }
#
#     file_result = session.get(file_url_yzm, headers=headers, params=params_yzm, verify=False)
#
#     req_count = 1
#     while "请输入验证码" in file_result.text:
#         if req_count >= 10:
#             break
#         Yzm_result = get_dealcode(img_url).replace("=", "").replace("?", "")
#         if "x" in Yzm_result:
#             Yzm_result = Yzm_result.replace("x", "*")
#         try:
#             yzm = eval(Yzm_result)
#         except:
#             yzm = ""
#
#         params_yzm = {
#             "imageString": f"{yzm}",
#             "method": "downNewFiles"
#         }
#
#         file_result = session.get(file_url_yzm, headers=headers, params=params_yzm, verify=False)
#         # The site rate-limits requests, hence the long sleep
#         time.sleep(random.randint(10,20))
#         req_count += 1
#
#     return file_result.content


# Keywords that indicate an attachment download requires a captcha
# yzm_keywords = ['method=downEnId']

# Handlers for attachment downloads that require a captcha
# site_list_yzm = [tjzfcgw_file_yzm]

# Keywords that indicate the download must use a different request method
req_keywords = ['请求类型防御']

# Handlers that rewrite the attachment URL before downloading
modify_file_url_list = [add_appUrlFlag_param]