#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
# @Author : lijunliang
# @Email : lijunliang@topnet.net.cn
# @File : law.py
# @Software: PyCharm
import os
import argparse
import datetime
from module.parse_file import parse_file_start
from module.abstract import make_summary
from loguru import logger
from util.oss_file import OssServeClient
from module.read_config import read_ini
from util.db_helper import DBHelper
from module.sql_operate import md5_exists
from module.sql_operate import save_field
from module.parse_file import get_property
from util.file_operations import file_copy
from util.file_operations import generate_directory, del_directory
from module.ac_sensitive import ACAutomation
from pymongo import MongoClient
from util.convert2img import convert_img
from module.load_classify import load_classify
import pandas as pd
import uuid

# Rotate the run log every day at noon.
logger.add("./logging/run.log", rotation="12:00")

# Command-line options for the ingestion run.
parser = argparse.ArgumentParser("指定监听端口")
parser.add_argument('-dir', '--file_dir', default="./data/file/", type=str, help="目录")
parser.add_argument('-config', '--config_path', default="./data/config.ini", type=str, help="配置文件config.ini")
parser.add_argument('-class', '--docClass', default=1, type=int, help="类别")
parser.add_argument('-tags', '--docTags', default="", type=str, help="标签")
parser.add_argument('-pricing_model', '--pricing_type', default="页数", type=str, help="页数or字数")
parser.add_argument('-pricing', '--base_price', default=500, type=float, help="初始价钱")
parser.add_argument('-sensitive_file', '--sensitive_path', default="./data/sensitive_words.txt", type=str,
                    help="敏感词文件路径")
parser.add_argument('-classify_file', '--classify_path', default="./data/classify.csv", type=str, help="分类文件")
args = parser.parse_args()

# Map of recognised file extensions to the numeric document-type code
# stored in the `docFileType` field.
docType = {'doc': 1, 'docx': 1, 'ppt': 4, 'pptx': 4, 'xls': 3, 'xlsx': 3,
           'txt': 5, 'pdf': 2, 'html': 2, 'htm': 2}

# Aho-Corasick automaton for sensitive-word screening (dictionary is
# loaded right after module setup via ACA.parse).
ACA = ACAutomation()
ACA.parse(args.sensitive_path)  # load the sensitive-word dictionary into the automaton


def create_oss_object(oss_config: dict) -> OssServeClient:
    """Build an OSS client from one config section.

    :param oss_config: dict with access_key_id / access_key_secret /
                       endpoint / bucket_name keys
    :return: configured OssServeClient
    """
    return OssServeClient(access_key_id=oss_config["access_key_id"],
                          access_key_secret=oss_config["access_key_secret"],
                          endpoint=oss_config["endpoint"],
                          bucket_name=oss_config["bucket_name"])


def link_db():
    """Connect the two OSS buckets and MySQL.

    :return: (FileOss, MySql, previewOss) — document bucket, MySQL helper,
             preview-image bucket
    """
    config = read_ini(args.config_path)
    file_oss = create_oss_object(config["oss_file_config"])   # source / pdf / text objects
    preview_oss = create_oss_object(config["previewConfig"])  # cover images
    mysql = DBHelper(config["mysql_config"])
    return file_oss, mysql, preview_oss


FileOss, MySql, previewOss = link_db()


def check_file_type(doctype: str) -> bool:
    """Return True when `doctype` is a supported extension (a docType key).

    Logs a warning for unsupported types.
    """
    if doctype not in docType:
        logger.warning("%s文件类型不匹配---->" % doctype)
        return False
    return True


@logger.catch
def upload_oss(file_path: str, file_content: str, pdf_path: str, cover_path: str, persistent: dict) -> dict:
    """Upload the source file, its pdf, the parsed text and the cover image to OSS.

    :param file_path: local path of the original document
    :param file_content: parsed plain text of the document
    :param pdf_path: local path of the converted pdf
    :param cover_path: local path of the cover image (png)
    :param persistent: custom fields persisted as OSS object metadata headers
    :return: dict with an oss-id entry for each part that uploaded successfully
             (ossDocId / ossPdfId / ossTxtId / previewImgId)
    """
    succeed = {}
    # FIX: the original used uuid.uuid1(int(time.time())) — uuid1's first
    # positional argument is the 48-bit *node* id (hardware address), not a
    # seed, and `time` was only imported inside the __main__ block, so calling
    # this function from an import raised NameError.  uuid4 gives
    # collision-safe random object names with no such dependency.
    source_oss_name = "%s.%s" % (uuid.uuid4(), file_path.split(".")[-1])
    pdf_oss_name = "%s.pdf" % uuid.uuid4()
    cover_oss_name = "%s.png" % uuid.uuid4()
    text_oss_name = str(uuid.uuid4())
    per_header = FileOss.create_oss_meta(persistent) or None  # empty meta -> default headers
    # source file
    with open(file_path, "rb") as file:
        state, request_id = FileOss.upload_bytes_file(source_oss_name, file, headers=per_header)
        if state:
            succeed["ossDocId"] = source_oss_name
    # converted pdf
    with open(pdf_path, "rb") as pdf:
        state, request_id = FileOss.upload_bytes_file(pdf_oss_name, pdf, headers=per_header)
        if state:
            succeed["ossPdfId"] = pdf_oss_name
    # parsed plain text
    state, request_id = FileOss.upload_text_file(text_oss_name, file_content, headers=per_header)
    if state:
        succeed["ossTxtId"] = text_oss_name
    # cover image goes to the preview bucket
    with open(cover_path, "rb") as cover:
        state, request_id = previewOss.upload_bytes_file(cover_oss_name, cover, headers=per_header)
        if state:
            succeed["previewImgId"] = cover_oss_name
    return succeed


