file_compress.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
"""
coding=utf-8
gz/tar/tgz/zip/rar
Extraction of five common types of compressed files.
"""
  6. from typing import List
  7. import zipfile
  8. from rarfile import RarFile
  9. from loguru import logger
  10. import os
  11. import re
  12. from docs.config import error_number
# Matches an "inflating:" progress line in the output of the external
# ``unzip`` command; group 1 captures everything after the colon, which the
# extractor then treats as the path of the extracted file.
pattern = re.compile("^inflating:(.*)")
# Lower-case file extensions (without the leading dot) that the in-process
# zip/rar extractors keep; members with any other extension are skipped.
ALLOW_EXT = ['pdf', 'doc', 'docx', 'png', 'jpg', 'jpeg', 'xls', 'xlsx', 'swf']
  15. class CompressFiles(object):
  16. @staticmethod
  17. def other_un_zip_file(file_path: str, save_dir: str) -> List:
  18. file_list = []
  19. if not os.path.exists(save_dir):
  20. os.makedirs(save_dir, 0o777)
  21. # 解压开始
  22. out_put = os.popen("unzip -o -d %s -O GBK %s" % (save_dir, file_path))
  23. out_put = out_put.read()
  24. split_out = out_put.split("\n")
  25. # 获取文件路径
  26. for row in split_out:
  27. files = pattern.search(row.strip())
  28. if files:
  29. files_re = files.groups()
  30. for file in files_re:
  31. filepath = file.strip()
  32. if os.path.exists(filepath):
  33. file_list.append(filepath)
  34. return file_list
  35. def un_zip_file(self, file: str, save_dir: str) -> List:
  36. print("解压开始--》")
  37. ret = []
  38. if not os.path.exists(save_dir):
  39. os.makedirs(save_dir, 0o777)
  40. try:
  41. file_list = self.other_un_zip_file(file, save_dir)
  42. print(f"解压结束--->{file_list}")
  43. if file_list:
  44. return file_list
  45. except Exception as e:
  46. logger.warning(f"{e}")
  47. print("二次-->", )
  48. zf = zipfile.ZipFile(file, 'r', compression=zipfile.ZIP_DEFLATED)
  49. for index, info in enumerate(zf.infolist()):
  50. if info.file_size == 0:
  51. continue
  52. fn = info.filename
  53. # 排除掉无效的扩展名文件
  54. ext = os.path.splitext(fn)[-1][1:].lower().strip()
  55. if ext not in ALLOW_EXT:
  56. continue
  57. # 隐含文件
  58. if os.path.split(fn)[-1].startswith('.'):
  59. continue
  60. try:
  61. fn = fn.encode('cp437').decode('gbk')
  62. except:
  63. try:
  64. fn = fn.encode('cp437').decode('utf-8')
  65. except:
  66. fn = fn.encode('utf-8').decode('utf-8')
  67. save_path = os.path.join(save_dir, fn)
  68. with open(save_path, 'wb') as fw:
  69. fw.write(zf.read(info.filename))
  70. ret.append(save_path)
  71. return ret
  72. @staticmethod
  73. def un_rar_file(file: str, save_dir: str) -> List:
  74. ret = []
  75. if not os.path.exists(save_dir):
  76. os.makedirs(save_dir, 0o777)
  77. zf = RarFile(file)
  78. for index, info in enumerate(zf.infolist()):
  79. if info.file_size == 0:
  80. continue
  81. fn = info.filename
  82. # 排除掉无效的扩展名文件
  83. ext = os.path.splitext(fn)[-1][1:].lower().strip()
  84. if not ext in ALLOW_EXT:
  85. continue
  86. # 隐含文件
  87. if os.path.split(fn)[-1].startswith('.'):
  88. continue
  89. try:
  90. fn = fn.encode('cp437').decode('gbk')
  91. except:
  92. try:
  93. fn = fn.encode('cp437').decode('utf-8')
  94. except:
  95. fn = fn.encode('utf-8').decode('utf-8')
  96. save_path = os.path.join(save_dir, fn)
  97. with open(save_path, 'wb') as fw:
  98. fw.write(zf.read(info.filename))
  99. ret.append(save_path)
  100. return ret
  101. def extract_file(self, file_path):
  102. """
  103. 解压单个文件
  104. :param filename:
  105. :param file_path:
  106. :return:
  107. """
  108. return_list = []
  109. try:
  110. base_dir, file_name = os.path.split(file_path)
  111. file_name, file_suffix = os.path.splitext(file_name)
  112. save_dir = os.path.join(base_dir, file_name)
  113. file_suffix = file_suffix[1:]
  114. if file_suffix not in ['zip', 'rar']:
  115. return [], error_number["类型不支持"]
  116. else:
  117. if file_suffix == 'zip':
  118. return_list = self.un_zip_file(file_path, save_dir)
  119. elif file_suffix == 'rar':
  120. return_list = self.un_rar_file(file_path, save_dir)
  121. except Exception as e:
  122. print(e)
  123. return [], error_number["解析错误"]
  124. return return_list, error_number["成功"]