# clean_file.py - helpers for attachment file names, types, sizes and download URLs.
import contextlib
import hashlib
import os
import re
from urllib.parse import urlparse, unquote

import requests

from utils.log import logger
  7. # 文件文档类型
  8. DOCTYPE = {
  9. 'txt', 'rtf', 'dps', 'et', 'ett', 'xls',
  10. 'xlsx', 'xlsb', 'xlsm', 'xlt', 'ods', 'pmd', 'pmdx',
  11. 'doc', 'docm', 'docx', 'dot', 'dotm', 'dotx',
  12. 'odt', 'wps', 'csv', 'xml', 'xps'
  13. }
  14. # 压缩类型
  15. COMPRESSION_TYPE = {
  16. 'rar', 'zip', 'gzzb', '7z', 'tar', 'gz', 'bz2', 'jar', 'iso', 'cab',
  17. 'arj', 'lzh', 'ace', 'uue', 'edxz',
  18. }
  19. # 图片类型
  20. IMAGE_TYPE = {
  21. 'jpg', 'png', 'jpeg', 'tiff', 'gif', 'psd', 'raw', 'eps', 'svg', 'bmp',
  22. 'pdf'
  23. }
  24. # 其他类型
  25. OTHER_TYPE = {
  26. 'swf', 'nxzf', 'xezf', 'nxcf'
  27. }
  28. def sha1(val):
  29. _sha1 = hashlib.sha1()
  30. if isinstance(val, bytes):
  31. _sha1.update(str(val).encode("utf-8"))
  32. elif isinstance(val, str):
  33. _sha1.update(val.encode("utf-8"))
  34. return _sha1.hexdigest()
  35. def remove(file_path: str):
  36. try:
  37. os.remove(file_path)
  38. except:
  39. pass
  40. def getsize(file):
  41. try:
  42. return os.path.getsize(file)
  43. except FileNotFoundError:
  44. return 0
  45. def discern_file_format(text):
  46. text = text.strip()
  47. file_types = {
  48. *DOCTYPE,
  49. *COMPRESSION_TYPE,
  50. *IMAGE_TYPE,
  51. *OTHER_TYPE
  52. }
  53. for file_type in file_types:
  54. all_file_format = [file_type, file_type.upper()]
  55. for t in all_file_format:
  56. result = re.match(f'.*{t}$', text, re.S)
  57. if result is not None:
  58. return t
  59. else:
  60. unknown_type = re.findall('[^.\\/:*?"<>|\r\n]+$', text, re.S)
  61. logger.warning(f'[附件类型识别]未定义的文件类型{unknown_type}')
  62. return None
  63. def extract_file_type(text):
  64. if text is None:
  65. return None
  66. return discern_file_format(text)
  67. def extract_file_name_by_href(href: str, file_type: str):
  68. """从url中抽取文件名称"""
  69. # 中文标点符号:[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]
  70. # 中文字符:[\u4e00 -\u9fa5]
  71. zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+'
  72. parser = urlparse(href)
  73. query = (parser.query or parser.path)
  74. result = re.search(f'.*\\.{file_type}', query, re.S)
  75. if result is not None:
  76. encode_str = unquote(result.group())
  77. name = re.search(zh_char_pattern, encode_str)
  78. if name is not None:
  79. return unquote(name.group())
  80. return None
  81. def extract_file_name(text):
  82. file_type = discern_file_format(text)
  83. if file_type is not None:
  84. repl = '.{}'.format(file_type)
  85. text = text.replace(repl, '')
  86. return text
  87. def verify_file_name(name):
  88. if extract_file_type(name) is None:
  89. raise ValueError
  90. # 去除附件名空格、两个后缀
  91. def clean_file_name(file_name:str,file_type:str):
  92. file_name = file_name.strip()
  93. if file_type in file_name:
  94. file_name = file_name.replace(f'.{file_type}', '')
  95. return file_name
  96. # 限制附件大小:size < 5 kb 不存入数据库
  97. def limit_file_size(file_size:str):
  98. if "M" in file_size or "m" in file_size:
  99. file_size = float("".join(re.findall('^[0-9]\d*\.\d*|[1-9]\d*',file_size))) * 1000
  100. else:
  101. file_size = "".join(re.findall('^[0-9]\d*\.\d*|[1-9]\d*', file_size))
  102. if float(file_size) < 5:
  103. return False
  104. else:
  105. return True
  106. # 判断附件地址是否正确
  107. def judge_file_url(file_url:str):
  108. file_url = file_url.strip()
  109. if " " in file_url:
  110. file_url = file_url.split(" ")[0]
  111. return file_url
  112. # 需二次请求 添加附件地址参数 appUrlFlag
  113. def add_appUrlFlag_param(file_url):
  114. if "appUrlFlag" in file_url and "downloadztbattach" in file_url and "attachGuid" in file_url:
  115. file_url = file_url.replace('downloadztbattach','ztbAttachDownloadAction.action') + "&cmd=getContent"
  116. return file_url
  117. # 附件下载 需验证码
  118. session = requests.session()
  119. headers = {
  120. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"
  121. }
  122. # 打码接口
  123. def get_code(file_path: str) -> dict:
  124. upload_address = "http://123.57.163.80:2119/v1/images/verify"
  125. with open(file_path, 'rb') as f:
  126. image_bytes = f.read()
  127. content = {'file': image_bytes}
  128. # json_resp = get_verify_code(upload_address, content)
  129. headers = {'accept': 'application/json'}
  130. response = session.post(upload_address, headers=headers, files=content, stream=True)
  131. return response.json()
  132. # 识别验证码
  133. def get_dealcode(img_url):
  134. res = session.get(img_url, headers=headers)
  135. img_path = 'image'
  136. if not os.path.exists(img_path):
  137. os.mkdir(img_path)
  138. with open(img_path + '/zgzbycgw.jpg', 'wb') as f:
  139. f.write(res.content)
  140. res = get_code(img_path + '/zgzbycgw.jpg')
  141. if res.get("msg") == "success":
  142. img_code = res.get("r").get("code")
  143. else:
  144. img_code = None
  145. return img_code
