# coding:utf-8
"""Confidence scoring for purchasing line items extracted from tables.

Each extracted item ("target") is scored on eight aspects: source table,
item name, brand, model, quantity, unit, unit price and total price.  The
aspect scores are merged into a single value with a softmax-weighted sum,
and the per-item values are merged the same way into one overall score.
"""
import json
import logging
import math

import numpy as np
from sklearn.preprocessing import Normalizer

from tables.ai import product_detail_server

_logger = logging.getLogger(__name__)

# Shared normalizer used by calculate_score (sklearn default settings).
Scaler = Normalizer()

# Words that, together with "名称", mark a header as a proper item-name column.
_ITEM_NAME_KEYWORDS = (
    "产品", "设备", "货物", "商品", "标的", "物资",
    "材料", "服务", "物料", "印刷品", "医疗设备",
)


def softmax(x):
    """Return the softmax of *x*, computed over all of its elements.

    :param x: array-like of numbers
    :return: numpy array of the same shape whose elements sum to 1
    """
    values = np.asarray(x, dtype=float)
    if values.size == 0:
        # Nothing to normalise; mirror the empty input.
        return values
    # Subtracting the maximum leaves the result mathematically unchanged
    # but keeps np.exp from overflowing on large inputs.
    exp_values = np.exp(values - np.max(values))
    return exp_values / np.sum(exp_values)


def calculate_score(score_list):
    """Merge a list of scores into one softmax-weighted score.

    The list is normalised as a single sample by ``Scaler``, softmax of the
    normalised vector gives the weights, and the result is
    ``sum(score_i * weight_i)``.

    :param score_list: list of numeric scores
    :return: the merged score, or 0 when *score_list* is empty
    """
    if not score_list:
        return 0
    score_np = np.array(score_list)
    # Normalizer works row-wise, so the list is wrapped as one sample.
    scaled_data = Scaler.fit_transform([score_list])
    weights = softmax(scaled_data[0])
    return float(np.sum(score_np * weights))


class SourceEvaluate(object):
    """Scores the table an item was extracted from."""

    def __init__(self):
        """Set up the lookup tables that map table attributes to scores."""
        self.step = {}
        self.direction_step = {"h": 1, "v": 0.75, "h/v": 0.5, "无": 0.25}
        self.table_type_step = {"识别": 1, "推断": 0.8, "字段": 0.6, "标题": 0.4, "物品": 0.2}
        self.verify_step = {"确定": 1, "不确定": 0.7}
        self.file_name_step = {"公告": 1, "标题": 0.4}

    def calculate_score(self, file_name, table):
        """Return the mean of the four source-related scores.

        :param file_name: looked up in ``file_name_step``; unknown values
            score 0.8
        :param table: dict read for "verify", "direction" and "type";
            unknown or missing values score 0
        :return: average of the direction, type, verify and file-name scores
        """
        direction_score = self.direction_step.get(table.get("direction", ""), 0)
        table_type_score = self.table_type_step.get(table.get("type", ""), 0)
        verify_step_score = self.verify_step.get(table.get("verify", ""), 0)
        file_name_step_score = self.file_name_step.get(file_name, 0.8)
        total = (direction_score + table_type_score
                 + verify_step_score + file_name_step_score)
        return total / 4


class ItemNameClassify(object):
    """Buckets a raw item-name classifier score into three fixed levels."""

    def __init__(self):
        self.step = []

    @staticmethod
    def calculate_score(name_score):
        """Map *name_score* to 0.99 (> 0.98), 0.94 (> 0.86) or 0.68.

        :param name_score: numeric score to bucket
        :return: the bucketed score
        """
        if name_score > 0.98:
            return 0.99
        if name_score > 0.86:
            return 0.94
        return 0.68


def item_name_evaluate(item_score, header):
    """Score the item name based on how its column header is worded.

    :param item_score: score already attached to the item
    :param header: dict mapping field keys to header text
    :return: 0.3 when there is no "itemname" header; *item_score* when the
        header contains "名称" and one of the item keywords; otherwise
        *item_score* reduced by 10%
    """
    item_name_field = header.get("itemname", "")
    if not item_name_field:
        return 0.3
    has_keyword = any(word in item_name_field for word in _ITEM_NAME_KEYWORDS)
    if has_keyword and "名称" in item_name_field:
        return item_score
    return item_score * 0.9


def number_price_verify(target):
    """Check that quantity * unit price equals the total price.

    The comparison uses a floating-point tolerance, so values such as
    ``3 * 0.1`` versus ``0.3`` are treated as equal.

    :param target: dict read for "number", "unitprice" and "totalprice"
    :return: ``(1, 1, 1)`` when the three values agree, when any of them is
        missing/zero, or when they cannot be read as numbers (the check is
        then not applicable); ``(0, 0, 0)`` when they disagree
    """
    number = target.get("number", 0)
    unit_price = target.get("unitprice", 0)
    total_price = target.get("totalprice", 0)
    if not (number and unit_price and total_price):
        return 1, 1, 1
    try:
        expected_total = float(number) * float(unit_price)
        actual_total = float(total_price)
    except (TypeError, ValueError):
        # Non-numeric text cannot be verified, so it is not penalised.
        return 1, 1, 1
    if math.isclose(expected_total, actual_total, rel_tol=1e-9, abs_tol=1e-9):
        return 1, 1, 1
    return 0, 0, 0


