#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
# @Author : lijunliang
# @Email : lijunliang@topnet.net.cn
# @File : law.py
# @Software: PyCharm
import os
import re
import time
import uuid
import random
import argparse
import datetime

from loguru import logger

from module.parse_file import parse_file_start, get_property
from module.abstract import make_summary  # used by the commented-out summary block in get_field
from module.price import get_pricing
from module.read_config import read_ini
from module.sql_operate import md5_exists, save_field
from module.ac_sensitive import ACAutomation
from module.load_classify import load_classify
from util.oss_file import OssServeClient
from util.db_helper import DBHelper
from util.file_operations import file_copy, generate_directory, del_directory
from util.convert2img import convert_img

logger.add("./logging/run.log", rotation="12:00")  # log file, rotated daily at 12:00

parser = argparse.ArgumentParser(description="Parse documents and upload them to OSS/MySQL")
parser.add_argument('-dir', '--file_dir', default="./data/file/", type=str, help="source directory")
parser.add_argument('-config', '--config_path', default="./data/config.ini", type=str,
                    help="config file (config.ini)")
parser.add_argument('-class', '--docClass', default=1, type=int, help="document class")
parser.add_argument('-tags', '--docTags', default="", type=str, help="tags")
parser.add_argument('-pricing_model', '--pricing_type', default="页数", type=str,
                    help="pricing model: by page count (页数) or by word count (字数)")
parser.add_argument('-pricing', '--base_price', default=500, type=float, help="base price")
# parser.add_argument('-addWater', '--Water', default="0" * 12, type=str, help="user id")
parser.add_argument('-sensitive_file', '--sensitive_path', default="./data/sensitive_words.txt",
                    type=str, help="path to the sensitive-words file")
parser.add_argument('-classify_file', '--classify_path', default="./data/classify.csv",
                    type=str, help="classification file")
args = parser.parse_args()

# docType = {'doc': 1, 'docx': 1, 'ppt': 4, 'pptx': 4, 'xls': 3, 'xlsx': 3, 'txt': 5, 'pdf': 2,
#            'html': 2, 'htm': 2}  # file-type dictionary
docType = {'doc': 1, 'docx': 1, 'ppt': 4, 'pptx': 4, 'txt': 5, 'pdf': 2}
ACA = ACAutomation()
ACA.parse(args.sensitive_path)
FileOss, MySql, previewOss, ClassifyDict = "", "", "", {}
targetDir = "data/folder/"  # temporary working directory (deleted after upload; source files are untouched)
page_pattern = re.compile(r"(-\d+页|\d+页)")  # strips page-count markers such as "-12页" from filenames
with open("./data/replace_words.txt") as f:
    replace_words = [row.strip() for row in f.read().split("、") if row.strip()]  # words removed from filenames
replace_words.sort(key=lambda x: len(x), reverse=True)  # replace longer words first
print("replace_words-->", replace_words)


def create_oss_object(oss_config: dict):
    """
    Initialize an OSS client from a config section.
    :param oss_config: dict with OSS credentials and bucket info
    :return: OssServeClient instance
    """
    return OssServeClient(access_key_id=oss_config["access_key_id"],
                          access_key_secret=oss_config["access_key_secret"],
                          endpoint=oss_config["endpoint"],
                          bucket_name=oss_config["bucket_name"])


def link_db():
    """
    Create the two OSS clients and the MySQL connection from config.ini.
    :return: (FileOss, MySql, previewOss)
    """
    Config = read_ini(args.config_path)
    FileConfig = Config["oss_file_config"]
    MySqlConfig = Config["mysql_config"]
    previewConfig = Config["previewConfig"]
    FileOss = create_oss_object(FileConfig)  # OSS bucket for source/pdf/text files
    previewOss = create_oss_object(previewConfig)  # OSS bucket for preview images
    MySql = DBHelper(MySqlConfig)
    return FileOss, MySql, previewOss


def check_file_type(doctype: str):
    """
    Check whether the file type is allowed.
    :param doctype: file type name, e.g. "pdf"
    :return: True if the type is in docType
    """
    if doctype not in docType:
        logger.warning("unsupported file type: %s---->" % doctype)
        return False
    return True
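# A minimal sketch of the config.ini layout consumed by link_db() above.
# Section names and the OSS keys come from read_ini/create_oss_object usage in
# this file; all values are placeholders, and the [mysql_config] keys depend on
# whatever DBHelper expects.
#
#   [oss_file_config]
#   access_key_id = <key-id>
#   access_key_secret = <key-secret>
#   endpoint = <oss-endpoint>
#   bucket_name = <file-bucket>
#
#   [previewConfig]
#   access_key_id = <key-id>
#   access_key_secret = <key-secret>
#   endpoint = <oss-endpoint>
#   bucket_name = <preview-bucket>
#
#   [mysql_config]
#   ... connection fields expected by DBHelper (e.g. host/port/user/password) ...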
@logger.catch
def upload_oss(file_path: str, file_content: str, pdf_path: str, cover_path: str, persistent: dict) -> dict:
    """
    Upload a document and its derived artifacts to OSS.
    :param file_path: source file path
    :param file_content: parsed text content
    :param pdf_path: path of the converted PDF
    :param cover_path: path of the cover image
    :param persistent: custom request-header fields
    :return: dict of the OSS ids that uploaded successfully
    """
    global FileOss, previewOss
    succeed = {}
    # uuid1 takes an optional node argument; here the current timestamp stands in for it
    source_oss_name = str(uuid.uuid1(int(time.time()))) + "." + file_path.split(".")[-1]
    pdf_oss_name = str(uuid.uuid1(int(time.time()))) + "." + "pdf"
    cover_oss_name = str(uuid.uuid1(int(time.time()))) + "." + "png"
    text_oss_name = str(uuid.uuid1(int(time.time())))
    per_header = FileOss.create_oss_meta(persistent)
    if not per_header:
        per_header = None
    # upload the source file
    with open(file_path, "rb") as file:
        state, request_id = FileOss.upload_bytes_file(source_oss_name, file, headers=per_header)
        if state:
            succeed["ossDocId"] = source_oss_name
    # upload the PDF
    with open(pdf_path, "rb") as pdf:
        state, request_id = FileOss.upload_bytes_file(pdf_oss_name, pdf, headers=per_header)
        if state:
            succeed["ossPdfId"] = pdf_oss_name
    # upload the plain text
    state, request_id = FileOss.upload_text_file(text_oss_name, file_content, headers=per_header)
    if state:
        succeed["ossTxtId"] = text_oss_name
    # upload the cover image
    with open(cover_path, "rb") as cover:
        state, request_id = previewOss.upload_bytes_file(cover_oss_name, cover, headers=per_header)
        if state:
            succeed["previewImgId"] = cover_oss_name
    return succeed


def get_field(file_path: str, persistent: dict) -> dict:
    """
    Parse a file and collect the fields needed for the database record.
    :param file_path:
    :param persistent: custom request-header fields
    :return: field dict, or {} if the file is rejected or an upload failed
    """
    field = {}
    # parse the file to obtain its text content
    file_content, pages, pdf_path = parse_file_start(file_path)
    text_size = len(file_content)
    cid_count = file_content.count("cid")  # many "cid" markers suggest an encrypted document
    if text_size < 100 or (not pages) or (cid_count > 50):  # too little usable text: skip the upload
        return {}
    # search = ACA.search(file_content)  # sensitive-word check
    # if search:
    #     return field
    cover_path = convert_img(pdf_path)
    if not cover_path:
        return {}
    field["docPageSize"] = pages
    # fields for successfully uploaded files are defined inside upload_oss
    upload_ret = upload_oss(file_path, file_content, pdf_path, cover_path, persistent)
    if not upload_ret:
        return {}
    field.update(upload_ret)
    # build the summary
    # try:
    #     summary = make_summary(file_content)  # extract a summary
    # except Exception as e:
    #     logger.warning("summary extraction failed-->%s" % file_content)
    #     summary = ""
    field["docSummary"] = file_content[:500]
    return field


def rand_time(start_time: int = 1619845520, end_time: int = 1622523920):
    """
    Pick a random Unix timestamp; the defaults span roughly 2021-05-01 to 2021-06-01.
    :param start_time:
    :param end_time:
    :return: random timestamp in [start_time, end_time]
    """
    target_time = random.randint(start_time, end_time)
    return target_time


def other_save(paths, filename):
    """
    Copy the source file into the temporary working directory (the copy is
    deleted after upload; the original data source is not modified).
    :param paths: directory of the file
    :param filename: file name
    :return: (state, target_path)
    """
    try:
        global targetDir
        source_path = os.path.join(paths, filename)  # full source path
        print(source_path)
        abs_dir = os.path.abspath(".")
        target_dir = os.path.join(abs_dir, targetDir)
        if os.path.exists(target_dir):
            del_directory(target_dir)
        generate_directory(target_dir)
        low_name = filename.lower().strip()
        low_name = low_name.replace("(", "").replace(")", "").replace(" ", "")
        low_name = low_name[-7:]  # keep only the last 7 characters so the temp name stays short (extension included)
        target_path = os.path.join(target_dir, low_name)
        state = file_copy(source_path, target_path)
        if not state:
            return False, target_path
    except Exception as e:
        print(e)
        return False, ""
    return True, target_path


def create_doc_name(filename: str):
    """
    Build the display name: drop the extension, page-count markers, and the
    words listed in replace_words.
    :param filename:
    :return: cleaned document name
    """
    global page_pattern, replace_words
    doc_names = ".".join(filename.split(".")[:-1])
    pages = page_pattern.findall(doc_names)
    for v in pages:
        doc_names = doc_names.replace(v, "")
    for v in replace_words:
        doc_names = doc_names.replace(v, "")
    if doc_names.startswith("-"):  # startswith also guards against an empty name
        doc_names = doc_names[1:]
    return doc_names
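# Illustrative behaviour of create_doc_name (the filename is hypothetical and
# assumes no entry of replace_words matches):
#   create_doc_name("宏观经济研究-45页.pdf")
#   -> ".pdf" dropped:                   "宏观经济研究-45页"
#   -> "-45页" stripped by page_pattern: "宏观经济研究"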
",".join(classifies) if classify not in ClassifyDict: return f"不存在当前分类-->{classify}" doc_class = ClassifyDict[classify] state, target_path = other_save(paths, filename) need_field = {} if not state: return f"另存为失败,请检查数据源是否有问题--->{paths}--->{filename}" doctype, suffix, file_size = get_property(target_path) if not check_file_type(doctype): # 类型检查 return f"当前文件不在允许的文件类型当中{doctype}--->{suffix}" doc_names = create_doc_name(filename) print("filename-->",filename,"-------docName--->",doc_names) need_field["docName"] = doc_names need_field["docFileType"] = docType[doctype] # 文件类型 need_field["docFileSuffix"] = suffix # 文件后缀 need_field["docFileSize"] = file_size # 文件大小 state, file_md5 = md5_exists(MySql, target_path) # md5检查去重 need_field["md5"] = file_md5 if state: logger.warning("%s已经存在--------》" % filename) return f"当前文件已经存在--->{filename}" field = get_field(target_path, {}) if not field: logger.warning("{}储存失败--->{}".format(paths, filename)) return "当前文件类型转换上传失败,详情查看日志" need_field.update(field) p_time = rand_time() need_field["uploadDate"] = datetime.datetime.fromtimestamp(p_time) need_field["isDelete"] = 0 need_field["downOrUp"] = 1 if ("ossDocId" not in need_field) or ("ossPdfId" not in need_field): need_field["downOrUp"] = 0 need_field["docTags"] = tags need_field["docClass"] = doc_class # 获取输入类别 need_field["userId"] = "4" * 24 # 获取输入用户ID need_field["appId"] = "10000" pages = need_field["docPageSize"] price = get_pricing(doc_class, pages) need_field["price"] = price if not price: need_field["downOrUp"] = 0 logger.warning(f"价格为0检查分类是否不纯在定价--->{doc_class}-->{pages}") insert_state = save_field(MySql, need_field) # 保存到mysql return insert_state def upload_by_dir(file_dir): """ 文件上传 :param file_dir: :return: """ global FileOss, MySql, previewOss, ClassifyDict, targetDir FileOss, MySql, previewOss = link_db() ClassifyDict = load_classify(args.classify_path) print("start--->") flag ="" for paths, dirs, files in os.walk(file_dir): for filename in files: # classify_dir = paths.replace(file_dir, "") # classifies = [v.strip() for v in classify_dir.split("/") if v.strip()] # classify = "/".join(classifies) #if "摩根-全球-投资策略-2018J.P. 摩根全球ETF手册-2018" in filename: # flag = True # continue #if not flag: # continue classifies = ["行业报告"] ret = update_start(paths, filename, classifies) print("--->",ret) if __name__ == '__main__': filepath = "/home/online_library/classData/研究分析报告" import time start = time.time() upload_by_dir(file_dir=filepath) end = time.time() print(end - start)