#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
# @Author : lijunliang
# @Email : lijunliang@topnet.net.cn
# @File : law.py
# @Software: PyCharm

# --- standard library ---
import os
import argparse
import datetime
import time  # BUG FIX: upload_oss() uses time.time(); previously `time` was only
             # imported inside the __main__ guard, breaking `import law` from elsewhere.
import uuid

# --- third party ---
import pandas as pd
from loguru import logger
from pymongo import MongoClient

# --- project local ---
from module.parse_file import parse_file_start, get_property
from module.price import word_size_pricing
from module.abstract import make_summary
from module.read_config import read_ini
from module.sql_operate import md5_exists, save_field
from module.ac_sensitive import ACAutomation
from util.oss_file import OssServeClient
from util.db_helper import DBHelper
from util.file_operations import file_copy, generate_directory, del_directory
from util.convert2img import convert_img

logger.add("./logging/run.log", rotation="12:00")  # log file, rotated daily at noon

# Command-line configuration for the ingestion run.
parser = argparse.ArgumentParser("指定监听端口")
parser.add_argument('-dir', '--file_dir', default="./data/file/", type=str, help="目录")
parser.add_argument('-config', '--config_path', default="./data/config.ini", type=str, help="配置文件config.ini")
parser.add_argument('-class', '--docClass', default=1, type=int, help="类别")
parser.add_argument('-tags', '--docTags', default="", type=str, help="标签")
parser.add_argument('-pricing_model', '--pricing_type', default="页数", type=str, help="页数or字数")
parser.add_argument('-pricing', '--base_price', default=500, type=float, help="初始价钱")
# parser.add_argument('-addWater', '--Water', default="0" * 12, type=str, help="用户id")
parser.add_argument('-sensitive_file', '--sensitive_path', default="./data/sensitive_words.txt", type=str,
                    help="敏感词文件路径")
parser.add_argument('-classify_file', '--classify_path', default="./data/classify.csv", type=str, help="分类文件")
args = parser.parse_args()

# Map of supported file extensions to numeric document-type codes.
docType = {'doc': 1, 'docx': 1, 'ppt': 4, 'pptx': 4, 'xls': 3, 'xlsx': 3,
           'txt': 5, 'pdf': 2, 'html': 2, 'htm': 2}

# Aho-Corasick automaton for sensitive-word screening, loaded from --sensitive_file.
ACA = ACAutomation()
ACA.parse(args.sensitive_path)
def create_oss_object(oss_config: dict):
    """
    Build an OSS client from one config-file section.

    :param oss_config: dict with keys access_key_id, access_key_secret,
                       endpoint and bucket_name.
    :return: configured OssServeClient.
    """
    return OssServeClient(access_key_id=oss_config["access_key_id"],
                          access_key_secret=oss_config["access_key_secret"],
                          endpoint=oss_config["endpoint"],
                          bucket_name=oss_config["bucket_name"])


def load_classify():
    """
    Load the tag -> category mapping CSV given by --classify_file.

    Rows with fewer than two columns are logged and skipped.

    :return: dict mapping tag name to category id.
    """
    classify = {}
    classify_file = pd.read_csv(args.classify_path)
    classify_label = classify_file.values
    for ind, row in enumerate(classify_label):
        if len(row) < 2:
            logger.error("分类文件加载异常--->%d行" % (ind + 1))
            continue
        classify[row[0].strip()] = row[1].strip()
    return classify


def link_db():
    """
    Connect to the external services declared in config.ini.

    :return: (file OSS client, MySQL helper, preview-image OSS client).
    """
    Config = read_ini(args.config_path)
    FileConfig = Config["oss_file_config"]
    MySqlConfig = Config["mysql_config"]
    previewConfig = Config["previewConfig"]
    FileOss = create_oss_object(FileConfig)  # bucket for source/pdf/text files
    previewOss = create_oss_object(previewConfig)  # bucket for cover previews
    MySql = DBHelper(MySqlConfig)
    return FileOss, MySql, previewOss


# Module-level connections; upload_oss() reads these via `global`.
FileOss, MySql, previewOss = link_db()


def check_file_type(doctype: str) -> bool:
    """
    Return True when *doctype* is a supported extension (key of docType).
    """
    if doctype not in docType:
        logger.warning("%s文件类型不匹配---->" % doctype)
        return False
    return True


@logger.catch
def upload_oss(file_path: str, file_content: str, pdf_path: str, cover_path: str,
               persistent: dict) -> dict:
    """
    Upload the source file, its PDF rendering, the plain text and the cover
    image to OSS under freshly generated object names.

    :param file_path: path of the original document.
    :param file_content: parsed plain text of the document.
    :param pdf_path: path of the PDF rendering.
    :param cover_path: path of the cover image (png).
    :param persistent: custom request-header fields.
    :return: dict with the OSS ids of every artifact that uploaded
             successfully (ossDocId / ossPdfId / ossTxtId / previewImgId).
    """
    global FileOss, previewOss
    succeed = {}
    # NOTE(review): uuid.uuid1's first positional argument is the *node* id;
    # passing a timestamp is unusual but still yields unique object names.
    source_oss_name = str(uuid.uuid1(int(time.time()))) + "." + file_path.split(".")[-1]
    pdf_oss_name = str(uuid.uuid1(int(time.time()))) + "." + "pdf"
    cover_oss_name = str(uuid.uuid1(int(time.time()))) + "." + "png"
    text_oss_name = str(uuid.uuid1(int(time.time())))
    per_header = FileOss.create_oss_meta(persistent)
    if not per_header:
        per_header = None
    # source file upload
    with open(file_path, "rb") as file:
        state, request_id = FileOss.upload_bytes_file(source_oss_name, file, headers=per_header)
        if state:
            succeed["ossDocId"] = source_oss_name
    # pdf upload
    with open(pdf_path, "rb") as pdf:
        state, request_id = FileOss.upload_bytes_file(pdf_oss_name, pdf, headers=per_header)
        if state:
            succeed["ossPdfId"] = pdf_oss_name
    # plain-text upload
    state, request_id = FileOss.upload_text_file(text_oss_name, file_content, headers=per_header)
    if state:
        succeed["ossTxtId"] = text_oss_name
    # cover image upload (preview bucket)
    with open(cover_path, "rb") as cover:
        state, request_id = previewOss.upload_bytes_file(cover_oss_name, cover, headers=per_header)
        if state:
            succeed["previewImgId"] = cover_oss_name
    return succeed