def number_unit_evaluate(header, target):
    """Score the quantity and unit fields.

    :param header: dict mapping field keys to header text
    :param target: dict holding the extracted item values
    :return: ``(number_score, unit_score)``; each is 0 when the value is
        empty, 1 when the "number" or "unitname" header contains the
        expected word ("数量" / "单位"), otherwise 0.5 / 0.8 respectively
    """
    # ``or ""`` guards against headers that are present but None.
    number_header = header.get("number", "") or ""
    unit_header = header.get("unitname", "") or ""
    number_score, unit_score = 0, 0
    if target.get("number", ""):
        number_score = 1 if "数量" in number_header or "数量" in unit_header else 0.5
    if target.get("unitname", ""):
        unit_score = 1 if "单位" in number_header or "单位" in unit_header else 0.8
    return number_score, unit_score


def _price_score(value, header_text, keyword):
    """Score one price field: 0 if empty, 0.5 without a header, else 1/0.8."""
    if not value:
        return 0
    if not header_text:
        return 0.5
    return 1 if keyword in header_text else 0.8


def total_unit_price_evaluate(header, target):
    """Score the unit-price and total-price fields.

    :param header: dict mapping field keys to header text
    :param target: dict holding the extracted item values
    :return: ``(unit_price_score, total_price_score)``; each is 0 for an
        empty value, 0.5 when the column has no header, 1 when the header
        contains "单价" / "总价", and 0.8 otherwise
    """
    unit_price_score = _price_score(
        target.get("unitprice", ""), header.get("unitprice", ""), "单价")
    total_price_score = _price_score(
        target.get("totalprice", ""), header.get("totalprice", ""), "总价")
    return unit_price_score, total_price_score


def entity_model_server(text, prefix=""):
    """Run entity recognition on *text* and group the spans by type.

    :param text: text to analyse; empty text skips the model call
    :param prefix: string prepended to *text* before calling the model
    :return: dict with the lists "brands", "models", "specs" and "product"
    """
    result = {"brands": [], "models": [], "specs": [], "product": []}
    if not text:
        return result
    # str() keeps non-string cell values from breaking the concatenation.
    model_dict = product_detail_server(prefix + str(text)) or {}
    for row in model_dict.get('output', []):
        _type = row.get("type", "")
        span = row.get("span", "")
        if _type == "品牌":
            result["brands"].append(span)
        elif _type == "型号":
            result["models"].append(span)
        elif _type == "规格":
            result["specs"].append(span)
        elif "产品" in _type:
            result["product"].append(span)
    return result


def _first_non_empty(candidates):
    """Return the first ``[values, field, score]`` with non-empty values."""
    for values, field, score in candidates:
        if values:
            return [values, field, score]
    return [[], "", 0]


def get_brand_model(target, title_result):
    """Pick the best brand and model candidates for an item.

    Entity recognition is run on the item's "brandname" and "model" values;
    *title_result* is used as the last fallback.  Candidates found in their
    own column score 1, in the other column 0.8 (specs taken from the
    "model" column also score 1), and in the title 0.5.

    :param target: dict holding the extracted item values
    :param title_result: entity-recognition result for the item name
    :return: ``(brands, models)``, each a list ``[values, source_field,
        score]``; ``[[], "", 0]`` when nothing was found
    """
    brand_result = entity_model_server(target.get("brandname", ""), "品牌:")
    model_result = entity_model_server(target.get("model", ""), "型号:")

    brands = _first_non_empty([
        (brand_result.get("brands", []), "brandname", 1),
        (model_result.get("brands", []), "model", 0.8),
        (title_result.get("brands", []), "itemname", 0.5),
    ])

    # Specs are not returned separately; they only stand in for a missing
    # model.  For the title, specs are used when no model was recognised,
    # so a non-zero score is never paired with an empty value list.
    title_models = title_result.get("models", []) or title_result.get("specs", [])
    models = _first_non_empty([
        (model_result.get("models", []), "model", 1),
        (brand_result.get("models", []), "brandname", 0.8),
        (model_result.get("specs", []), "model", 1),
        (brand_result.get("specs", []), "brandname", 0.8),
        (title_models, "itemname", 0.5),
    ])
    return brands, models


def _boost_score(score):
    """Raise *score* by 10% unless that would push it above 1."""
    boosted = score * 1.1
    return score if boosted > 1 else boosted


