|
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/10
- # @Author : lkj
- # @description : 流程2 查询标准向量库语义相似
- import time
- from collections import Counter
- from numpy import dot
- from numpy.linalg import norm
- from utils.milvus_hlper import Milvus
- from utils.request_fun import text_to_vector
- from config import milvus_config
- m = Milvus(table='jianyu_code_2', **milvus_config)
- def count_fun(data:list):
- """
- 暴力搜索
- """
- try:
- count_dict = {}
- for item in data:
- split = 0
- if len(item) == 1:
- count_dict[item] = count_dict.get(item, 0) + 1
- else:
- for i in range(int((len(item) - 1) / 2)):
- if split == 0:
- c = item
- else:
- c = item[:-split]
- count_dict[c] = count_dict.get(c, 0) + 1
- split += 2
- if not count_dict:
- return '', 0
- max_value = max(count_dict.values()) # 找到字典中最大的 value 值
- if max_value == 1:
- max_length_key = min(
- [key for key, value in count_dict.items() if value == max_value],
- key=len
- )
- else:
- max_length_key = max(
- [key for key, value in count_dict.items() if value == max_value],
- key=len
- )
- return max_length_key,max_value
- except Exception as e:
- print('count_fun_errorxxx',e)
- return '',0
- def check(text_vec,data_: list):
- '''
- 第二档可信度判断规则
- :param text_vec:
- :param data_:
- :return:
- '''
- try:
- sim_list = [i[2] for i in data_]
- sim_mean = sum(sim_list)/len(sim_list)
- data = [item for item in data_ if item[2] >= sim_mean-0.025] # 删除es结果中差异性大且相似度低的值
- model01_res, model01_ = check_model01(text_vec, data)
- if model01_res:
- return model01_res, model01_
- # 新判断当出现多个大于0.925判断每个上一级分类的选取最高的
- score_list = [i[1] for i in data_ if i[2] > 0.92]
- count_flag = 3 # 出现词频阈值,根据score_list元素个数计算
- if len(score_list) <= 4:
- count_flag = 2
- word_count = dict(Counter(score_list)) # 统计出现频率高且相似度高的分类
- max_word = [(k, v) for k, v in word_count.items() if v >= count_flag]
- if len(max_word) == 1:
- return max_word[0][0], 'mode01'
- elif len(score_list) >= 2:
- best_code = ''
- best_sim = 0
- for code_ in score_list:
- if len(code_) ==1:
- continue
- father_code = code_[:-2]
- father_code_name = m.get_name(father_code)
- father_vec = text_to_vector(father_code_name)
- similarity = round(dot(text_vec, father_vec) / (norm(text_vec) * norm(father_vec)), 4) # 相似度计算
- if similarity > best_sim:
- best_code = code_
- best_sim = similarity
- return best_code, 'mode1'
- else:
- count_code, max_value = count_fun([i[1] for i in data_ if i[2] >= 0.85]) # 如果暴力查询的结果大于6/7 并且分类层级要大于2层
- if (max_value/len(data_)) >= (len(data_)-2)/len(data_) and len(count_code)>3:
- return count_code,'mode1'
- return '', ''
- except Exception as e:
- print('check_errorfff', e)
- return '', 'error'
- def check_model01(text_vec,data_es):
- """
- 第一档可信度判断规则
- :param text_vec:
- :param data_es:
- :return:
- """
- if data_es[0][2] > 0.945: # es查询结果为正序如果满足极大值或者较大差异直接返回第一个数据
- return data_es[0][1], 'mode01'
- output_lst = []
- word_count = Counter([row[1] for row in data_es if row[2] > 0.85])
- if not word_count:
- return '', ''
- max_pair = max(word_count.items(), key=lambda x: x[1]) # 统计词频如果词频最大值大于等于5/7则输出该值
- if max_pair[1]/len(data_es) >= (int(len(data_es)/2)+1)/len(data_es):
- return max_pair[0],'mode01'
- for i in range(min(3, len(data_es))): # 只判断前三个元素
- if data_es[i][0] == data_es[0][0] and data_es[i][2] > 0.85:
- output_lst.append(data_es[i][1])
- if len(output_lst) == 3:
- return data_es[0][1], 'mode01'
- elif data_es[0][2] - data_es[1][2] > 0.02 and data_es[0][2] > 0.91: # 第一个结果极大于后面
- p_code = data_es[0][1][:-2]
- if not p_code or (data_es[0][2] > 0.91): # 如果es得分第一的结果只有一层且相似度大于0.9就默认是正确
- return data_es[0][1], 'mode01'
- else:
- p_name = m.get_name(p_code)
- pvec = text_to_vector(p_name)
- similarity = round(dot(text_vec, pvec) / (norm(text_vec) * norm(pvec)), 4) # 相似度计算
- if similarity > 0.8: # 标的物与该分类的父级的相似度
- return data_es[0][1], 'mode01'
- return '', ''
- def run_mode1(text,baseclass=None):
- """
- 标的物数字化主函数
- :param text:
- :param classify_name
- :return: result_name:结果名称, similarity:结果与输入文本相似度, mode:流程模式, code:结果编码, credibility:结果可信度
- """
- try:
- vec = text_to_vector(text) # 转成向量
- # search_result = m.search_good(vec,7,baseclass) # 查询结果
- search_result = m.search_industry(vec,['code', 'class_name', 'embeddings', 'explain', 'root', 'private_code'],
- industry_list=['物业'])
- print(search_result)
- similarity = 0 # 文本与结果相似度
- result_name = '' # 分类名称
- mode = '' # 分类判断模式
- credibility = 0 # 可信度
- code = '' # 代码
- if search_result and len(search_result) > 2:
- check_result = check(vec, search_result) # 结果筛选
- if check_result[0]:
- code = check_result[0]
- result_name = m.get_name(code) # 名称映射
- mode = check_result[1]
- if result_name:
- res_vec = text_to_vector(result_name)
- similarity = round(dot(vec, res_vec) / (norm(vec) * norm(res_vec)), 4) # 相似度计算
- if mode == 'mode1':
- credibility = 0.90
- if mode == 'mode01':
- credibility = 0.95
- if credibility == 0: # 可信度为0 则用第一个结果作为输出
- result_name = search_result[0][0]
- similarity = search_result[0][2]
- mode = ''
- code = search_result[0][1]
- return [result_name, similarity, mode, code, credibility]
- except Exception as e:
- print('errrrrssss',e)
- print(text,baseclass)
- return []
- def run_mode1_main(text,baseclass):
- try:
- result = run_mode1(text, baseclass)
- if not result[0]:
- return ['', '', '', 0, '']
- result.pop(2)
- code = result[2]
- route = m.get_root_zc(code)
- result.append(route)
- return result
- except Exception as e:
- print('errrrr',e)
- return ['', '', '', '', '']
- if __name__ == '__main__':
- a = [('服务', 'C', 0.9276, '服务/', '服务'), ('审计服务', 'C2303', 0.9264, '商务服务/审计服务/', '年审计服务'), ('运行维护服务', 'C1607', 0.9234, '信息技术服务/运行维护服务/', '年信息安全服务'), ('物业管理服务', 'C2104', 0.9202, '房地产服务/物业管理服务/', '年物业服务'), ('服务', 'C', 0.9145, '服务/', '综合服务'), ('会议服务', 'C2201', 0.9111, '会议、展览、住宿和餐饮服务/会议服务/', '会务服务'), ('软件运维服务', 'C160703', 0.9108, '信息技术服务/运行维护服务/软件运维服务/', '业务系统服务')]
- v = text_to_vector('xxxx')
- print(check(v, a))
|