def get_field(file_path: str, persistent: dict):
    """
    Parse one document and assemble the metadata fields to persist.

    :param file_path: path of the document to parse.
    :param persistent: custom OSS request-header fields.
    :return: dict of extracted fields, or {} when the document is rejected
             (too little text, no cover image, or upload failure).
    """
    field = {}
    # Parse the file into plain text, page count and a PDF rendering.
    file_content, pages, pdf_path = parse_file_start(file_path)
    text_size = len(file_content)
    if text_size < 100:  # too little text to be worth uploading
        return {}
    # Sensitive-word screening (ACA.search) is intentionally disabled here.
    cover_path = convert_img(pdf_path)
    if not cover_path:
        return {}
    field["docPageSize"] = pages
    # Upload all artifacts; keys are defined inside upload_oss().
    upload_ret = upload_oss(file_path, file_content, pdf_path, cover_path, persistent)
    if not upload_ret:
        return {}
    field.update(upload_ret)
    # Summary extraction (make_summary) is disabled; use a text prefix instead.
    field["docSummary"] = file_content[:500]
    # Price from the configured model (page- or word-count based).
    price = word_size_pricing(args.pricing_type, args.base_price, text_size, 0.03)
    field["price"] = price
    return field


def other_save(paths, filename):
    """
    Copy a source file into ./data/folder/ as a scratch working copy.

    The target directory is wiped and recreated on every call, so it only
    ever contains the file currently being processed.

    :param paths: directory containing the file.
    :param filename: file name inside *paths*.
    :return: (success flag, path of the copied file or "").
    """
    try:
        source_path = os.path.join(paths, filename)  # full source path
        abs_dir = os.path.abspath(".")
        target_dir = os.path.join(abs_dir, "data/folder/")
        if os.path.exists(target_dir):
            del_directory(target_dir)
        generate_directory(target_dir)
        target_path = os.path.join(target_dir, filename)
        state = file_copy(source_path, target_path)
        if not state:
            return False, target_path
    except Exception as e:
        print(e)
        return False, ""
    return True, target_path


@logger.catch
def walk_dir_start(file_dir: str):
    """
    Walk *file_dir* recursively and ingest every supported document:
    copy to a scratch dir, extract metadata, dedupe by MD5, upload the
    artifacts to OSS and persist the record to MySQL.

    :param file_dir: root directory to scan.
    """
    # NOTE(review): these locals shadow the module-level connections;
    # upload_oss() still uses the module-level ones via `global`.
    FileOss, MySql, previewOss = link_db()
    classify_dict = load_classify()
    for paths, dirs, files in os.walk(file_dir):
        for filename in files:
            need_field = {}
            state, target_path = other_save(paths, filename)
            if not state:
                continue
            doctype, suffix, fileSize = get_property(target_path)
            if not check_file_type(doctype):  # unsupported extension
                continue
            docNames = filename.split(".")[:-1]
            docName = "".join(docNames)
            # Presumably strips anonymisation placeholders from file names
            # — TODO confirm; this removes every 'x' and '某' character.
            docName = docName.replace("x", "").replace("某", "")
            need_field["docName"] = docName
            need_field["docFileType"] = docType[doctype]  # numeric type code
            need_field["docFileSuffix"] = suffix  # file extension
            need_field["docFileSize"] = fileSize  # size in bytes
            state, file_md5 = md5_exists(MySql, target_path)  # MD5 dedupe
            need_field["md5"] = file_md5
            if state:
                logger.warning("%s已经存在--------》" % filename)
                continue
            field = get_field(target_path, {})
            if not field:
                logger.warning("{}储存失败--->{}".format(paths, filename))
                continue
            need_field.update(field)
            need_field["uploadDate"] = datetime.datetime.now()
            need_field["isDelete"] = 0
            need_field["downOrUp"] = 1
            tag_name = args.docTags  # renamed from `doctype` to avoid shadowing
            if tag_name in classify_dict:
                docClass = classify_dict[tag_name]
            else:
                continue  # unknown tag: skip the document entirely
            need_field["docTags"] = "法律法规," + tag_name  # input tag
            need_field["docClass"] = docClass  # resolved category
            need_field["userId"] = "0" * 24  # placeholder user id
            need_field["appId"] = "auto"
            save_field(MySql, need_field)  # persist to MySQL


if __name__ == '__main__':
    import time

    start = time.time()
    # BUG FIX: walk_dir_start() was called without its required `file_dir`
    # argument, raising TypeError (swallowed by @logger.catch). Pass the
    # configured directory instead.
    walk_dir_start(args.file_dir)
    end = time.time()
    print(end - start)