# -*- coding: utf-8 -*-
# @Time : 2023/7/10
# @Author : lkj
# @description : Flow 2 - semantic-similarity lookup against the standard vector library
import time
from collections import Counter

from numpy import dot
from numpy.linalg import norm

from utils.milvus_hlper import Milvus
from utils.request_fun import text_to_vector
from config import milvus_config

m = Milvus(table='jianyu_code_2', **milvus_config)


def _cosine_similarity(vec_a, vec_b):
    """Return the cosine similarity of two vectors, rounded to 4 decimals.

    :param vec_a: first vector (anything ``numpy.dot`` / ``norm`` accept)
    :param vec_b: second vector of the same length
    :return: rounded cosine similarity, or ``0.0`` when either vector has a
        zero norm (the plain formula would divide by zero there)
    """
    denominator = norm(vec_a) * norm(vec_b)
    if denominator == 0:
        return 0.0
    return round(dot(vec_a, vec_b) / denominator, 4)


def count_fun(data: list):
    """Brute-force count of code prefixes and pick the dominant one.

    Every item is counted under itself and under each prefix obtained by
    repeatedly stripping its last two characters; the stripping stops before
    the prefix gets down to one or two characters.  A one-character item is
    counted once as-is.

    :param data: list of code strings
    :return: ``(prefix, count)`` for the most frequent prefix.  When the top
        count is 1 the shortest such prefix is chosen, otherwise the longest.
        ``('', 0)`` is returned when nothing was counted or on any error.
    """
    try:
        count_dict = {}
        for item in data:
            if len(item) == 1:
                count_dict[item] = count_dict.get(item, 0) + 1
                continue
            # Number of prefixes to count for this item; 0 for 2-char items.
            levels = (len(item) - 1) // 2
            for level in range(levels):
                prefix = item[:len(item) - 2 * level]
                count_dict[prefix] = count_dict.get(prefix, 0) + 1

        if not count_dict:
            return '', 0

        max_value = max(count_dict.values())
        candidates = [key for key, value in count_dict.items() if value == max_value]
        # No prefix repeats -> prefer the shortest key; otherwise the longest.
        if max_value == 1:
            max_length_key = min(candidates, key=len)
        else:
            max_length_key = max(candidates, key=len)
        return max_length_key, max_value
    except Exception as e:
        # Best-effort helper: report and fall back to "nothing found".
        print('count_fun_errorxxx', e)
        return '', 0


def check(text_vec, data_: list):
    """Second-tier credibility rules for a list of search hits.

    Each hit is indexed as ``hit[1]`` = code and ``hit[2]`` = similarity.
    The first-tier rules (``check_model01``) are tried first on the hits whose
    similarity is not far below the mean; the rules below are the fallback.

    :param text_vec: vector of the input text
    :param data_: list of search hits
    :return: ``(code, mode)`` where mode is ``'mode01'`` or ``'mode1'``;
        ``('', '')`` when no rule matched; ``('', 'error')`` on any exception
        (including an empty ``data_``)
    """
    try:
        sim_list = [hit[2] for hit in data_]
        sim_mean = sum(sim_list) / len(sim_list)
        # Drop hits whose similarity is well below the mean.
        data = [hit for hit in data_ if hit[2] >= sim_mean - 0.025]

        model01_res, model01_ = check_model01(text_vec, data)
        if model01_res:
            return model01_res, model01_

        # NOTE(review): the original comment mentioned 0.925, the code uses 0.92.
        score_list = [hit[1] for hit in data_ if hit[2] > 0.92]
        # Frequency threshold depends on how many high-similarity hits exist.
        count_flag = 2 if len(score_list) <= 4 else 3
        word_count = Counter(score_list)
        max_word = [(k, v) for k, v in word_count.items() if v >= count_flag]

        if len(max_word) == 1:
            return max_word[0][0], 'mode01'
        elif len(score_list) >= 2:
            # Pick the code whose parent category name is closest to the text.
            best_code = ''
            best_sim = 0
            parent_sims = {}  # parent code -> similarity, avoids repeated lookups
            for code_ in score_list:
                if len(code_) == 1:
                    continue  # a one-character code has no parent to compare
                father_code = code_[:-2]
                if father_code not in parent_sims:
                    father_code_name = m.get_name(father_code)
                    father_vec = text_to_vector(father_code_name)
                    parent_sims[father_code] = _cosine_similarity(text_vec, father_vec)
                similarity = parent_sims[father_code]
                if similarity > best_sim:
                    best_code = code_
                    best_sim = similarity
            return best_code, 'mode1'
        else:
            count_code, max_value = count_fun([hit[1] for hit in data_ if hit[2] >= 0.85])
            # Accept the brute-force prefix when all but at most two hits agree
            # and the prefix is longer than 3 characters.
            if max_value >= len(data_) - 2 and len(count_code) > 3:
                return count_code, 'mode1'
        return '', ''
    except Exception as e:
        print('check_errorfff', e)
        return '', 'error'