# Tianjin Government Procurement Network (天津市政府采购网).
# NOTE: disabled reference code. Before re-enabling: it calls time.sleep and
# random.randint, which need `import time` / `import random`, and it runs
# eval() on the recognised captcha text, which should be replaced with a safe
# arithmetic evaluator.
# def tjzfcgw_file_yzm(file_url):
#     img_url = 'http://www.ccgp-tianjin.gov.cn/commons/image.jsp'
#     session.get(file_url, headers=headers, verify=False)
#
#     # Download URL
#     file_url_yzm = "http://www.ccgp-tianjin.gov.cn/portal/documentView.do"
#
#     Yzm_result = get_dealcode(img_url).replace("=", "").replace("?", "")
#     if "x" in Yzm_result:
#         Yzm_result = Yzm_result.replace("x", "*")
#     try:
#         yzm = eval(Yzm_result)
#     except:
#         yzm = ""
#
#     params_yzm = {
#         "imageString": f"{yzm}",
#         "method": "downNewFiles"
#     }
#
#     file_result = session.get(file_url_yzm, headers=headers, params=params_yzm, verify=False)
#
#     req_count = 1
#     while "请输入验证码" in file_result.text:
#         if req_count >= 10:
#             break
#         Yzm_result = get_dealcode(img_url).replace("=", "").replace("?", "")
#         if "x" in Yzm_result:
#             Yzm_result = Yzm_result.replace("x", "*")
#         try:
#             yzm = eval(Yzm_result)
#         except:
#             yzm = ""
#
#         params_yzm = {
#             "imageString": f"{yzm}",
#             "method": "downNewFiles"
#         }
#
#         file_result = session.get(file_url_yzm, headers=headers, params=params_yzm, verify=False)
#         # The site rate-limits requests, so sleep for a long interval.
#         time.sleep(random.randint(10,20))
#         req_count += 1
#
#     return file_result.content
# Keywords used to decide whether an attachment download needs a captcha.
# yzm_keywords = ['method=downEnId']
# Handlers for attachment downloads that need a captcha.
# site_list_yzm = [tjzfcgw_file_yzm]
  196. # 判断 附件下载 是否需要 修改 请求方式
  197. req_keywords = ['请求类型防御']
  198. # 附件下载 需要 修改附件地址 的方法
  199. modify_file_url_list = [add_appUrlFlag_param]