publish_file.py

#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
# @Author : lijunliang
# @Email : lijunliang@topnet.net.cn
# @File : law.py
# @Software: PyCharm
import os
import argparse
import datetime
import re
import time  # needed by upload_oss(); was originally imported inside __main__ only
import uuid
import random
from loguru import logger
from module.parse_file import parse_file_start, get_property
from module.abstract import make_summary
from module.price import get_pricing
from module.read_config import read_ini
from module.sql_operate import md5_exists, save_field
from module.ac_sensitive import ACAutomation
from module.load_classify import load_classify
from util.oss_file import OssServeClient
from util.db_helper import DBHelper
from util.file_operations import file_copy, generate_directory, del_directory
from util.convert2img import convert_img

logger.add("./logging/run.log", rotation="12:00")  # log file, rotated daily at 12:00
parser = argparse.ArgumentParser(description="Batch-publish documents to OSS and MySQL")
parser.add_argument('-dir', '--file_dir', default="./data/file/", type=str, help="source directory")
parser.add_argument('-config', '--config_path', default="./data/config.ini", type=str, help="config file (config.ini)")
parser.add_argument('-class', '--docClass', default=1, type=int, help="document category")
parser.add_argument('-tags', '--docTags', default="", type=str, help="document tags")
parser.add_argument('-pricing_model', '--pricing_type', default="页数", type=str, help="pricing model: 页数 (pages) or 字数 (characters)")
parser.add_argument('-pricing', '--base_price', default=500, type=float, help="base price")
# parser.add_argument('-addWater', '--Water', default="0" * 12, type=str, help="user id")
parser.add_argument('-sensitive_file', '--sensitive_path', default="./data/sensitive_words.txt", type=str,
                    help="path to the sensitive-words file")
parser.add_argument('-classify_file', '--classify_path', default="./data/classify.csv", type=str,
                    help="path to the classification file")
args = parser.parse_args()
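
# Example invocation (illustrative; paths and values are placeholders):
#   python3 publish_file.py -dir ./data/file/ -config ./data/config.ini \
#       -class 1 -tags "报告,行业" -pricing_model 页数 -pricing 500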
# docType = {'doc': 1, 'docx': 1, 'ppt': 4, 'pptx': 4, 'xls': 3, 'xlsx': 3, 'txt': 5, 'pdf': 2, 'html': 2,
#            'htm': 2}  # file-type codes
docType = {'doc': 1, 'docx': 1, 'ppt': 4, 'pptx': 4, 'txt': 5, 'pdf': 2}  # allowed file types
ACA = ACAutomation()
ACA.parse(args.sensitive_path)
FileOss, MySql, previewOss, ClassifyDict = "", "", "", {}
targetDir = "data/folder/"  # temporary staging folder (deleted after upload; source data is left untouched)
page_pattern = re.compile(r"(-\d+页|\d+页)")  # strips page counts such as "-12页" from file names
with open("./data/replace_words.txt", encoding="utf-8") as f:
    replace_words = [row.strip() for row in f.read().split("、") if row.strip()]  # words to strip from file names
replace_words.sort(key=len, reverse=True)  # replace longer words before shorter ones
print("replace_words-->", replace_words)


def create_oss_object(oss_config: dict):
    """
    Initialize an OSS client from a config section.
    :param oss_config: dict with OSS credentials and bucket info
    :return: OssServeClient instance
    """
    return OssServeClient(access_key_id=oss_config["access_key_id"],
                          access_key_secret=oss_config["access_key_secret"],
                          endpoint=oss_config["endpoint"],
                          bucket_name=oss_config["bucket_name"])
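
# Expected oss_config shape (keys are the ones read above; values are placeholders):
#   {"access_key_id": "LTAI....", "access_key_secret": "....",
#    "endpoint": "oss-cn-....aliyuncs.com", "bucket_name": "my-bucket"}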


def link_db():
    """
    Connect to OSS and MySQL.
    :return: (FileOss, MySql, previewOss)
    """
    Config = read_ini(args.config_path)
    FileConfig = Config["oss_file_config"]
    MySqlConfig = Config["mysql_config"]
    previewConfig = Config["previewConfig"]
    FileOss = create_oss_object(FileConfig)  # OSS bucket for source/pdf/text files
    previewOss = create_oss_object(previewConfig)  # OSS bucket for preview images
    MySql = DBHelper(MySqlConfig)
    return FileOss, MySql, previewOss
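
# config.ini is expected to provide the three sections read above (a sketch,
# assuming read_ini() returns a dict of dicts):
#   [oss_file_config] / [previewConfig] -> keys consumed by create_oss_object()
#   [mysql_config]                      -> connection settings consumed by DBHelper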


def check_file_type(doctype: str):
    """
    Check whether the file type is allowed.
    :param doctype: file type, e.g. "doc", "pdf"
    :return: True if allowed
    """
    if doctype not in docType:
        logger.warning("unsupported file type: %s---->" % doctype)
        return False
    return True


@logger.catch
def upload_oss(file_path: str, file_content: str, pdf_path: str, cover_path: str, persistent: dict) -> dict:
    """
    Upload a document and its derivatives to OSS.
    :param file_path: path of the source file
    :param file_content: parsed text content
    :param pdf_path: path of the converted PDF
    :param cover_path: path of the cover image
    :param persistent: custom request-header fields
    :return: dict of OSS ids for every object that uploaded successfully
    """
    global FileOss, previewOss
    succeed = {}
    # uuid1 is seeded with the current time as its node value
    source_oss_name = str(uuid.uuid1(int(time.time()))) + "." + file_path.split(".")[-1]
    pdf_oss_name = str(uuid.uuid1(int(time.time()))) + "." + "pdf"
    cover_oss_name = str(uuid.uuid1(int(time.time()))) + "." + "png"
    text_oss_name = str(uuid.uuid1(int(time.time())))
    per_header = FileOss.create_oss_meta(persistent)
    if not per_header:
        per_header = None
    # upload the source file
    with open(file_path, "rb") as file:
        state, request_id = FileOss.upload_bytes_file(source_oss_name, file, headers=per_header)
        if state:
            succeed["ossDocId"] = source_oss_name
    # upload the PDF
    with open(pdf_path, "rb") as pdf:
        state, request_id = FileOss.upload_bytes_file(pdf_oss_name, pdf, headers=per_header)
        if state:
            succeed["ossPdfId"] = pdf_oss_name
    # upload the plain text
    state, request_id = FileOss.upload_text_file(text_oss_name, file_content, headers=per_header)
    if state:
        succeed["ossTxtId"] = text_oss_name
    # upload the cover image
    with open(cover_path, "rb") as cover:
        state, request_id = previewOss.upload_bytes_file(cover_oss_name, cover, headers=per_header)
        if state:
            succeed["previewImgId"] = cover_oss_name
    return succeed
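
# On full success upload_oss() returns all four ids:
#   {"ossDocId": ..., "ossPdfId": ..., "ossTxtId": ..., "previewImgId": ...}
# A missing "ossDocId" or "ossPdfId" later forces downOrUp = 0 in update_start().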


def get_field(file_path: str, persistent: dict) -> dict:
    """
    Extract the key fields for a file.
    :param file_path: path of the file
    :param persistent: custom request-header fields
    :return: field dict, or {} if the file should not be uploaded
    """
    field = {}
    # parse the file into plain text
    file_content, pages, pdf_path = parse_file_start(file_path)
    text_size = len(file_content)
    cid_count = file_content.count("cid")  # many "cid" markers suggest an encrypted document
    if text_size < 100 or (not pages) or (cid_count > 50):  # text too short or unparsable: skip upload
        return {}
    # search = ACA.search(file_content)  # sensitive-word check
    # if search:
    #     return field
    cover_path = convert_img(pdf_path)
    if not cover_path:
        return {}
    field["docPageSize"] = pages
    # field names for the uploaded objects are defined inside upload_oss()
    upload_ret = upload_oss(file_path, file_content, pdf_path, cover_path, persistent)
    if not upload_ret:
        return {}
    field.update(upload_ret)
    # summary extraction (currently just the first 500 characters)
    # try:
    #     summary = make_summary(file_content)
    # except Exception as e:
    #     logger.warning("summary extraction failed-->%s" % file_content)
    #     summary = ""
    field["docSummary"] = file_content[:500]
    return field
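
# A non-empty return carries: docPageSize, the OSS ids from upload_oss(),
# and docSummary (the first 500 characters of the parsed text).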


def rand_time(start_time: int = 1619845520, end_time: int = 1622523920):
    """
    Pick a random Unix timestamp in [start_time, end_time].
    :param start_time: lower bound (default is roughly 2021-05-01 UTC)
    :param end_time: upper bound (default is roughly 2021-06-01 UTC)
    :return: random timestamp
    """
    target_time = random.randint(start_time, end_time)
    return target_time
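
# Used below to fabricate uploadDate, e.g.:
#   datetime.datetime.fromtimestamp(rand_time())  # some moment in May 2021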


def other_save(paths, filename):
    """
    Copy the source file into the staging folder (the copy is deleted after
    upload; the original data is left untouched).
    :param paths: directory of the file
    :param filename: file name
    :return: (state, target_path)
    """
    try:
        global targetDir
        source_path = os.path.join(paths, filename)  # full source path
        print(source_path)
        abs_dir = os.path.abspath(".")
        target_dir = os.path.join(abs_dir, targetDir)
        if os.path.exists(target_dir):
            del_directory(target_dir)
        generate_directory(target_dir)
        low_name = filename.lower().strip()
        low_name = low_name.replace("(", "").replace(")", "").replace(" ", "")
        low_name = low_name[-7:]  # keep only the last 7 characters (enough for the extension)
        target_path = os.path.join(target_dir, low_name)
        state = file_copy(source_path, target_path)
        if not state:
            return False, target_path
    except Exception as e:
        print(e)
        return False, ""
    return True, target_path
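
# Note: truncating to the last 7 characters yields short staging names that
# still end with the original extension (e.g. "...al.docx"); only the staging
# copy is renamed, never the source file.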


def create_doc_name(filename: str):
    """
    Build the display name: strip the extension, page counts and replace words.
    :param filename: original file name
    :return: cleaned document name
    """
    global page_pattern, replace_words
    doc_names = filename.split(".")[:-1]
    doc_names = ".".join(doc_names)
    pages = page_pattern.findall(doc_names)
    for v in pages:
        doc_names = doc_names.replace(v, "")
    for v in replace_words:
        doc_names = doc_names.replace(v, "")
    if doc_names and doc_names[0] == "-":  # guard against the empty string before stripping a leading "-"
        doc_names = doc_names[1:]
    return doc_names
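
# Example (illustrative, assuming "年度报告" is not itself in replace_words):
#   create_doc_name("年度报告-30页.pdf") -> "年度报告"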


@logger.catch
def update_start(paths: str, filename: str, classifies):
    """
    Run the publishing pipeline for one file.
    :param paths: directory of the file
    :param filename: file name
    :param classifies: ordered classification path
    :return: result message or insert state
    """
    global ClassifyDict
    classify = "/".join(classifies)
    tags = ",".join(classifies)
    if classify not in ClassifyDict:
        return f"classification does not exist-->{classify}"
    doc_class = ClassifyDict[classify]
    state, target_path = other_save(paths, filename)
    need_field = {}
    if not state:
        return f"save-as failed, check the data source--->{paths}--->{filename}"
    doctype, suffix, file_size = get_property(target_path)
    if not check_file_type(doctype):  # type check
        return f"file type not allowed: {doctype}--->{suffix}"
    doc_names = create_doc_name(filename)
    print("filename-->", filename, "-------docName--->", doc_names)
    need_field["docName"] = doc_names
    need_field["docFileType"] = docType[doctype]  # file-type code
    need_field["docFileSuffix"] = suffix  # file extension
    need_field["docFileSize"] = file_size  # file size
    state, file_md5 = md5_exists(MySql, target_path)  # md5 deduplication
    need_field["md5"] = file_md5
    if state:
        logger.warning("%s already exists-------->" % filename)
        return f"file already exists--->{filename}"
    field = get_field(target_path, {})
    if not field:
        logger.warning("{} failed to store--->{}".format(paths, filename))
        return "conversion or upload failed for this file, see the log for details"
    need_field.update(field)
    p_time = rand_time()
    need_field["uploadDate"] = datetime.datetime.fromtimestamp(p_time)
    need_field["isDelete"] = 0
    need_field["downOrUp"] = 1
    if ("ossDocId" not in need_field) or ("ossPdfId" not in need_field):
        need_field["downOrUp"] = 0
    need_field["docTags"] = tags
    need_field["docClass"] = doc_class  # category from the classification file
    need_field["userId"] = "4" * 24  # uploading user's id
    need_field["appId"] = "10000"
    pages = need_field["docPageSize"]
    price = get_pricing(doc_class, pages)
    need_field["price"] = price
    if not price:
        need_field["downOrUp"] = 0
        logger.warning(f"price is 0, check whether this category has no pricing--->{doc_class}-->{pages}")
    insert_state = save_field(MySql, need_field)  # persist to MySQL
    return insert_state
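
# Pipeline summary: stage a copy (other_save) -> type check -> clean name ->
# md5 dedup -> parse/convert/upload (get_field) -> price -> insert into MySQL.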


def upload_by_dir(file_dir):
    """
    Walk a directory tree and publish every file in it.
    :param file_dir: root directory
    :return:
    """
    global FileOss, MySql, previewOss, ClassifyDict, targetDir
    FileOss, MySql, previewOss = link_db()
    ClassifyDict = load_classify(args.classify_path)
    print("start--->")
    flag = ""
    for paths, dirs, files in os.walk(file_dir):
        for filename in files:
            # derive the classification from the directory structure:
            # classify_dir = paths.replace(file_dir, "")
            # classifies = [v.strip() for v in classify_dir.split("/") if v.strip()]
            # classify = "/".join(classifies)
            # resume after a given file:
            # if "摩根-全球-投资策略-2018J.P. 摩根全球ETF手册-2018" in filename:
            #     flag = True
            #     continue
            # if not flag:
            #     continue
            classifies = ["行业报告"]
            ret = update_start(paths, filename, classifies)
            print("--->", ret)


if __name__ == '__main__':
    filepath = "/home/online_library/classData/研究分析报告"
    start = time.time()
    upload_by_dir(file_dir=filepath)
    end = time.time()
    print(end - start)  # elapsed seconds