#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
# @Author : lijunliang
# @Email : lijunliang@topnet.net.cn
# @File : law.py
# @Software: PyCharm
import os
import time
import uuid
import argparse
import datetime

import pandas as pd
from loguru import logger

from module.parse_file import parse_file_start, get_property
from module.price import word_size_pricing
from module.abstract import make_summary
from module.read_config import read_ini
from module.sql_operate import md5_exists, save_field
from module.ac_sensitive import ACAutomation
from util.oss_file import OssServeClient
from util.db_helper import DBHelper
from util.file_operations import file_copy, generate_directory, del_directory
from util.convert2img import convert_img

logger.add("./logging/run.log", rotation="12:00")  # log file, rotated daily at 12:00

parser = argparse.ArgumentParser(description="Bulk document ingestion")
parser.add_argument('-dir', '--file_dir', default="./data/file/", type=str, help="directory of files to ingest")
parser.add_argument('-config', '--config_path', default="./data/config.ini", type=str, help="path to config.ini")
parser.add_argument('-class', '--docClass', default=1, type=int, help="document class id")
parser.add_argument('-tags', '--docTags', default="", type=str, help="document tag")
parser.add_argument('-pricing_model', '--pricing_type', default="页数", type=str, help="pricing model: 页数 (by pages) or 字数 (by words)")
parser.add_argument('-pricing', '--base_price', default=500, type=float, help="base price")
# parser.add_argument('-addWater', '--Water', default="0" * 12, type=str, help="user id for watermark")
parser.add_argument('-sensitive_file', '--sensitive_path', default="./data/sensitive_words.txt", type=str,
                    help="path to the sensitive-words file")
parser.add_argument('-classify_file', '--classify_path', default="./data/classify.csv", type=str,
                    help="path to the classification file")
args = parser.parse_args()
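
# Example invocation (paths and tag values are illustrative, not prescriptive):
#   python law.py --file_dir ./data/file/ --config_path ./data/config.ini \
#                 --docTags 合同 --pricing_model 页数 --pricing 500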

docType = {'doc': 1, 'docx': 1, 'ppt': 4, 'pptx': 4, 'xls': 3, 'xlsx': 3, 'txt': 5, 'pdf': 2, 'html': 2,
           'htm': 2}  # file-extension -> type-id mapping

ACA = ACAutomation()  # automaton for sensitive-word screening
ACA.parse(args.sensitive_path)
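# sensitive_words.txt is assumed to hold one term per line; the format is
# inferred from how ACAutomation.parse is used here, not verified in that module.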


def create_oss_object(oss_config: dict):
    """
    Build an OSS client from a config section.
    :param oss_config: dict with access_key_id, access_key_secret, endpoint, bucket_name
    :return: OssServeClient
    """
    return OssServeClient(access_key_id=oss_config["access_key_id"],
                          access_key_secret=oss_config["access_key_secret"],
                          endpoint=oss_config["endpoint"],
                          bucket_name=oss_config["bucket_name"])


def load_classify():
    """Load the tag -> class-id mapping from the classification CSV."""
    classify = {}
    classify_file = pd.read_csv(args.classify_path)
    classify_label = classify_file.values
    for ind, row in enumerate(classify_label):
        if len(row) < 2:
            logger.error("malformed classification row ---> line %d" % (ind + 1))
            continue
        classify[row[0].strip()] = row[1].strip()
    return classify
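
# Assumed classify.csv layout: a header row plus two columns, tag then class id.
# Both columns are read as text (.strip() is applied to each value), so the
# class-id column is expected to be stored as a string in the file.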


def link_db():
    """
    Initialize the two OSS clients and the MySQL helper from config.ini.
    :return: (FileOss, MySql, previewOss)
    """
    Config = read_ini(args.config_path)
    FileConfig = Config["oss_file_config"]
    MySqlConfig = Config["mysql_config"]
    previewConfig = Config["previewConfig"]
    FileOss = create_oss_object(FileConfig)  # OSS bucket for source and derived files
    previewOss = create_oss_object(previewConfig)  # OSS bucket for preview images
    MySql = DBHelper(MySqlConfig)
    return FileOss, MySql, previewOss


FileOss, MySql, previewOss = link_db()
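
# Assumed config.ini shape (section names taken from link_db, OSS keys from
# create_oss_object; the MySQL keys depend on DBHelper and are not spelled out):
#   [oss_file_config]
#   access_key_id = ...
#   access_key_secret = ...
#   endpoint = ...
#   bucket_name = ...
#   [previewConfig]
#   ; same keys as oss_file_config, for the preview-image bucket
#   [mysql_config]
#   ; connection settings consumed by DBHelper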


def check_file_type(doctype: str) -> bool:
    """
    Check whether the file extension is one we can ingest.
    :param doctype: file extension without the dot
    :return: True if supported
    """
    if doctype not in docType:
        logger.warning("unsupported file type: %s ---->" % doctype)
        return False
    return True


@logger.catch
def upload_oss(file_path: str, file_content: str, pdf_path: str, cover_path: str, persistent: dict) -> dict:
    """
    Upload the source file, its PDF rendering, the extracted text and the
    cover image to OSS.
    :param file_path: source file path
    :param file_content: extracted text
    :param pdf_path: path of the PDF rendering
    :param cover_path: path of the cover image
    :param persistent: custom request-header fields
    :return: dict with the OSS ids of everything that uploaded successfully
    """
    succeed = {}
    # uuid1() already embeds a timestamp, so each name is unique per call
    source_oss_name = str(uuid.uuid1()) + "." + file_path.split(".")[-1]
    pdf_oss_name = str(uuid.uuid1()) + ".pdf"
    cover_oss_name = str(uuid.uuid1()) + ".png"
    text_oss_name = str(uuid.uuid1())
    per_header = FileOss.create_oss_meta(persistent) or None
    # source file
    with open(file_path, "rb") as file:
        state, request_id = FileOss.upload_bytes_file(source_oss_name, file, headers=per_header)
        if state:
            succeed["ossDocId"] = source_oss_name
    # PDF rendering
    with open(pdf_path, "rb") as pdf:
        state, request_id = FileOss.upload_bytes_file(pdf_oss_name, pdf, headers=per_header)
        if state:
            succeed["ossPdfId"] = pdf_oss_name
    # extracted text
    state, request_id = FileOss.upload_text_file(text_oss_name, file_content, headers=per_header)
    if state:
        succeed["ossTxtId"] = text_oss_name
    # cover image
    with open(cover_path, "rb") as cover:
        state, request_id = previewOss.upload_bytes_file(cover_oss_name, cover, headers=per_header)
        if state:
            succeed["previewImgId"] = cover_oss_name
    return succeed


def get_field(file_path: str, persistent: dict):
    """
    Parse the file and assemble the fields that go into the database.
    :param file_path: file path
    :param persistent: custom request-header fields
    :return: dict of fields, empty on failure
    """
    field = {}
    # parse the file to get its text, page count and PDF rendering
    file_content, pages, pdf_path = parse_file_start(file_path)
    text_size = len(file_content)
    if text_size < 100:  # too little text extracted: not worth uploading
        return {}
    # search = ACA.search(file_content)  # sensitive-word screening
    # if search:
    #     return field
    cover_path = convert_img(pdf_path)
    if not cover_path:
        return {}
    field["docPageSize"] = pages
    # upload; the returned dict only holds the ids that uploaded successfully
    upload_ret = upload_oss(file_path, file_content, pdf_path, cover_path, persistent)
    if not upload_ret:
        return {}
    field.update(upload_ret)
    # summary
    # try:
    #     summary = make_summary(file_content)
    # except Exception:
    #     logger.warning("summary extraction failed -->%s" % file_content)
    #     summary = ""
    field["docSummary"] = file_content[:500]
    # pricing
    price = word_size_pricing(args.pricing_type, args.base_price, text_size, 0.03)
    field["price"] = price
    return field


def other_save(paths, filename):
    """
    Copy the file into a clean working directory before processing.
    :param paths: source directory
    :param filename: file name
    :return: (state, target_path)
    """
    try:
        source_path = os.path.join(paths, filename)  # source file path
        abs_dir = os.path.abspath(".")
        target_dir = os.path.join(abs_dir, "data/folder/")
        if os.path.exists(target_dir):
            del_directory(target_dir)
        generate_directory(target_dir)
        target_path = os.path.join(target_dir, filename)
        state = file_copy(source_path, target_path)
        if not state:
            return False, target_path
    except Exception as e:
        logger.warning("file copy failed: %s" % e)
        return False, ""
    return True, target_path


@logger.catch
def walk_dir_start(file_dir: str):
    """
    Walk file_dir and ingest every supported file.
    :param file_dir: root directory to scan
    :return:
    """
    FileOss, MySql, previewOss = link_db()
    classify_dict = load_classify()
    for paths, dirs, files in os.walk(file_dir):
        for filename in files:
            need_field = {}
            state, target_path = other_save(paths, filename)
            if not state:
                continue
            doctype, suffix, fileSize = get_property(target_path)
            if not check_file_type(doctype):  # extension check
                continue
            docNames = filename.split(".")[:-1]
            docName = "".join(docNames)
            docName = docName.replace("x", "").replace("某", "")  # strip anonymization placeholders
            need_field["docName"] = docName
            need_field["docFileType"] = docType[doctype]  # file type id
            need_field["docFileSuffix"] = suffix  # file extension
            need_field["docFileSize"] = fileSize  # file size
            state, file_md5 = md5_exists(MySql, target_path)  # dedupe by MD5
            need_field["md5"] = file_md5
            if state:
                logger.warning("%s already exists --------" % filename)
                continue
            field = get_field(target_path, {})
            if not field:
                logger.warning("failed to store {} ---> {}".format(paths, filename))
                continue
            need_field.update(field)
            need_field["uploadDate"] = datetime.datetime.now()
            need_field["isDelete"] = 0
            need_field["downOrUp"] = 1
            doc_tag = args.docTags  # tag supplied on the command line
            if doc_tag in classify_dict:
                docClass = classify_dict[doc_tag]
            else:
                continue
            need_field["docTags"] = "法律法规," + doc_tag  # tag from the command line
            need_field["docClass"] = docClass  # class id from the classification file
            need_field["userId"] = "0" * 24  # placeholder user id
            need_field["appId"] = "auto"
            save_field(MySql, need_field)  # persist to MySQL


if __name__ == '__main__':
    filepath = "./files"
    start = time.time()
    walk_dir_start(filepath)
    end = time.time()
    print("elapsed: %.2fs" % (end - start))