123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- """
- coding=uft-8
- gz/tar/tgz/zip/rar
- 常用五种类型压缩文件的解压
- """
- from typing import List
- import zipfile
- from rarfile import RarFile
- from loguru import logger
- import os
- import re
- from docs.config import error_number
- pattern = re.compile("^inflating:(.*)")
- ALLOW_EXT = ['pdf', 'doc', 'docx', 'png', 'jpg', 'jpeg', 'xls', 'xlsx', 'swf']
- class CompressFiles(object):
- @staticmethod
- def other_un_zip_file(file_path: str, save_dir: str) -> List:
- file_list = []
- if not os.path.exists(save_dir):
- os.makedirs(save_dir, 0o777)
- # 解压开始
- out_put = os.popen("unzip -o -d %s -O GBK %s" % (save_dir, file_path))
- out_put = out_put.read()
- split_out = out_put.split("\n")
- # 获取文件路径
- for row in split_out:
- files = pattern.search(row.strip())
- if files:
- files_re = files.groups()
- for file in files_re:
- filepath = file.strip()
- if os.path.exists(filepath):
- file_list.append(filepath)
- return file_list
- def un_zip_file(self, file: str, save_dir: str) -> List:
- print("解压开始--》")
- ret = []
- if not os.path.exists(save_dir):
- os.makedirs(save_dir, 0o777)
- try:
- file_list = self.other_un_zip_file(file, save_dir)
- print(f"解压结束--->{file_list}")
- if file_list:
- return file_list
- except Exception as e:
- logger.warning(f"{e}")
- print("二次-->", )
- zf = zipfile.ZipFile(file, 'r', compression=zipfile.ZIP_DEFLATED)
- for index, info in enumerate(zf.infolist()):
- if info.file_size == 0:
- continue
- fn = info.filename
- # 排除掉无效的扩展名文件
- ext = os.path.splitext(fn)[-1][1:].lower().strip()
- if ext not in ALLOW_EXT:
- continue
- # 隐含文件
- if os.path.split(fn)[-1].startswith('.'):
- continue
- try:
- fn = fn.encode('cp437').decode('gbk')
- except:
- try:
- fn = fn.encode('cp437').decode('utf-8')
- except:
- fn = fn.encode('utf-8').decode('utf-8')
- save_path = os.path.join(save_dir, fn)
- with open(save_path, 'wb') as fw:
- fw.write(zf.read(info.filename))
- ret.append(save_path)
- return ret
- @staticmethod
- def un_rar_file(file: str, save_dir: str) -> List:
- ret = []
- if not os.path.exists(save_dir):
- os.makedirs(save_dir, 0o777)
- zf = RarFile(file)
- for index, info in enumerate(zf.infolist()):
- if info.file_size == 0:
- continue
- fn = info.filename
- # 排除掉无效的扩展名文件
- ext = os.path.splitext(fn)[-1][1:].lower().strip()
- if not ext in ALLOW_EXT:
- continue
- # 隐含文件
- if os.path.split(fn)[-1].startswith('.'):
- continue
- try:
- fn = fn.encode('cp437').decode('gbk')
- except:
- try:
- fn = fn.encode('cp437').decode('utf-8')
- except:
- fn = fn.encode('utf-8').decode('utf-8')
- save_path = os.path.join(save_dir, fn)
- with open(save_path, 'wb') as fw:
- fw.write(zf.read(info.filename))
- ret.append(save_path)
- return ret
- def extract_file(self, file_path):
- """
- 解压单个文件
- :param filename:
- :param file_path:
- :return:
- """
- return_list = []
- try:
- base_dir, file_name = os.path.split(file_path)
- file_name, file_suffix = os.path.splitext(file_name)
- save_dir = os.path.join(base_dir, file_name)
- file_suffix = file_suffix[1:]
- if file_suffix not in ['zip', 'rar']:
- return [], error_number["类型不支持"]
- else:
- if file_suffix == 'zip':
- return_list = self.un_zip_file(file_path, save_dir)
- elif file_suffix == 'rar':
- return_list = self.un_rar_file(file_path, save_dir)
- except Exception as e:
- print(e)
- return [], error_number["解析错误"]
- return return_list, error_number["成功"]
|