#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
# @Author : lijunliang
# @Email : lijunliang@topnet.net.cn
# @File : law.py
# @Software: PyCharm
import os
import time
import uuid
import argparse
import datetime

import pandas as pd
from loguru import logger

from module.parse_file import parse_file_start, get_property
# from module.abstract import make_summary
from module.price import get_pricing
from module.read_config import read_ini
from module.sql_operate import md5_exists, save_field
from module.ac_sensitive import ACAutomation
from module.load_classify import load_classify
from util.oss_file import OssServeClient
from util.db_helper import DBHelper
from util.file_operations import generate_directory, del_directory
from util.convert2img import convert_img
# from pymongo import MongoClient

logger.add("./logging/run.log", rotation="12:00")  # rotate the log file daily at 12:00

parser = argparse.ArgumentParser(description="Batch-ingest documents: parse, upload to OSS, index in MySQL")
parser.add_argument('-dir', '--file_dir', default="./data/file/", type=str, help="input directory")
parser.add_argument('-config', '--config_path', default="./data/config.ini", type=str, help="path to config.ini")
parser.add_argument('-class', '--docClass', default="招标文件", type=str, help="document class")
parser.add_argument('-tags', '--docTags', default="", type=str, help="document tags")
parser.add_argument('-pricing_model', '--pricing_type', default="页数", type=str,
                    help="pricing basis: pages (页数) or characters (字数)")
parser.add_argument('-pricing', '--base_price', default=500, type=float, help="base price")
# parser.add_argument('-addWater', '--Water', default="0" * 12, type=str, help="user id")
parser.add_argument('-sensitive_file', '--sensitive_path', default="./data/sensitive_words.txt", type=str,
                    help="path to the sensitive-words file")
parser.add_argument('-classify_file', '--classify_path', default="./data/classify.csv", type=str,
                    help="classification csv")
args = parser.parse_args()

# Map file extensions to the numeric type codes stored with each document.
docType = {'doc': 1, 'docx': 1, 'ppt': 4, 'pptx': 4, 'xls': 3, 'xlsx': 3,
           'txt': 5, 'pdf': 2, 'html': 2, 'htm': 2}

# Build the Aho-Corasick automaton for sensitive-word screening.
ACA = ACAutomation()
ACA.parse(args.sensitive_path)


def create_oss_object(oss_config: dict) -> OssServeClient:
    """
    Initialize an OSS client.
    :param oss_config: dict holding the OSS credentials and bucket name
    :return: OssServeClient
    """
    return OssServeClient(access_key_id=oss_config["access_key_id"],
                          access_key_secret=oss_config["access_key_secret"],
                          endpoint=oss_config["endpoint"],
                          bucket_name=oss_config["bucket_name"])


def link_db():
    """
    Connect to the OSS buckets and the MySQL database listed in config.ini.
    :return: (FileOss, MySql, previewOss, attachOss)
    """
    config = read_ini(args.config_path)
    FileOss = create_oss_object(config["oss_file_config"])           # source / pdf / text uploads
    previewOss = create_oss_object(config["previewConfig"])          # cover-image uploads
    attachOss = create_oss_object(config["attachments_oss_Config"])  # attachment downloads
    MySql = DBHelper(config["mysql_config"])
    return FileOss, MySql, previewOss, attachOss


FileOss, MySql, previewOss, attachOss = link_db()
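# A minimal sketch of the config.ini layout that read_ini() is expected to
# return, inferred from the section and key names used above. The three OSS
# sections share the four keys consumed by create_oss_object(); the
# mysql_config keys are assumptions, not confirmed by this file:
#
#   [oss_file_config]
#   access_key_id = ...
#   access_key_secret = ...
#   endpoint = ...
#   bucket_name = ...
#
#   [previewConfig]
#   ; same four keys as oss_file_config
#
#   [attachments_oss_Config]
#   ; same four keys as oss_file_config
#
#   [mysql_config]
#   ; assumed: host / port / user / password / database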
def check_file_type(doctype: str) -> bool:
    """
    Check that the file extension is one we can ingest.
    :param doctype: file extension
    :return: True if supported
    """
    if doctype not in docType:
        logger.warning("unsupported file type: %s---->" % doctype)
        return False
    return True


@logger.catch
def upload_oss(file_path: str, file_content: str, pdf_path: str, cover_path: str, persistent: dict) -> dict:
    """
    Upload a document's artifacts to OSS.
    :param file_path: source file path
    :param file_content: extracted text
    :param pdf_path: path of the PDF rendering
    :param cover_path: path of the cover image
    :param persistent: custom request-header fields
    :return: dict of OSS object names for the uploads that succeeded
    """
    global FileOss, previewOss
    succeed = {}
    # uuid1 is given the current timestamp as its node argument; the results
    # serve only as unique object names.
    source_oss_name = str(uuid.uuid1(int(time.time()))) + "." + file_path.split(".")[-1]
    pdf_oss_name = str(uuid.uuid1(int(time.time()))) + ".pdf"
    cover_oss_name = str(uuid.uuid1(int(time.time()))) + ".png"
    text_oss_name = str(uuid.uuid1(int(time.time())))
    per_header = FileOss.create_oss_meta(persistent)
    if not per_header:
        per_header = None
    # upload the source file
    with open(file_path, "rb") as file:
        state, request_id = FileOss.upload_bytes_file(source_oss_name, file, headers=per_header)
        if state:
            succeed["ossDocId"] = source_oss_name
    # upload the PDF rendering
    with open(pdf_path, "rb") as pdf:
        state, request_id = FileOss.upload_bytes_file(pdf_oss_name, pdf, headers=per_header)
        if state:
            succeed["ossPdfId"] = pdf_oss_name
    # upload the extracted text
    state, request_id = FileOss.upload_text_file(text_oss_name, file_content, headers=per_header)
    if state:
        succeed["ossTxtId"] = text_oss_name
    # upload the cover image
    with open(cover_path, "rb") as cover:
        state, request_id = previewOss.upload_bytes_file(cover_oss_name, cover, headers=per_header)
        if state:
            succeed["previewImgId"] = cover_oss_name
    return succeed


def get_field(file_path: str, persistent: dict) -> dict:
    """
    Parse a file and collect the fields worth saving.
    :param file_path: file path
    :param persistent: custom request-header fields
    :return: dict of fields, empty if the file should be skipped
    """
    field = {}
    # parse the file into plain text plus a PDF rendering
    file_content, pages, pdf_path = parse_file_start(file_path)
    text_size = len(file_content)
    if text_size < 400:  # too little text to be worth uploading
        return {}
    if pages < 1:
        return {}
    # search = ACA.search(file_content)  # sensitive-word check
    # if search:
    #     return field
    cover_path = convert_img(pdf_path)
    if not cover_path:
        return {}
    field["docPageSize"] = pages
    # upload; the returned keys are defined inside upload_oss
    upload_ret = upload_oss(file_path, file_content, pdf_path, cover_path, persistent)
    if not upload_ret:
        return {}
    field.update(upload_ret)
    # summary extraction
    # try:
    #     summary = make_summary(file_content)
    # except Exception as e:
    #     logger.warning("summary extraction failed-->%s" % file_content)
    #     summary = ""
    field["docSummary"] = file_content[:500]
    return field


def other_save(ossid: str):
    """
    Download an attachment from OSS into a scratch directory.
    :param ossid: OSS object id
    :return: (state, local target path)
    """
    try:
        global attachOss
        abs_dir = os.path.abspath(".")
        target_dir = os.path.join(abs_dir, "data/folder/")
        if os.path.exists(target_dir):
            del_directory(target_dir)
        generate_directory(target_dir)
        target_path = os.path.join(target_dir, ossid)
        state = attachOss.download_file(ossid, target_path)
        if not state:
            return False, target_path
    except Exception as e:
        logger.warning(e)
        return False, ""
    return True, target_path


def get_classify(sub_class: str) -> list:
    """
    Expand a sub-class string into full classification paths.
    :param sub_class: comma-separated classes, levels joined by "_"
    :return: list of "docClass/level1/level2"-style paths
    """
    total_classify = []
    for classify in sub_class.split(","):
        base_classify = [args.docClass]
        classify = classify.strip()
        if classify:
            base_classify.extend(classify.split("_"))
        if len(base_classify) > 1:
            total_classify.append("/".join(base_classify))
    return total_classify
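# Column layout assumed for ./data/data0_doc.csv, inferred from the row
# indexing in walk_dir_start() below; only these indices are read, and the
# meanings are best-effort guesses from how each value is used:
#   row[0] -> record id (printed, and stored as userId)
#   row[1] -> second fallback title
#   row[3] -> sub-class string, e.g. "a_b,c" (expanded by get_classify above)
#   row[5] -> OSS object id of the attachment to download
#   row[4] -> primary title (file name with extension)
#   row[6] -> upload timestamp in epoch seconds, optional
#   row[7] -> first fallback title, used when the primary one is too short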
@logger.catch
def walk_dir_start():
    """
    Ingest every record listed in the CSV export.
    :return:
    """
    FileOss, MySql, previewOss, attachOss = link_db()  # fresh connections for this run
    classify_dict = load_classify(args.classify_path)
    mongo_info = pd.read_csv("./data/data0_doc.csv")
    for row in mongo_info.values.tolist():
        print("id--->", row[0])
        # pick a usable title: file name without its extension, then two fallbacks
        title = row[4]
        title = "".join(title.split(".")[:-1])
        if len(title) < 13:
            title = row[7]
        title = title.strip()
        if not title:
            title = row[1]
        title = title.strip()
        if not title:
            continue
        need_field = {}
        need_field["docName"] = title
        sub_class = row[3]
        ossid = row[5]
        if not sub_class:
            continue
        state, target_path = other_save(ossid)
        if not state:
            continue
        doctype, suffix, fileSize = get_property(target_path)
        if not check_file_type(doctype):  # extension check
            continue
        need_field["docFileType"] = docType[doctype]  # numeric file type
        need_field["docFileSuffix"] = suffix          # file extension
        need_field["docFileSize"] = fileSize          # file size
        state, file_md5 = md5_exists(MySql, target_path)  # md5 de-duplication
        need_field["md5"] = file_md5
        if state:
            logger.warning("%s already exists--------" % title)
            continue
        field = get_field(target_path, {})
        if not field:
            logger.warning("save failed--->%s" % row[0])
            continue
        need_field.update(field)
        if row[6]:
            need_field["uploadDate"] = datetime.datetime.fromtimestamp(row[6])
        else:
            need_field["uploadDate"] = datetime.datetime.now()
        need_field["isDelete"] = 0
        need_field["downOrUp"] = 1
        doctypes = get_classify(sub_class)
        docClass = ",".join([classify_dict[doctype] for doctype in doctypes if doctype in classify_dict])
        if not docClass:
            continue
        # tags: the unique path segments of every classification path
        need_field["docTags"] = ",".join(set([s for v in doctypes for s in v.split("/")]))
        need_field["docClass"] = docClass
        need_field["userId"] = row[0]
        need_field["appId"] = "10000"
        pages = need_field["docPageSize"]
        need_field["price"] = get_pricing(docClass, pages)
        save_field(MySql, need_field)  # persist to MySQL


if __name__ == '__main__':
    # ret = load_classify(args.classify_path)
    # print(ret)
    start = time.time()
    walk_dir_start()
    end = time.time()
    print(end - start)
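# Example invocation (a sketch; the paths are illustrative, the flags are the
# ones defined by the parser above):
#   python3.6 law.py -config ./data/config.ini \
#                    -class 招标文件 \
#                    -sensitive_file ./data/sensitive_words.txt \
#                    -classify_file ./data/classify.csv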