123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- # coding:utf-8
- from sklearn.preprocessing import Normalizer
- from tables.ai import product_detail_server
- import numpy as np
- import json
- Scaler = Normalizer()
- def softmax(x):
- # 对输入向量进行指数运算
- exp_values = np.exp(x)
- # 计算指数值的和
- sum_exp_values = np.sum(exp_values)
- # 计算每个元素的softmax概率值
- softmax_values = exp_values / sum_exp_values
- return softmax_values
- def calculate_score(score_list):
- '''
- 最终得分计算
- :param score_list:
- :return:
- '''
- if not score_list:
- return 0
- global Scaler
- # 适应并转换数据
- score_np = np.array(score_list)
- scaled_data = Scaler.fit_transform([score_list])
- soft_ret = softmax(scaled_data[0])
- return sum(score_np * soft_ret)
- class SourceEvaluate(object):
- def __init__(self):
- """
- 初始化
- """
- self.step = {}
- self.direction_step = {"h": 1, "v": 0.75, "h/v": 0.5, "无": 0.25}
- self.table_type_step = {"识别": 1, "推断": 0.8, "字段": 0.6, "标题": 0.4, "物品": 0.2}
- self.verify_step = {"确定": 1, "不确定": 0.7}
- self.file_name_step = {"公告": 1, "标题": 0.4}
- def calculate_score(self, file_name, table):
- """
- 计算得分
- :return:
- """
- verify = table.get("verify", "")
- direction = table.get("direction", "")
- table_type = table.get("type", "")
- direction_score = self.direction_step.get(direction, 0)
- table_type_score = self.table_type_step.get(table_type, 0)
- verify_step_score = self.verify_step.get(verify, 0)
- file_name_step_score = self.file_name_step.get(file_name, 0.8)
- return (direction_score + table_type_score + verify_step_score + file_name_step_score) / 4
- class ItemNameClassify(object):
- def __init__(self):
- self.step = []
- @staticmethod
- def calculate_score(name_score):
- """
- 计算得分
- :return:
- """
- if name_score > 0.98:
- return 0.99
- elif name_score > 0.86:
- return 0.94
- else:
- return 0.68
- def item_name_evaluate(item_score, header):
- '''
- 名称
- :param item_score:
- :param header:
- :return:
- '''
- item_name_field = header.get("itemname", "")
- if not item_name_field:
- return 0.3
- if [w for w in ["产品", "设备", "货物", "商品", "标的", "物资", "材料", "服务", "物料", "印刷品", "医疗设备"] if
- w in item_name_field] and "名称" in item_name_field:
- return item_score
- else:
- return item_score * 0.9
- def number_price_verify(target):
- """
- 数量价格验证
- :param target:
- :return:
- """
- number = target.get("number", 0)
- unit_price = target.get("unitprice", 0)
- total_price = target.get("totalprice", 0)
- if number and unit_price and total_price:
- if number * unit_price == total_price:
- return 1, 1, 1
- else:
- return 0, 0, 0
- return 1, 1, 1
- def number_unit_evaluate(header, target):
- '''
- 数量单位
- :param header:
- :param target:
- :return:
- '''
- number_header = header.get("number", "")
- number = target.get("number", "")
- unit_header = header.get("unitname", "")
- unitname = target.get("unitname", "")
- number_score, unit_score = 0, 0
- if number:
- number_score = 1 if "数量" in number_header or "数量" in unit_header else 0.5
- if unitname:
- unit_score = 1 if "单位" in number_header or "单位" in unit_header else 0.8
- return number_score, unit_score
- def total_unit_price_evaluate(header, target):
- '''
- 单价、总价
- :param header:
- :param target:
- :return:
- '''
- total_price_header = header.get("totalprice", "")
- unit_price_header = header.get("unitprice", "")
- unit_price = target.get("unitprice", "")
- total_price = target.get("totalprice", "")
- unit_price_score, total_price_score = 0, 0
- if total_price:
- if not total_price_header:
- total_price_score = 0.5
- else:
- total_price_score = 1 if "总价" in total_price_header else 0.8
- if unit_price:
- if not unit_price_header:
- unit_price_score = 0.5
- else:
- unit_price_score = 1 if "单价" in unit_price_header else 0.8
- return unit_price_score, total_price_score
- def entity_model_server(text, prefix=""):
- """
- 品牌、规格、型号的实体识别模型
- :param text:
- :param prefix:
- :return:
- """
- brands = [] # 品牌
- models = [] # 型号
- specs = [] # 规格
- product = [] # 产品
- if text:
- text = prefix + text
- model_dict = product_detail_server(text)
- output = model_dict.get('output', [])
- for row in output:
- _type = row.get("type", "")
- span = row.get("span", "")
- if _type == "品牌":
- brands.append(span)
- elif _type == "型号":
- models.append(span)
- elif _type == "规格":
- specs.append(span)
- elif "产品" in _type:
- product.append(span)
- return {"brands": brands, "models": models, "specs": specs, "product": product}
- else:
- return {"brands": [], "models": [], "specs": [], "product": []}
- def get_brand_model(target, title_result):
- """
- 获取品牌和规格
- :param target:
- :return:
- """
- # 关注的字段
- brand = target.get("brandname", "")
- model = target.get("model", "")
- # 实体识别
- brand_result = entity_model_server(brand, "品牌:")
- model_result = entity_model_server(model, "型号:")
- # 品牌
- if brand_result.get("brands", []):
- brands = [brand_result.get("brands", []), "brandname", 1]
- elif model_result.get("brands", []):
- brands = [model_result.get("brands", []), "model", 0.8]
- elif title_result.get("brands", []):
- brands = [title_result.get("brands", []), "itemname", 0.5]
- else:
- brands = [[], "", 0]
- # 型号
- if model_result.get("models", []):
- models = [model_result.get("models", []), "model", 1]
- elif brand_result.get("models", []):
- models = [brand_result.get("models", []), "brandname", 0.8]
- else:
- if model_result.get("specs", []):
- models = [model_result.get("specs", []), "model", 1]
- elif brand_result.get("specs", []):
- models = [brand_result.get("specs", []), "brandname", 0.8]
- elif title_result.get("models", []) or title_result.get("specs", []):
- models = [title_result.get("models", []), "itemname", 0.5]
- else:
- models = [[], "", 0]
- # 规格
- # if model_result.get("specs", []):
- # specs = [model_result.get("specs", []), "model", 1]
- # elif brand_result.get("brands", []):
- # specs = [brand_result.get("specs", []), "brand", 0.8]
- # else:
- # specs = [title_result.get("specs", []), "itemname", 0.5]
- return brands, models
- def brand_model_evaluate(header, target, title_ner):
- '''
- 品牌规格型号
- :param target:
- :return:
- '''
- model_score, brand_score, model, brand = 0, 0, "", ""
- brands, models = get_brand_model(target, title_ner)
- if brands:
- brand_score = brands[-1]
- brand = ";".join(brands[0])
- if models:
- model_score = models[-1]
- model = ";".join(models[0])
- model_header = header.get("model", "")
- brand_header = header.get("brandname", "")
- if target.get("brandname") and "品牌" in brand_header:
- brand_score = brand_score if brand_score * 1.1 > 1 else brand_score * 1.1
- if target.get("model") and "型号" in model_header or "规格" in model_header:
- model_score = model_score if model_score * 1.1 > 1 else model_score * 1.1
- print(brand)
- return brand_score, model_score, brand, model
- source = SourceEvaluate()
- # ['名称': 'itemname','品牌': 'brandname','型号': 'model','数量': 'number','计量单位': 'unitname','单价': 'unitprice','总价': 'totalprice']
- def evaluate(target, table):
- '''
- 标的物评估
- :param target:
- :return:
- '''
- file_name = target.get("file_name", "")
- header = table.get("header", {})
- item_name_score = target.get("score", 0)
- source_score = source.calculate_score(file_name, table)
- if not header:
- return [source_score, 0, 0, 0, 0, 0, 0, 0], {"table_score": round(source_score, 2)}
- # 名称
- itemname = target.get("itemname", "")
- item_name_score = item_name_evaluate(item_name_score, header)
- title_ner = entity_model_server(itemname, "")
- # 品牌型号
- brand_score, model_score, brand, model = brand_model_evaluate(header, target, title_ner)
- # 数量单位
- number_score, unit_score = number_unit_evaluate(header, target)
- # 单价、总价
- unit_price_score, total_price_score = total_unit_price_evaluate(header, target)
- # 校验
- number_price_weight = number_price_verify(target)
- # 生成最终的成绩
- if number_price_weight[0] == 0:
- number_score, unit_price_score, total_price_score = 0, 0, 0
- score_list = [source_score, item_name_score, brand_score, model_score, number_score, unit_score, unit_price_score,
- total_price_score]
- # result_score = calculate_score(score_list)
- score_dict = {
- "table_score": round(source_score, 2),
- "itemname_score": round(item_name_score, 2),
- "brand_score": round(brand_score, 2),
- "model_score": round(model_score, 2),
- "number_score": round(number_score, 2),
- "unit_score": round(unit_score, 2),
- "unitprice_score": round(unit_price_score, 2),
- "totalpricescore": round(total_price_score, 2)
- }
- return score_list, score_dict
- def purchasing_evaluate_start(targets, tables):
- '''
- 标的物评估函数
- :param targets:
- :return:
- '''
- purchasing_score_list = []
- purchasing_evaluate_list = []
- tables = json.loads(tables) if tables else []
- for target in targets:
- table_ind = target.get("table", -1)
- table_ind = int(table_ind)
- table = tables[table_ind] if table_ind > -1 else {}
- score, score_dict = evaluate(target, table)
- purchasing_score_list.append(score)
- purchasing_evaluate_list.append(score_dict)
- score_list = []
- for score in purchasing_score_list:
- result_score = calculate_score(score)
- score_list.append(result_score)
- merge_score = calculate_score(score_list)
- return purchasing_evaluate_list, round(merge_score, 2)
- class PurchasingChecker(object):
- """
- 中标字段检查
- """
- def __init__(self):
- self.errors_tables = {
- "score": {
- "name": "标的物评估",
- "parent_name": "标的物",
- "parent_code": "01",
- "checkFn": self.check0101
- }
- }
- def check0101(self, purchasinglist, purchasingsource):
- """
- 意图结果检测
- :param header:
- :return:
- """
- purchasing_evaluate_list, score = purchasing_evaluate_start(purchasinglist, purchasingsource)
- print(score)
- #大于0.85的通过验证的
- # if score < 0.85:
- # return True
- # else:
- # return False
- return score
|