|
@@ -1,243 +0,0 @@
|
|
|
-import hashlib
|
|
|
-import os
|
|
|
-import re
|
|
|
-from urllib.parse import urlparse, unquote
|
|
|
-
|
|
|
-import requests
|
|
|
-
|
|
|
-from utils.log import logger
|
|
|
-
|
|
|
# Document-like file extensions.
DOCTYPE = set(
    'txt rtf dps et ett xls '
    'xlsx xlsb xlsm xlt ods pmd pmdx '
    'doc docm docx dot dotm dotx '
    'odt wps csv xml xps'.split()
)
# Archive / compressed file extensions.
COMPRESSION_TYPE = set(
    'rar zip gzzb 7z tar gz bz2 jar iso cab '
    'arj lzh ace uue edxz'.split()
)
# Image file extensions (the original list also places 'pdf' here).
IMAGE_TYPE = set(
    'jpg png jpeg tiff gif psd raw eps svg bmp '
    'pdf'.split()
)
# Miscellaneous extensions.
OTHER_TYPE = set('swf nxzf xezf nxcf'.split())
|
|
|
-
|
|
|
-
|
|
|
def sha1(val):
    """Return the hexadecimal SHA-1 digest derived from ``val``.

    * ``str`` input is hashed as its UTF-8 encoding.
    * ``bytes`` input is first converted with ``str()`` (giving the
      ``b'...'`` repr text) and that text is hashed, so ``sha1(b"a")`` is
      not the same as ``sha1("a")``.
    * Any other type feeds nothing to the hash, so the digest of empty
      input is returned.

    NOTE(review): hashing the repr of ``bytes`` looks unintentional, but
    changing it would alter every digest produced so far -- confirm with
    callers before fixing.
    """
    if isinstance(val, bytes):
        payload = str(val).encode("utf-8")
    elif isinstance(val, str):
        payload = val.encode("utf-8")
    else:
        payload = b""
    return hashlib.sha1(payload).hexdigest()
|
|
|
-
|
|
|
-
|
|
|
def remove(file_path: str):
    """Delete the file at ``file_path``.

    Any ``OSError`` raised by the operating system (for example when the
    path does not exist) propagates to the caller.
    """
    # os.unlink is documented as semantically identical to os.remove.
    os.unlink(file_path)
|
|
|
-
|
|
|
-
|
|
|
def getsize(file):
    """Return the size of ``file`` in bytes, or ``0`` if it does not exist."""
    try:
        size = os.path.getsize(file)
    except FileNotFoundError:
        # A missing file is reported as empty rather than as an error.
        size = 0
    return size
|
|
|
-
|
|
|
-
|
|
|
def discern_file_format(text):
    """Return the known file extension that ``text`` ends with, else ``None``.

    ``text`` is stripped of surrounding whitespace and compared against
    every extension in ``DOCTYPE``, ``COMPRESSION_TYPE``, ``IMAGE_TYPE`` and
    ``OTHER_TYPE``, in all-lowercase and all-uppercase form. The extension
    is returned in the case that matched (``'pdf'`` or ``'PDF'``).

    The check is a plain suffix comparison: no leading dot is required, so
    both ``'report.pdf'`` and a bare ``'pdf'`` are recognised. Mixed-case
    suffixes such as ``'Pdf'`` are not.

    When nothing matches, a warning containing the trailing run of
    characters of ``text`` is logged and ``None`` is returned.
    """
    text = text.strip()

    known_types = {*DOCTYPE, *COMPRESSION_TYPE, *IMAGE_TYPE, *OTHER_TYPE}
    for file_type in known_types:
        # A suffix test is equivalent to the previous ``.*<ext>$`` regex
        # (the text is already stripped and the extensions contain no regex
        # metacharacters) without building a pattern per extension.
        for candidate in (file_type, file_type.upper()):
            if text.endswith(candidate):
                return candidate

    unknown_type = re.findall('[^.\\/:*?"<>|\r\n]+$', text, re.S)
    logger.warning(f'[附件类型识别]未定义的文件类型{unknown_type}')
    return None
|
|
|
-
|
|
|
-
|
|
|
def extract_file_type(text):
    """Return the file extension detected in ``text``.

    ``None`` input yields ``None``; any other value is passed on to
    ``discern_file_format`` and its result is returned unchanged.
    """
    return None if text is None else discern_file_format(text)
|
|
|
-
|
|
|
-
|
|
|
def extract_file_name_by_href(href: str, file_type: str):
    """Extract a Chinese file name from a download URL.

    The URL's query string is searched (or its path, when the query is
    empty) for text ending in ``.<file_type>``. That text is
    percent-decoded and the first run of Chinese characters or Chinese
    punctuation in it is returned.

    ``file_type`` is interpolated into the regular expression as-is.

    Returns ``None`` when the extension is not found or when the decoded
    text contains no Chinese characters.
    """
    # Chinese punctuation (\u3002 \uff1b \uff0c \uff1a \u201c \u201d \uff08
    # \uff09 \u3001 \uff1f \u300a \u300b) plus the ideograph range
    # \u4e00-\u9fa5.
    zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+'

    parts = urlparse(href)
    haystack = parts.query if parts.query else parts.path

    matched = re.search(f'.*\\.{file_type}', haystack, re.S)
    if matched is None:
        return None

    decoded = unquote(matched.group())
    zh_name = re.search(zh_char_pattern, decoded)
    if zh_name is None:
        return None
    return unquote(zh_name.group())
|
|
|
-
|
|
|
-
|
|
|
def extract_file_name(text):
    """Return ``text`` with its recognised file extension removed.

    The extension is detected with ``discern_file_format``. When one is
    found, every occurrence of ``.<extension>`` in ``text`` is removed;
    otherwise ``text`` is returned unchanged.
    """
    detected = discern_file_format(text)
    if detected is None:
        return text
    return text.replace('.' + detected, '')
|
|
|
-
|
|
|
-
|
|
|
def verify_file_name(name):
    """Raise ``ValueError`` when ``name`` has no recognised file extension.

    Recognition is delegated to ``extract_file_type``; nothing is returned
    for a valid name.
    """
    recognised = extract_file_type(name)
    if recognised is None:
        raise ValueError
|
|
|
-
|
|
|
-
|
|
|
# Strip surrounding whitespace and (possibly duplicated) extensions from an
# attachment name.
def clean_file_name(file_name: str, file_type: str):
    """Return ``file_name`` stripped and without any ``.<file_type>`` suffixes.

    Every occurrence of ``.<file_type>`` is removed, so a doubled extension
    such as ``'a.pdf.pdf'`` becomes ``'a'``.

    Args:
        file_name: Raw attachment name.
        file_type: Extension without the leading dot. When it is empty or
            ``None`` only the whitespace stripping is applied. Previously an
            empty value removed every ``.`` from the name and ``None``
            raised ``TypeError``.

    Returns:
        The cleaned file name.
    """
    file_name = file_name.strip()
    if not file_type:
        return file_name
    return file_name.replace(f'.{file_type}', '')
|
|
|
-
|
|
|
-
|
|
|
# Attachment size limit: anything under 5 KB is not stored in the database.
def limit_file_size(file_size: str):
    """Tell whether a human-readable size string is at least 5 KB.

    The first number in ``file_size`` is taken as the size (thousands
    separators are ignored). If the string contains ``M`` or ``m`` the
    number is treated as megabytes and multiplied by 1000; otherwise it is
    treated as kilobytes.

    Args:
        file_size: Text such as ``'12.5KB'``, ``'0.5M'`` or ``'Size: 3 KB'``.

    Returns:
        ``True`` when the size is 5 KB or more, ``False`` otherwise.

    Raises:
        ValueError: ``file_size`` contains no number.
    """
    # Parse only the first number. Concatenating every digit run (as the
    # earlier findall/join did) turns "Size: 3.05 KB" into 35.
    number = re.search(r'\d+(?:\.\d+)?', file_size.replace(',', ''))
    if number is None:
        raise ValueError(f'no numeric size found in {file_size!r}')

    size_kb = float(number.group())
    if "M" in file_size or "m" in file_size:
        size_kb *= 1000
    return size_kb >= 5
|
|
|
-
|
|
|
-
|
|
|
# Normalise an attachment URL.
def judge_file_url(file_url: str):
    """Return ``file_url`` trimmed, cut at its first inner space.

    Leading and trailing whitespace is stripped. If a space remains inside
    the value, only the part before the first space is kept.
    """
    head, _, _ = file_url.strip().partition(" ")
    return head
|
|
|
-
|
|
|
-
|
|
|
# Attachment URLs that need a second request: rewrite the URL carrying the
# appUrlFlag parameter.
def add_appUrlFlag_param(file_url):
    """Rewrite a ``downloadztbattach`` attachment URL, else return it as-is.

    The rewrite applies only when ``file_url`` contains all of
    ``appUrlFlag``, ``downloadztbattach`` and ``attachGuid``. In that case
    every ``downloadztbattach`` is replaced by
    ``ztbAttachDownloadAction.action`` and ``&cmd=getContent`` is appended.
    """
    required_markers = ("appUrlFlag", "downloadztbattach", "attachGuid")
    if not all(marker in file_url for marker in required_markers):
        return file_url
    rewritten = file_url.replace('downloadztbattach', 'ztbAttachDownloadAction.action')
    return rewritten + "&cmd=getContent"
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
# Shared HTTP state for attachment downloads that require a captcha.
# ``requests.session()`` is a deprecated alias kept for backwards
# compatibility; ``requests.Session()`` is the supported constructor.
session = requests.Session()

# Default browser-like headers sent when fetching captcha images.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"
}
|
|
|
-
|
|
|
# Captcha-recognition service client.
def get_code(file_path: str, timeout: float = 60) -> dict:
    """Upload an image file to the captcha-recognition service.

    The file is read fully into memory and posted as the multipart field
    ``file`` through the module-level ``session``.

    Args:
        file_path: Path of the captcha image to upload.
        timeout: Seconds to wait for the service before ``requests`` raises
            a timeout error. Without a timeout a stalled service would
            block the caller indefinitely.

    Returns:
        The service's JSON reply decoded into a ``dict``.
    """
    upload_address = "http://pycaptcha.spdata.jianyu360.com/v1/images/verify"
    with open(file_path, 'rb') as f:
        image_bytes = f.read()
    files = {'file': image_bytes}
    # Named differently from the module-level ``headers`` to avoid shadowing.
    request_headers = {'accept': 'application/json'}
    response = session.post(
        upload_address,
        headers=request_headers,
        files=files,
        stream=True,
        timeout=timeout,
    )
    return response.json()
|
|
|
-
|
|
|
-
|
|
|
# Download a captcha image and have it recognised.
def get_dealcode(img_url, timeout: float = 60):
    """Fetch the captcha at ``img_url`` and return the recognised code.

    The image is downloaded with the module-level ``session`` and
    ``headers``, saved as ``image/zgzbycgw.jpg`` relative to the current
    working directory, and passed to ``get_code``.

    Args:
        img_url: URL of the captcha image.
        timeout: Seconds to wait for the image download before ``requests``
            raises a timeout error.

    Returns:
        The value of ``r.code`` from the service reply when its ``msg`` is
        ``"success"``; ``None`` when the reply is not successful or carries
        no ``r`` object.
    """
    res = session.get(img_url, headers=headers, timeout=timeout)

    img_dir = 'image'
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(img_dir, exist_ok=True)
    img_file = img_dir + '/zgzbycgw.jpg'
    with open(img_file, 'wb') as f:
        f.write(res.content)

    reply = get_code(img_file)
    if reply.get("msg") != "success":
        return None
    # Tolerate a "success" reply that lacks the "r" payload.
    return (reply.get("r") or {}).get("code")
|
|
|
-
|
|
|
-
|
|
|
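# Tianjin government procurement site (www.ccgp-tianjin.gov.cn): captcha-protected download, currently disabled.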
|
|
|
-# def tjzfcgw_file_yzm(file_url):
|
|
|
-# img_url = 'http://www.ccgp-tianjin.gov.cn/commons/image.jsp'
|
|
|
-# session.get(file_url, headers=headers, verify=False)
|
|
|
-#
|
|
|
-# # 下载地址
|
|
|
-# file_url_yzm = "http://www.ccgp-tianjin.gov.cn/portal/documentView.do"
|
|
|
-#
|
|
|
-# Yzm_result = get_dealcode(img_url).replace("=", "").replace("?", "")
|
|
|
-# if "x" in Yzm_result:
|
|
|
-# Yzm_result = Yzm_result.replace("x", "*")
|
|
|
-# try:
|
|
|
-# yzm = eval(Yzm_result)
|
|
|
-# except:
|
|
|
-# yzm = ""
|
|
|
-#
|
|
|
-# params_yzm = {
|
|
|
-# "imageString": f"{yzm}",
|
|
|
-# "method": "downNewFiles"
|
|
|
-# }
|
|
|
-#
|
|
|
-# file_result = session.get(file_url_yzm, headers=headers, params=params_yzm, verify=False)
|
|
|
-#
|
|
|
-# req_count = 1
|
|
|
-# while "请输入验证码" in file_result.text:
|
|
|
-# if req_count >= 10:
|
|
|
-# break
|
|
|
-# Yzm_result = get_dealcode(img_url).replace("=", "").replace("?", "")
|
|
|
-# if "x" in Yzm_result:
|
|
|
-# Yzm_result = Yzm_result.replace("x", "*")
|
|
|
-# try:
|
|
|
-# yzm = eval(Yzm_result)
|
|
|
-# except:
|
|
|
-# yzm = ""
|
|
|
-#
|
|
|
-# params_yzm = {
|
|
|
-# "imageString": f"{yzm}",
|
|
|
-# "method": "downNewFiles"
|
|
|
-# }
|
|
|
-#
|
|
|
-# file_result = session.get(file_url_yzm, headers=headers, params=params_yzm, verify=False)
|
|
|
-# # 站点限制 访问频率 ,故休眠时间较大
|
|
|
-# time.sleep(random.randint(10,20))
|
|
|
-# req_count += 1
|
|
|
-#
|
|
|
-# return file_result.content
|
|
|
-
|
|
|
-
|
|
|
# Keywords used to decide whether an attachment download requires a captcha.
|
|
|
-# yzm_keywords = ['method=downEnId']
|
|
|
-
|
|
|
# Handlers for attachment downloads that require a captcha.
|
|
|
-# site_list_yzm = [tjzfcgw_file_yzm]
|
|
|
-
|
|
|
# Keywords used to decide whether an attachment download needs a different
# request method.
req_keywords = [
    '请求类型防御',
]

# Functions that rewrite an attachment URL before it is downloaded.
modify_file_url_list = [
    add_appUrlFlag_param,
]
|
|
|
-
|
|
|
-
|