def check_model01(text_vec, data_es):
    """First-tier credibility rules.

    Hits are indexed as ``hit[0]`` = name, ``hit[1]`` = code, ``hit[2]`` =
    similarity.  The rules treat ``data_es[0]`` as the best hit
    (presumably the list is sorted by descending similarity - confirm with
    the search backend).

    :param text_vec: vector of the input text
    :param data_es: non-empty list of search hits
    :return: ``(code, 'mode01')`` when a rule matches, else ``('', '')``
    """
    top = data_es[0]
    # Very high similarity on the first hit: accept it outright.
    if top[2] > 0.945:
        return top[1], 'mode01'

    word_count = Counter(hit[1] for hit in data_es if hit[2] > 0.85)
    if not word_count:
        return '', ''

    # A code carried by a strict majority of the hits wins.
    max_pair = max(word_count.items(), key=lambda x: x[1])
    if max_pair[1] >= len(data_es) // 2 + 1:
        return max_pair[0], 'mode01'

    # Only the first three hits: same name as the top hit and similarity > 0.85.
    output_lst = [
        hit[1] for hit in data_es[:3]
        if hit[0] == top[0] and hit[2] > 0.85
    ]
    if len(output_lst) == 3:
        return top[1], 'mode01'
    elif len(data_es) > 1 and top[2] - data_es[1][2] > 0.02 and top[2] > 0.91:
        # The first hit clearly stands out from the second one.
        p_code = top[1][:-2]
        # NOTE(review): `top[2] > 0.91` is already guaranteed by the enclosing
        # condition, so this test is always true and the parent-similarity
        # branch below never runs.  The original comment describes
        # "single-level code AND similarity > 0.9", which suggests `and` was
        # intended - confirm before changing, as it alters classification.
        if not p_code or (top[2] > 0.91):
            return top[1], 'mode01'
        else:
            p_name = m.get_name(p_code)
            pvec = text_to_vector(p_name)
            similarity = _cosine_similarity(text_vec, pvec)
            if similarity > 0.8:  # similarity between the text and the parent category
                return top[1], 'mode01'
    return '', ''


def run_mode1(text, baseclass=None):
    """Main entry point: map a subject-matter text to a standard category.

    :param text: input text to classify
    :param baseclass: currently unused by the search (the query is fixed to
        ``industry_list=['物业']``); only echoed when an error is reported
    :return: ``[result_name, similarity, mode, code, credibility]``, or ``[]``
        when the search returned nothing or an exception occurred.
        ``credibility`` is 0.95 for ``'mode01'``, 0.90 for ``'mode1'`` and 0
        when the first search hit is used as a fallback.
    """
    try:
        vec = text_to_vector(text)
        search_result = m.search_industry(
            vec,
            ['code', 'class_name', 'embeddings', 'explain', 'root', 'private_code'],
            industry_list=['物业'],
        )
        print(search_result)

        # Nothing to rank and nothing to fall back on.
        if not search_result:
            return []

        similarity = 0      # similarity between the text and the result
        result_name = ''    # category name
        mode = ''           # rule tier that produced the result
        credibility = 0
        code = ''

        # The credibility rules are only applied when there are at least 3 hits.
        if len(search_result) > 2:
            check_result = check(vec, search_result)
            if check_result[0]:
                code = check_result[0]
                result_name = m.get_name(code)  # code -> name mapping
                mode = check_result[1]
                if result_name:
                    res_vec = text_to_vector(result_name)
                    similarity = _cosine_similarity(vec, res_vec)
                if mode == 'mode1':
                    credibility = 0.90
                if mode == 'mode01':
                    credibility = 0.95

        if credibility == 0:
            # No rule matched: output the first search hit as-is.
            first_hit = search_result[0]
            result_name = first_hit[0]
            similarity = first_hit[2]
            mode = ''
            code = first_hit[1]
        return [result_name, similarity, mode, code, credibility]
    except Exception as e:
        print('errrrrssss', e)
        print(text, baseclass)
        return []


def run_mode1_main(text, baseclass):
    """Run ``run_mode1`` and append the category route of the resulting code.

    :param text: input text to classify
    :param baseclass: forwarded to ``run_mode1``
    :return: ``[result_name, similarity, code, credibility, route]``;
        ``['', '', '', 0, '']`` when the result has no name;
        ``['', '', '', '', '']`` when ``run_mode1`` produced nothing or an
        exception occurred
    """
    try:
        result = run_mode1(text, baseclass)
        if not result:
            # run_mode1 failed or found nothing; same value as the error path.
            return ['', '', '', '', '']
        if not result[0]:
            return ['', '', '', 0, '']
        result.pop(2)  # drop `mode`; callers of this function do not receive it
        code = result[2]
        route = m.get_root_zc(code)
        result.append(route)
        return result
    except Exception as e:
        print('errrrr', e)
        return ['', '', '', '', '']


if __name__ == '__main__':
    a = [
        ('服务', 'C', 0.9276, '服务/', '服务'),
        ('审计服务', 'C2303', 0.9264, '商务服务/审计服务/', '年审计服务'),
        ('运行维护服务', 'C1607', 0.9234, '信息技术服务/运行维护服务/', '年信息安全服务'),
        ('物业管理服务', 'C2104', 0.9202, '房地产服务/物业管理服务/', '年物业服务'),
        ('服务', 'C', 0.9145, '服务/', '综合服务'),
        ('会议服务', 'C2201', 0.9111, '会议、展览、住宿和餐饮服务/会议服务/', '会务服务'),
        ('软件运维服务', 'C160703', 0.9108, '信息技术服务/运行维护服务/软件运维服务/', '业务系统服务'),
    ]
    v = text_to_vector('xxxx')
    print(check(v, a))