clean_file.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. import hashlib
  2. import os
  3. import re
  4. from urllib.parse import urlparse, unquote
  5. import requests
  6. from utils.log import logger
  # Document / office file extensions.
  DOCTYPE = {
      'txt', 'rtf', 'dps', 'et', 'ett',
      'xls', 'xlsx', 'xlsb', 'xlsm', 'xlt',
      'ods', 'pmd', 'pmdx',
      'doc', 'docm', 'docx', 'dot', 'dotm', 'dotx',
      'odt', 'wps', 'csv', 'xml', 'xps',
  }

  # Archive / compressed file extensions.
  COMPRESSION_TYPE = {
      'rar', 'zip', 'gzzb', '7z', 'tar',
      'gz', 'bz2', 'jar', 'iso', 'cab',
      'arj', 'lzh', 'ace', 'uue', 'edxz',
  }

  # Image file extensions.
  # NOTE(review): 'pdf' is grouped with the image types here -- confirm this is
  # intentional before relying on IMAGE_TYPE to mean "pictures only".
  IMAGE_TYPE = {
      'jpg', 'png', 'jpeg', 'tiff', 'gif',
      'psd', 'raw', 'eps', 'svg', 'bmp',
      'pdf',
  }

  # Any other recognised extensions.
  OTHER_TYPE = {
      'swf', 'nxzf', 'xezf', 'nxcf',
  }
  def sha1(val):
      """Return the hexadecimal SHA-1 digest of ``val``.

      ``str`` input is hashed as its UTF-8 encoding.  ``bytes`` input is first
      converted with ``str()`` (giving the ``b'...'`` representation) and that
      text is hashed.  Any other type contributes nothing, so the digest of
      empty input is returned.

      NOTE(review): hashing the ``b'...'`` text rather than the raw bytes looks
      unintended, but changing it would alter every previously computed digest.
      """
      if isinstance(val, bytes):
          payload = str(val)
      elif isinstance(val, str):
          payload = val
      else:
          payload = None

      digest = hashlib.sha1()
      if payload is not None:
          digest.update(payload.encode("utf-8"))
      return digest.hexdigest()
  def remove(file_path: str):
      """Delete the file at ``file_path``.

      Thin wrapper around ``os.remove``; any ``OSError`` it raises (for
      example when the file does not exist) propagates to the caller.
      """
      os.remove(file_path)
  def getsize(file):
      """Return the size of ``file`` in bytes, or 0 when it does not exist."""
      try:
          size = os.path.getsize(file)
      except FileNotFoundError:
          # A missing file is reported as empty instead of raising.
          size = 0
      return size
  def discern_file_format(text):
      """Return the known file type that ``text`` ends with, or ``None``.

      Surrounding whitespace is ignored.  Each known type is tried in lower
      case and in upper case, and the variant that matched is returned as-is
      (so ``"A.PDF"`` yields ``"PDF"``).  Only the end of the text is compared;
      a preceding dot is not required.  When nothing matches, a warning with
      the trailing name fragment is logged.
      """
      candidate = text.strip()
      known_types = DOCTYPE | COMPRESSION_TYPE | IMAGE_TYPE | OTHER_TYPE
      for extension in known_types:
          for variant in (extension, extension.upper()):
              if candidate.endswith(variant):
                  return variant

      # Nothing matched: report whatever follows the last separator character.
      leftover = re.findall('[^.\\/:*?"<>|\r\n]+$', candidate, re.S)
      logger.warning(f'[附件类型识别]未定义的文件类型{leftover}')
      return None
  def extract_file_type(text):
      """Return the file type found at the end of ``text``.

      ``None`` input is passed through as ``None``; everything else is
      delegated to ``discern_file_format``.
      """
      return None if text is None else discern_file_format(text)
  def extract_file_name_by_href(href: str, file_type: str):
      """Extract a Chinese file name from a download URL.

      The URL's query string is searched (its path when there is no query)
      for text ending in ``.<file_type>``.  That text is percent-decoded and
      the first run of Chinese characters / Chinese punctuation in it is
      returned.

      Args:
          href: The attachment URL.
          file_type: The extension to look for, without the leading dot.

      Returns:
          The first run of Chinese characters, or ``None`` when the extension
          does not occur or the decoded text has no Chinese characters.
      """
      # Chinese punctuation followed by the CJK range U+4E00-U+9FA5.
      zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+'
      parsed = urlparse(href)
      haystack = parsed.query or parsed.path
      # Escape the extension so it is matched literally; a value such as
      # "c++" would otherwise be an invalid regular expression.
      located = re.search(f'.*\\.{re.escape(file_type)}', haystack, re.S)
      if located is None:
          return None
      decoded = unquote(located.group())
      name = re.search(zh_char_pattern, decoded)
      # The match contains only Chinese characters, so no further decoding
      # is needed.
      return name.group() if name is not None else None
  def extract_file_name(text):
      """Return ``text`` with every ``.<type>`` occurrence removed.

      The type is whatever ``discern_file_format`` detects at the end of
      ``text``; when no type is detected the text is returned unchanged.
      """
      detected = discern_file_format(text)
      if detected is None:
          return text
      return text.replace('.{}'.format(detected), '')
  def verify_file_name(name):
      """Raise ``ValueError`` when ``name`` does not end in a known file type.

      Returns ``None`` when ``extract_file_type`` recognises a type.
      """
      recognised = extract_file_type(name)
      if recognised is None:
          raise ValueError
  # Strip whitespace around an attachment name and drop (possibly doubled) suffixes.
  def clean_file_name(file_name:str,file_type:str):
      """Return ``file_name`` trimmed, with every ``.<file_type>`` removed.

      Args:
          file_name: The raw attachment name.
          file_type: The extension to remove, without the leading dot.
      """
      trimmed = file_name.strip()
      if file_type not in trimmed:
          return trimmed
      # replace() removes every occurrence, so "a.pdf.pdf" becomes "a".
      return trimmed.replace(f'.{file_type}', '')
  # Attachment size limit: anything smaller than 5 KB is not stored in the database.
  def limit_file_size(file_size:str):
      """Return whether a human-readable size string is at least 5 KB.

      The first number in ``file_size`` is read (thousands separators are
      ignored) and scaled by the unit letter that directly follows it:
      B, K, M, G or T, case-insensitive, in steps of 1000.  When no such unit
      follows the number the legacy rule applies: the value is treated as
      megabytes if the text contains "m"/"M" anywhere, as kilobytes otherwise.

      Args:
          file_size: Size text such as ``"3.5M"``, ``"120 KB"`` or ``"800"``.

      Returns:
          ``True`` when the size is 5 KB or more, ``False`` otherwise.

      Raises:
          ValueError: If ``file_size`` contains no number.
      """
      kb_per_unit = {'b': 0.001, 'k': 1, 'm': 1000, 'g': 1000 ** 2, 't': 1000 ** 3}

      # Take the first number as one token instead of joining every digit
      # run, which turned "4.8" in the middle of a string into "48".
      found = re.search(r'(\d+(?:\.\d*)?)\s*([A-Za-z]?)', file_size.replace(',', ''))
      if found is None:
          raise ValueError(f'no numeric size found in {file_size!r}')

      amount = float(found.group(1))
      unit = found.group(2).lower()
      if unit in kb_per_unit:
          factor = kb_per_unit[unit]
      else:
          # Legacy fallback: an "M" anywhere in the text means megabytes.
          factor = 1000 if 'm' in file_size.lower() else 1
      return amount * factor >= 5
  # Normalise an attachment URL that may carry trailing junk.
  def judge_file_url(file_url:str):
      """Return ``file_url`` trimmed and cut at its first space character.

      Leading and trailing whitespace is removed first; if a space remains
      inside the text, only the part before it is kept.
      """
      head, _, _ = file_url.strip().partition(" ")
      return head
  # Downloads that need a second request: rewrite the appUrlFlag attachment URL.
  def add_appUrlFlag_param(file_url):
      """Rewrite a ``downloadztbattach`` URL to its content-download form.

      When the URL contains all of ``appUrlFlag``, ``downloadztbattach`` and
      ``attachGuid``, ``downloadztbattach`` is replaced with
      ``ztbAttachDownloadAction.action`` and ``&cmd=getContent`` is appended.
      Any other URL is returned unchanged.
      """
      required_markers = ("appUrlFlag", "downloadztbattach", "attachGuid")
      if not all(marker in file_url for marker in required_markers):
          return file_url
      rewritten = file_url.replace('downloadztbattach', 'ztbAttachDownloadAction.action')
      return rewritten + "&cmd=getContent"
  # Attachment downloads that require a captcha share one HTTP session and
  # one set of default request headers.
  session = requests.Session()
  headers = {
      "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
  }
  # Captcha-solving service client.
  def get_code(file_path: str) -> dict:
      """Upload a captcha image to the recognition service and return its reply.

      Args:
          file_path: Path of the image file to upload.

      Returns:
          The service response decoded from JSON.
      """
      upload_address = "http://pycaptcha.spdata.jianyu360.com/v1/images/verify"
      with open(file_path, 'rb') as image_file:
          payload = {'file': image_file.read()}
      reply = session.post(
          upload_address,
          headers={'accept': 'application/json'},
          files=payload,
          stream=True,
      )
      return reply.json()
  # Recognise a captcha.
  def get_dealcode(img_url):
      """Download a captcha image and return the code recognised for it.

      The image is fetched with the shared session, saved as
      ``image/zgzbycgw.jpg`` and passed to ``get_code``.

      Args:
          img_url: URL of the captcha image.

      Returns:
          The recognised code, or ``None`` when the service does not report
          success or its reply carries no result.
      """
      response = session.get(img_url, headers=headers)

      img_dir = 'image'
      # exist_ok avoids the check-then-create race of exists() + mkdir().
      os.makedirs(img_dir, exist_ok=True)
      # NOTE(review): the file name is fixed, so concurrent callers would
      # overwrite each other's image -- confirm this is only used serially.
      img_file = os.path.join(img_dir, 'zgzbycgw.jpg')
      with open(img_file, 'wb') as f:
          f.write(response.content)

      verdict = get_code(img_file)
      if verdict.get("msg") != "success":
          return None
      # "r" may be missing or null even on success; treat that as no code
      # instead of raising AttributeError.
      return (verdict.get("r") or {}).get("code")
  143. # 天津市政府采购网
  144. # def tjzfcgw_file_yzm(file_url):
  145. # img_url = 'http://www.ccgp-tianjin.gov.cn/commons/image.jsp'
  146. # session.get(file_url, headers=headers, verify=False)
  147. #
  148. # # 下载地址
  149. # file_url_yzm = "http://www.ccgp-tianjin.gov.cn/portal/documentView.do"
  150. #
  151. # Yzm_result = get_dealcode(img_url).replace("=", "").replace("?", "")
  152. # if "x" in Yzm_result:
  153. # Yzm_result = Yzm_result.replace("x", "*")
  154. # try:
  155. # yzm = eval(Yzm_result)
  156. # except:
  157. # yzm = ""
  158. #
  159. # params_yzm = {
  160. # "imageString": f"{yzm}",
  161. # "method": "downNewFiles"
  162. # }
  163. #
  164. # file_result = session.get(file_url_yzm, headers=headers, params=params_yzm, verify=False)
  165. #
  166. # req_count = 1
  167. # while "请输入验证码" in file_result.text:
  168. # if req_count >= 10:
  169. # break
  170. # Yzm_result = get_dealcode(img_url).replace("=", "").replace("?", "")
  171. # if "x" in Yzm_result:
  172. # Yzm_result = Yzm_result.replace("x", "*")
  173. # try:
  174. # yzm = eval(Yzm_result)
  175. # except:
  176. # yzm = ""
  177. #
  178. # params_yzm = {
  179. # "imageString": f"{yzm}",
  180. # "method": "downNewFiles"
  181. # }
  182. #
  183. # file_result = session.get(file_url_yzm, headers=headers, params=params_yzm, verify=False)
  184. # # 站点限制 访问频率 ,故休眠时间较大
  185. # time.sleep(random.randint(10,20))
  186. # req_count += 1
  187. #
  188. # return file_result.content
  189. # 判断 附件下载 是否需要 验证码
  190. # yzm_keywords = ['method=downEnId']
  191. # 附件下载 需要 验证码 的方法
  192. # site_list_yzm = [tjzfcgw_file_yzm]
  # Keywords used to decide whether an attachment download needs a different
  # request method.
  req_keywords = ['请求类型防御']

  # Functions applied to attachment downloads whose URL has to be modified.
  modify_file_url_list = [add_appUrlFlag_param]