def brand_model_evaluate(header, target, title_ner):
    """Score the brand and model of an item.

    :param header: dict mapping field keys to header text
    :param target: dict holding the extracted item values
    :param title_ner: entity-recognition result for the item name
    :return: ``(brand_score, model_score, brand, model)`` where *brand* and
        *model* are the recognised spans joined with ";"
    """
    brands, models = get_brand_model(target, title_ner)
    brand_score = brands[-1]
    brand = ";".join(brands[0])
    model_score = models[-1]
    model = ";".join(models[0])

    model_header = header.get("model", "") or ""
    brand_header = header.get("brandname", "") or ""
    if target.get("brandname") and "品牌" in brand_header:
        brand_score = _boost_score(brand_score)
    # The parentheses matter: the bonus needs a model value AND a matching
    # header, not just a header that mentions "规格".
    if target.get("model") and ("型号" in model_header or "规格" in model_header):
        model_score = _boost_score(model_score)
    _logger.debug("brand: %s", brand)
    return brand_score, model_score, brand, model


source = SourceEvaluate()


# Field keys: 'itemname' (name), 'brandname' (brand), 'model', 'number'
# (quantity), 'unitname' (unit), 'unitprice', 'totalprice'.
def evaluate(target, table):
    """Score a single purchasing item against the table it came from.

    :param target: dict holding the extracted item values plus "file_name"
        and "score"
    :param table: dict describing the source table; its "header" maps field
        keys to header text
    :return: ``(score_list, score_dict)``.  *score_list* holds the eight
        scores in the order table, item name, brand, model, number, unit,
        unit price, total price.  *score_dict* holds the same values rounded
        to two decimals; without a header it contains only "table_score".
    """
    file_name = target.get("file_name", "")
    header = table.get("header", {})
    source_score = source.calculate_score(file_name, table)
    if not header:
        return ([source_score, 0, 0, 0, 0, 0, 0, 0],
                {"table_score": round(source_score, 2)})

    # Item name
    item_name_score = item_name_evaluate(target.get("score", 0), header)
    title_ner = entity_model_server(target.get("itemname", ""), "")
    # Brand and model
    brand_score, model_score, _brand, _model = brand_model_evaluate(
        header, target, title_ner)
    # Quantity and unit
    number_score, unit_score = number_unit_evaluate(header, target)
    # Unit price and total price
    unit_price_score, total_price_score = total_unit_price_evaluate(header, target)

    # A failed quantity * unit price == total check voids all three scores.
    if number_price_verify(target)[0] == 0:
        number_score, unit_price_score, total_price_score = 0, 0, 0

    score_list = [source_score, item_name_score, brand_score, model_score,
                  number_score, unit_score, unit_price_score, total_price_score]
    score_dict = {
        "table_score": round(source_score, 2),
        "itemname_score": round(item_name_score, 2),
        "brand_score": round(brand_score, 2),
        "model_score": round(model_score, 2),
        "number_score": round(number_score, 2),
        "unit_score": round(unit_score, 2),
        "unitprice_score": round(unit_price_score, 2),
        # NOTE(review): key has no underscore, unlike its siblings; kept
        # as-is because consumers may rely on it.
        "totalpricescore": round(total_price_score, 2),
    }
    return score_list, score_dict


def _select_table(tables, index):
    """Return ``tables[index]`` or ``{}`` when the index is unusable."""
    try:
        position = int(index)
    except (TypeError, ValueError):
        return {}
    if isinstance(tables, (list, tuple)) and 0 <= position < len(tables):
        table = tables[position]
        return table if isinstance(table, dict) else {}
    return {}


def purchasing_evaluate_start(targets, tables):
    """Score every purchasing item and merge the results.

    :param targets: iterable of item dicts; each item's "table" value is the
        index of its source table (missing, negative, non-numeric or
        out-of-range indexes mean "no table")
    :param tables: JSON text encoding the list of tables, or an already
        decoded list; empty values mean no tables
    :return: ``(purchasing_evaluate_list, merged_score)`` - the per-item
        score dicts and the overall score rounded to two decimals
    """
    if isinstance(tables, (str, bytes, bytearray)):
        tables = json.loads(tables) if tables else []
    elif not tables:
        tables = []

    purchasing_score_list = []
    purchasing_evaluate_list = []
    for target in targets:
        table = _select_table(tables, target.get("table", -1))
        score, score_dict = evaluate(target, table)
        purchasing_score_list.append(score)
        purchasing_evaluate_list.append(score_dict)

    score_list = [calculate_score(score) for score in purchasing_score_list]
    merge_score = calculate_score(score_list)
    return purchasing_evaluate_list, round(merge_score, 2)


class PurchasingChecker(object):
    """Checker that exposes the purchasing-item evaluation as a check."""

    def __init__(self):
        self.errors_tables = {
            "score": {
                "name": "标的物评估",
                "parent_name": "标的物",
                "parent_code": "01",
                "checkFn": self.check0101
            }
        }

    def check0101(self, purchasinglist, purchasingsource):
        """Return the overall purchasing-item score.

        :param purchasinglist: item dicts passed to
            ``purchasing_evaluate_start`` as *targets*
        :param purchasingsource: table data passed to
            ``purchasing_evaluate_start`` as *tables*
        :return: the merged score
        """
        _, score = purchasing_evaluate_start(purchasinglist, purchasingsource)
        _logger.debug("purchasing score: %s", score)
        # NOTE(review): an earlier version treated scores above 0.85 as
        # passing and returned a boolean; the raw score is returned instead.
        return score