# -*- coding: utf-8 -*-
# @Time : 2024/2/6
# @Author : lkj
# @description : Maps free text to a classification code using vector search results and rule-based credibility scoring.
import logging
from collections import Counter

import numpy as np
from numpy import dot
from numpy.linalg import norm

from utils.milvus_hlper import Milvus
from utils.request_fun import text_to_vector
class JyCode(object):
    """Pick a classification code for a text from vector-search hits.

    Search rows are indexed positionally throughout this class:
    ``row[0]`` is the entry name, ``row[1]`` its code and ``row[2]`` the
    similarity reported by the search.  The parent of a code is obtained by
    dropping its last two characters (``code[:-2]``).

    NOTE(review): the indentation of the original source was lost; the
    nesting of the rule chain in ``check`` and of the credibility rules in
    ``run_mode1`` was reconstructed from the syntax and should be confirmed
    against the upstream file.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, db_name, config):
        """Create the ``Milvus`` helper used for searches and name lookups.

        :param db_name: passed to ``Milvus`` as ``table``.
        :param config: mapping expanded into ``Milvus`` keyword arguments.
        """
        self.m = Milvus(table=db_name, **config)

    @staticmethod
    def cosine_similarity(vector1, vector2):
        """Return the cosine similarity of two vectors, rounded to 4 places.

        A zero-length vector has no direction, so ``0.0`` is returned instead
        of dividing by zero.
        """
        denominator = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if denominator == 0:
            return 0.0
        return round(float(np.dot(vector1, vector2) / denominator), 4)

    def _name_similarity(self, text_vec, code):
        """Cosine similarity between ``text_vec`` and the vector of ``code``'s name."""
        name_vec = text_to_vector(self.m.get_name(code))
        return self.cosine_similarity(text_vec, name_vec)

    def _best_by_name(self, text_vec, codes, use_parent=False):
        """Return the code whose (parent) name is most similar to ``text_vec``.

        Returns ``''`` when ``codes`` is empty or no similarity exceeds 0.
        """
        best_code, best_sim = '', 0
        for code in codes:
            target = code[:-2] if use_parent else code
            sim = self._name_similarity(text_vec, target)
            if sim > best_sim:  # strict: the first of equal candidates wins
                best_code, best_sim = code, sim
        return best_code

    def _vote(self, text_vec, data):
        """Choose a code by how often codes and their parents occur in ``data``."""
        # Fewer surviving rows -> a lower occurrence count is required.
        min_count = 2 if len(data) <= 4 else 3
        tally = Counter()
        for row in data:
            code = row[1]
            parent = code[:-2]
            if len(parent) > 1:
                tally[parent] += 1
            # NOTE(review): the code itself is counted unconditionally;
            # confirm against the original indentation.
            tally[code] += 1
        frequent = [code for code, hits in tally.items() if hits >= min_count]
        if len(frequent) == 1:
            return frequent[0], 'mode2'
        # Several (or no) frequent codes: keep the one whose name is closest
        # to the text; this yields '' when there is no candidate.
        return self._best_by_name(text_vec, frequent), 'mode3'

    def _apply_rules(self, text_vec, data_es, offset_value, min_value, max_value):
        """Run the rule chain of ``check`` on a non-empty result list."""
        sims = [row[2] for row in data_es]
        floor = sum(sims) / len(sims) - offset_value
        # Drop rows that lie clearly below the average similarity.
        data = [row for row in data_es if row[2] >= floor]
        if not data:
            return '', 'mode0'

        # Rows are expected in descending similarity order, so data[0] is the top hit.
        top_code, top_sim = data[0][1], data[0][2]
        if top_sim > max_value:
            return top_code, 'mode01'

        # When the filter leaves a single row every other hit was already
        # more than ``offset_value`` below the mean, so there is no close
        # competitor: treat the gap as infinite instead of indexing data[1].
        gap = top_sim - data[1][2] if len(data) > 1 else float('inf')
        if gap > offset_value and top_sim > min_value:
            return top_code, 'mode1'

        # Several hits above the floor threshold: prefer the one whose parent
        # category name is closest to the text.
        strong_codes = [row[1] for row in data_es if row[2] > min_value]
        if len(strong_codes) >= 2:
            return self._best_by_name(text_vec, strong_codes, use_parent=True), 'mode1'

        # Third tier: a slightly smaller gap is accepted when the parent
        # category of the top hit is itself similar enough to the text.
        if gap > (offset_value - 0.01) and top_sim > min_value:
            if self._name_similarity(text_vec, top_code[:-2]) > (min_value - 0.1):
                return top_code, 'mode1'
            return '', 'mode0'

        # NOTE(review): frequency voting is taken to be the ``else`` of the
        # rule chain above (not of the parent-similarity test); confirm.
        return self._vote(text_vec, data)

    def check(self, text_vec, data_es: list, offset_value=0.025, min_value=0.9, max_value=0.965):
        """Apply the credibility rules to search results and pick a code.

        :param text_vec: vector of the input text.
        :param data_es: rows returned by the vector search.
        :param offset_value: similarity gap used both to filter rows far
            below the mean and to decide that the top hit stands out.
        :param min_value: lowest similarity at which the top hit is trusted.
        :param max_value: similarity above which the top hit is accepted
            immediately.
        :return: ``(code, mode)``; ``code`` is ``''`` when no rule selects a
            result.  ``mode`` is one of ``'mode01'``, ``'mode1'``,
            ``'mode2'``, ``'mode3'``, ``'mode0'`` or ``'error'`` (an
            exception was raised and logged).
        """
        if not data_es:
            return '', 'mode0'
        try:
            return self._apply_rules(text_vec, data_es, offset_value, min_value, max_value)
        except Exception:
            # Best effort: name lookups / vectorisation may fail; the caller
            # then falls back to the raw top search hit.
            self._log.exception('check_error')
            return '', 'error'

    def run_mode1(self, text, baseclass=None):
        """Classify ``text`` and score the credibility of the result.

        :param text: input text.
        :param baseclass: forwarded unchanged to ``search_china``.
        :return: ``(result_name, similarity, mode, code, credibility)``.
            When no rule yields a credible result the top search hit is
            returned with ``mode == ''`` and ``credibility == 0``.
        """
        vec = text_to_vector(text)
        search_result = self.m.search_china(vec, baseclass)
        if not search_result:
            return '', 0, 'mode0', '', 0

        similarity = 0
        result_name = ''
        mode = 'mode0'
        credibility = 0
        code = ''

        picked_code, picked_mode = self.check(vec, search_result)
        if picked_code:
            code = picked_code
            mode = picked_mode
            result_name = self.m.get_name(code)
            # NOTE(review): the credibility rules are nested under this test
            # so that a code without a name falls back to the top hit.
            if result_name:
                similarity = self.cosine_similarity(vec, text_to_vector(result_name))
                if mode == 'mode01':
                    credibility = 0.99
                if mode == 'mode1':
                    credibility = 0.95
                if mode == 'mode3' and similarity > 0.85:
                    credibility = 0.90
                if mode == 'mode2' and similarity > 0.8:
                    credibility = 0.85
                if mode in ['mode2', 'mode3']:
                    # Voted results are promoted when the parent category
                    # (or the code itself if it has no parent) also matches.
                    pcode = code[:-2] or code
                    p_similarity = self._name_similarity(vec, pcode)
                    # ``and`` binds tighter than ``or``; the parentheses only
                    # make the original evaluation order explicit.
                    if (p_similarity > 0.85 and similarity > 0.9) or similarity == 0.99:
                        mode = 'mode4'
                        credibility = 0.99

        if credibility == 0:
            # Nothing credible: report the raw top search hit instead.
            result_name = search_result[0][0]
            similarity = search_result[0][2]
            mode = ''
            code = search_result[0][1]
        return result_name, similarity, mode, code, credibility

    def run_mode1_main(self, text, baseclass=None):
        """Run ``run_mode1`` and attach the route of the resulting code.

        :return: ``[result_name, similarity, code, credibility, route]``;
            ``['', '', '', 0, '']`` when there is no result name and
            ``['', '', '', '', '']`` when an exception was raised (logged).
        """
        try:
            name, similarity, _mode, code, credibility = self.run_mode1(text, baseclass)
            if not name:
                return ['', '', '', 0, '']
            route = self.m.get_root_zc(code)
            # A very similar hit without rule-based credibility still gets a
            # moderate score; an exact match gets the highest one.
            if similarity > 0.9 and credibility == 0:
                credibility = 0.85
            if similarity == 1.0:
                credibility = 0.99
            return [name, similarity, code, credibility, route]
        except Exception:
            self._log.exception('政采分类错误--->')
            return ['', '', '', '', '']
|