""" coding=uft-8 gz/tar/tgz/zip/rar 常用五种类型压缩文件的解压 """ from typing import List import zipfile from rarfile import RarFile from loguru import logger import os import re from docs.config import error_number pattern = re.compile("^inflating:(.*)") ALLOW_EXT = ['pdf', 'doc', 'docx', 'png', 'jpg', 'jpeg', 'xls', 'xlsx', 'swf'] class CompressFiles(object): @staticmethod def other_un_zip_file(file_path: str, save_dir: str) -> List: file_list = [] if not os.path.exists(save_dir): os.makedirs(save_dir, 0o777) # 解压开始 out_put = os.popen("unzip -o -d %s -O GBK %s" % (save_dir, file_path)) out_put = out_put.read() split_out = out_put.split("\n") # 获取文件路径 for row in split_out: files = pattern.search(row.strip()) if files: files_re = files.groups() for file in files_re: filepath = file.strip() if os.path.exists(filepath): file_list.append(filepath) return file_list def un_zip_file(self, file: str, save_dir: str) -> List: print("解压开始--》") ret = [] if not os.path.exists(save_dir): os.makedirs(save_dir, 0o777) try: file_list = self.other_un_zip_file(file, save_dir) print(f"解压结束--->{file_list}") if file_list: return file_list except Exception as e: logger.warning(f"{e}") print("二次-->", ) zf = zipfile.ZipFile(file, 'r', compression=zipfile.ZIP_DEFLATED) for index, info in enumerate(zf.infolist()): if info.file_size == 0: continue fn = info.filename # 排除掉无效的扩展名文件 ext = os.path.splitext(fn)[-1][1:].lower().strip() if ext not in ALLOW_EXT: continue # 隐含文件 if os.path.split(fn)[-1].startswith('.'): continue try: fn = fn.encode('cp437').decode('gbk') except: try: fn = fn.encode('cp437').decode('utf-8') except: fn = fn.encode('utf-8').decode('utf-8') save_path = os.path.join(save_dir, fn) with open(save_path, 'wb') as fw: fw.write(zf.read(info.filename)) ret.append(save_path) return ret @staticmethod def un_rar_file(file: str, save_dir: str) -> List: ret = [] if not os.path.exists(save_dir): os.makedirs(save_dir, 0o777) zf = RarFile(file) for index, info in enumerate(zf.infolist()): if info.file_size == 0: continue fn = info.filename # 排除掉无效的扩展名文件 ext = os.path.splitext(fn)[-1][1:].lower().strip() if not ext in ALLOW_EXT: continue # 隐含文件 if os.path.split(fn)[-1].startswith('.'): continue try: fn = fn.encode('cp437').decode('gbk') except: try: fn = fn.encode('cp437').decode('utf-8') except: fn = fn.encode('utf-8').decode('utf-8') save_path = os.path.join(save_dir, fn) with open(save_path, 'wb') as fw: fw.write(zf.read(info.filename)) ret.append(save_path) return ret def extract_file(self, file_path): """ 解压单个文件 :param filename: :param file_path: :return: """ return_list = [] try: base_dir, file_name = os.path.split(file_path) file_name, file_suffix = os.path.splitext(file_name) save_dir = os.path.join(base_dir, file_name) file_suffix = file_suffix[1:] if file_suffix not in ['zip', 'rar']: return [], error_number["类型不支持"] else: if file_suffix == 'zip': return_list = self.un_zip_file(file_path, save_dir) elif file_suffix == 'rar': return_list = self.un_rar_file(file_path, save_dir) except Exception as e: print(e) return [], error_number["解析错误"] return return_list, error_number["成功"]