# -*- coding: utf-8 -*-
# @Time : 2024/2/6
# @Author : lkj
# @description :
from collections import Counter

import numpy as np
from numpy import dot
from numpy.linalg import norm

from utils.milvus_hlper import Milvus
from utils.request_fun import text_to_vector


class JyCode(object):
    """Rule-based picker of a category code for a piece of text.

    Candidates come from a vector search performed through the project's
    ``Milvus`` helper. Each candidate row is indexed as ``row[0]`` (name),
    ``row[1]`` (code) and ``row[2]`` (similarity score). The parent of a code
    is obtained by dropping its last two characters (``code[:-2]``).
    """

    def __init__(self, db_name, config):
        """Create the ``Milvus`` helper for table *db_name* using *config* as keyword arguments."""
        self.m = Milvus(table=db_name, **config)

    @staticmethod
    def cosine_similarity(vector1, vector2):
        """Return the cosine similarity of two vectors, rounded to 4 decimals.

        :param vector1: first vector
        :param vector2: second vector
        :return: the rounded cosine similarity, or ``0.0`` when either vector
            has zero norm (the cosine is undefined there and would be NaN)
        """
        dot_product = np.dot(vector1, vector2)
        denominator = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if denominator == 0:
            return 0.0
        return round(dot_product / denominator, 4)

    def _name_similarity(self, text_vec, code):
        """Return the cosine similarity between *text_vec* and the embedded name of *code*."""
        name = self.m.get_name(code)
        return self.cosine_similarity(text_vec, text_to_vector(name))

    def check(self, text_vec, data_es: list, offset_value=0.025, min_value=0.9, max_value=0.965):
        """Apply the credibility rules to the search candidates and pick a code.

        :param text_vec: vector of the input text
        :param data_es: rows returned by the vector search; ``row[1]`` is the
            code and ``row[2]`` the similarity. The rules treat ``data_es[0]``
            as the best hit, i.e. they assume the rows are ordered by
            descending similarity.
        :param offset_value: allowed similarity difference between top-k rows
        :param min_value: lowest credible similarity; at or below it a hit is
            not trusted on its own
        :param max_value: similarity above which the top hit is accepted
            directly
        :return: ``(code, mode)`` where mode is one of ``'mode01'``,
            ``'mode1'``, ``'mode2'``, ``'mode3'``, ``'mode0'`` (nothing
            credible, code is ``''``) or ``'error'`` (an exception was caught,
            code is ``''``)
        """
        if not data_es:
            # Nothing to rank; avoids relying on a caught ZeroDivisionError.
            return '', 'mode0'
        try:
            scores = [row[2] for row in data_es]  # similarity list
            mean_score = sum(scores) / len(scores)  # average similarity
            # Drop rows that are both far from the pack and low in similarity.
            data = [row for row in data_es if row[2] >= mean_score - offset_value]
            # Frequency threshold used by the counting rule further down.
            count = 2 if len(data) <= 4 else 3

            top_code, top_score = data[0][1], data[0][2]
            if top_score > max_value:
                return top_code, 'mode01'

            # With a single surviving row there is no runner-up. Every row that
            # was filtered out is more than offset_value below the mean (hence
            # below the top hit), so the gap is treated as unbounded instead of
            # raising IndexError on data[1].
            gap = top_score - data[1][2] if len(data) > 1 else float('inf')
            if gap > offset_value and top_score > min_value:
                return top_code, 'mode1'

            # Several hits above min_value: keep the one whose parent category
            # name is closest to the input text.
            score_list = [row[1] for row in data_es if row[2] > min_value]
            if len(score_list) >= 2:
                best_code = ''
                best_sim = 0
                for end_code in score_list:
                    parent_sim = self._name_similarity(text_vec, end_code[:-2])
                    if parent_sim > best_sim:
                        best_code = end_code
                        best_sim = parent_sim
                return best_code, 'mode1'

            # Third tier: a smaller lead is accepted when the parent category
            # of the top hit is itself close enough to the text.
            # NOTE(review): the source was whitespace-collapsed; the frequency
            # rule below is assumed to be the `else` of this if/elif chain, not
            # of the inner parent-similarity test - confirm.
            if gap > (offset_value - 0.01) and top_score > min_value:
                if self._name_similarity(text_vec, top_code[:-2]) > (min_value - 0.1):
                    return top_code, 'mode1'
                return '', 'mode0'

            # Frequency rule: count every surviving code together with its
            # parent code; a code reaching the threshold is selected.
            code_list = []
            for row in data:
                parent_code = row[1][:-2]
                if len(parent_code) > 1:
                    code_list.append(parent_code)
                code_list.append(row[1])
            frequent = [code for code, freq in Counter(code_list).items() if freq >= count]
            if len(frequent) == 1:
                return frequent[0], 'mode2'

            # Zero or several frequent codes: pick the one whose name is most
            # similar to the text ('' when there is none).
            code = ''
            code_sim = 0
            for candidate in frequent:
                sim_ = self._name_similarity(text_vec, candidate)
                if sim_ > code_sim:
                    code_sim = sim_
                    code = candidate
            return code, 'mode3'
        except Exception as e:
            # Best-effort by design: any failure in a rule (lookup, embedding,
            # malformed row) is reported and turned into an 'error' verdict.
            print('check_error', e)
            return '', 'error'

    def run_mode1(self, text, baseclass=None):
        """Main routine: classify *text* and score the result.

        :param text: input text to classify
        :param baseclass: passed through to ``self.m.search_china``
        :return: tuple ``(result_name, similarity, mode, code, credibility)``:
            result name, similarity between the result and the input text,
            flow mode, result code, and result credibility
        """
        vec = text_to_vector(text)  # embed the text
        search_result = self.m.search_china(vec, baseclass)  # vector search
        if not search_result:
            return '', 0, 'mode0', '', 0

        similarity = 0  # similarity between the text and the chosen result
        result_name = ''
        mode = 'mode0'
        credibility = 0
        code = ''

        picked_code, picked_mode = self.check(vec, search_result)  # rule filtering
        if picked_code:
            code = picked_code
            result_name = self.m.get_name(code)  # code -> name
            mode = picked_mode
            if result_name:
                similarity = self.cosine_similarity(vec, text_to_vector(result_name))

            # NOTE(review): the source was whitespace-collapsed; the rules
            # below are assumed to sit beside, not inside, `if result_name`
            # - confirm.
            if mode == 'mode01':
                credibility = 0.99
            elif mode == 'mode1':
                credibility = 0.95
            elif mode == 'mode3' and similarity > 0.85:
                credibility = 0.90
            elif mode == 'mode2' and similarity > 0.8:
                credibility = 0.85

            if mode in ('mode2', 'mode3'):
                # Compare the text with the parent category; a code with no
                # parent is compared with itself.
                pcode = code[:-2] or code
                p_similarity = self._name_similarity(vec, pcode)
                # NOTE(review): `similarity == 0.99` is kept from the original;
                # it may have been meant to test credibility - confirm.
                if (p_similarity > 0.85 and similarity > 0.9) or similarity == 0.99:
                    mode = 'mode4'
                    credibility = 0.99

        if credibility == 0:
            # No rule produced a credible answer: fall back to the raw top hit.
            result_name = search_result[0][0]
            similarity = search_result[0][2]
            mode = ''
            code = search_result[0][1]
        return result_name, similarity, mode, code, credibility

    def run_mode1_main(self, text, baseclass=None):
        """Run ``run_mode1`` and return its result as a list with the route appended.

        :param text: input text to classify
        :param baseclass: passed through to ``run_mode1``
        :return: ``[result_name, similarity, code, credibility, route]`` where
            route is the value of ``self.m.get_root_zc(code)``;
            ``['', '', '', 0, '']`` when no result name was produced, and
            ``['', '', '', '', '']`` when an exception was caught
        """
        try:
            result_name, similarity, _mode, code, credibility = self.run_mode1(text, baseclass)
            if not result_name:
                return ['', '', '', 0, '']
            route = self.m.get_root_zc(code)
            # A very similar fallback hit still earns a baseline credibility.
            if similarity > 0.9 and credibility == 0:
                credibility = 0.85
            # An exact match is always fully credible.
            if similarity == 1.0:
                credibility = 0.99
            return [result_name, similarity, code, credibility, route]
        except Exception as e:
            print('政采分类错误--->', e)
            return ['', '', '', '', '']