def get_field(file_path: str, persistent: dict) -> dict:
    """Parse a document and collect the storage fields for it.

    :param file_path: local path of the document
    :param persistent: custom request-header fields forwarded to upload_oss
    :return: field dict (docPageSize, oss ids, docSummary), or {} when the
             document is rejected or any step fails
    """
    field = {}
    file_content, pages, pdf_path = parse_file_start(file_path)
    if len(file_content) < 100:  # too little text — not worth uploading
        return {}
    # Sensitive-word screening intentionally disabled:
    # if ACA.search(file_content):
    #     return field
    cover_path = convert_img(pdf_path)
    if not cover_path:
        return {}
    field["docPageSize"] = pages
    upload_ret = upload_oss(file_path, file_content, pdf_path, cover_path, persistent)
    if not upload_ret:
        return {}
    field.update(upload_ret)
    # Summary extraction via make_summary was unreliable and is disabled;
    # fall back to a plain text prefix.
    field["docSummary"] = file_content[:500]
    return field


def other_save(paths: str, filename: str):
    """Copy a crawled file into a clean ./data/folder/ working directory.

    :param paths: directory the source file lives in
    :param filename: file name inside `paths`
    :return: (True, target_path) on success, (False, target_path or "") on failure
    """
    try:
        source_path = os.path.join(paths, filename)
        target_dir = os.path.join(os.path.abspath("."), "data/folder/")
        if os.path.exists(target_dir):  # always start from an empty work dir
            del_directory(target_dir)
        generate_directory(target_dir)
        target_path = os.path.join(target_dir, filename)
        if not file_copy(source_path, target_path):
            return False, target_path
    except Exception as e:
        # FIX: was a bare print(e); route through the file's logger instead.
        logger.warning("copy %s failed --> %s" % (filename, e))
        return False, ""
    return True, target_path


def get_persistent(row: dict) -> dict:
    """Project a mongo row onto the custom request-header field dict.

    Missing keys default to "".
    """
    keys = ("title", "office", "publish", "expiry", "type", "status", "url")
    return {key: row.get(key, "") for key in keys}


@logger.catch
def walk_dir_start():
    """Ingest every WORD attachment referenced by the bxh.gjflfgzsk collection.

    For each mongo row: copy the file into the work dir, de-duplicate by md5,
    parse/upload it, then save the assembled field record to MySQL.
    """
    from bson import ObjectId
    # Fresh MySQL handle for this run; upload_oss keeps using the
    # module-level OSS clients created at import time.
    _file_oss, MySql, _preview_oss = link_db()
    mongo_client = MongoClient("192.168.3.166:27082")
    m_col = mongo_client["bxh"]["gjflfgzsk"]
    classify_dict = load_classify(args.classify_path)
    cursor = m_col.find({"_id": {"$gt": ObjectId("604201abdca8410f1ef2cafb")}},
                        no_cursor_timeout=True).sort("_id", 1)
    for row in cursor:
        print("id--->", row["_id"])
        persistent = get_persistent(row)
        title = row.get("title", "")
        # Pick the WORD attachment, if any.
        file_path = ""
        for file in row.get("file", []):
            if file.get("type", "") == "WORD":
                file_path = file.get("path", "")
        if not file_path:
            continue
        filename = file_path.split("/")[-1]
        need_field = {"docName": title}
        state, target_path = other_save("./files", filename)
        if not state:
            continue
        doctype, suffix, fileSize = get_property(target_path)
        if not check_file_type(doctype):  # extension check
            continue
        need_field["docFileType"] = docType[doctype]
        need_field["docFileSuffix"] = suffix
        need_field["docFileSize"] = fileSize
        state, file_md5 = md5_exists(MySql, target_path)  # md5 de-duplication
        need_field["md5"] = file_md5
        if state:
            logger.warning("%s已经存在--------》" % filename)
            continue
        field = get_field(target_path, persistent)
        if not field:
            logger.warning("储存失败--->%s" % row["_id"])
            continue
        need_field.update(field)
        need_field["uploadDate"] = datetime.datetime.now()
        need_field["isDelete"] = 0
        need_field["downOrUp"] = 1
        doctype = row.get("type", "").strip()
        if doctype not in classify_dict:  # unmapped category — skip the row
            continue
        need_field["docTags"] = "法律法规/" + doctype
        need_field["docClass"] = classify_dict[doctype]
        need_field["userId"] = str(row["_id"])
        need_field["appId"] = "10000"
        save_field(MySql, need_field)  # persist to MySQL


if __name__ == '__main__':
    # Smoke-test the classify mapping; the full ingestion entry point is
    # walk_dir_start() below.
    ret = load_classify(args.classify_path)
    print(ret)
    # import time
    # start = time.time()
    # walk_dir_start()
    # print(time.time() - start)