milvus_hlper.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/10/11
  3. # @Author : lkj
  4. # @description : milvus
  5. from typing import Optional, List
  6. import numpy
  7. from numpy import dot
  8. from numpy.linalg import norm
  9. from pymilvus import (connections, FieldSchema,Collection,CollectionSchema,DataType,utility)
  10. from config import redis_config
  11. from utils.redis_helper import RedisString
  12. # 使用默认数据库 ‘default’,也可以自己建数据库
  13. class Milvus(object):
  14. def __init__(self,table,**kwargs):
  15. connections.connect(**kwargs)
  16. self.col = Collection(table)
  17. self.search_params = {
  18. "metric_type": "L2",
  19. "ignore_growing": False,
  20. "params": {"nprobe": 100},
  21. }
  22. self.r = RedisString(redis_config)
  23. def search(self,vec,fileds=None,expr=None):
  24. res = self.col.search([vec], 'embeddings', self.search_params, 7,
  25. output_fields=fileds,
  26. expr=expr)
  27. return res
  28. def load(self):
  29. self.col.load()
  30. def release(self):
  31. self.col.release()
  32. def delete(self,expr):
  33. self.col.delete(expr=expr)
  34. def query(self,q):
  35. res = self.col.query(expr=q)
  36. return res
  37. def insert(self,data):
  38. self.col.insert(data=data)
  39. @staticmethod
  40. def sim(a: list, b: list):
  41. """
  42. 余弦计算两个向量相似度
  43. :param a:
  44. :param b:
  45. :return:
  46. """
  47. s = dot(a, b) / (norm(a) * norm(b))
  48. return round(s, 4)
  49. def get_name(self,code):
  50. """"
  51. 基于redis查询code对应name
  52. """
  53. while code[-1] == '0' and code[-2] == '0':
  54. code = code[:-1]
  55. code = code[:-1]
  56. name = self.r.string_get('jycode_' + code)
  57. return name
  58. def get_root_zc(self, re_code):
  59. """
  60. 根据code查询对应root
  61. """
  62. split = 0
  63. root = ''
  64. level = (len(re_code) - 1) / 2
  65. for i in range(int(level)):
  66. if split == 0:
  67. c = re_code
  68. else:
  69. c = re_code[:-split]
  70. split += 2
  71. name_code = self.get_name(c)
  72. root = name_code + '/' + root
  73. return root
  74. def search_good(self,vec,num=7,base=None):
  75. try:
  76. res = self.col.search([vec],
  77. 'embeddings', self.search_params, num,
  78. output_fields=['code','class_name','embeddings','explain','root'],) #expr=f'baseclass=="{base}"'
  79. result_list = []
  80. for hit in res:
  81. # print(hit)
  82. for row in hit:
  83. row = row.to_dict()
  84. code = row.get('entity',{}).get('code','')
  85. while code[-1] == '0' and code[-2] == '0':
  86. code = code[:-1]
  87. code = code[:-1]
  88. explain = row.get('entity', {}).get('explain', '')
  89. root = self.get_root_zc(code)
  90. if not root:
  91. root = row.get('entity', {}).get('root', '')
  92. vec_cls = row.get('entity', {}).get('embeddings', [])
  93. sim_res = self.sim(vec, vec_cls)
  94. cls_name = row.get('entity', {}).get('class_name', '')
  95. result_list.append((cls_name, code, sim_res, root, explain))
  96. result_list = sorted(result_list, key=lambda x: x[2], reverse=True)
  97. return result_list
  98. except Exception as e:
  99. print('关系库错误:', e)
  100. return []
  101. def search_industry(self, vec, output_fields: list, topk=7,
  102. industry_list: Optional[List[str]] = None,):
  103. """
  104. 查询统计局分类函数
  105. vec : 标的物转成的向量
  106. industry_list:行业范围 -->list
  107. """
  108. try:
  109. public = True
  110. if industry_list:
  111. # expr = f'industry in {industry_list}'
  112. public = False
  113. else:
  114. expr = None
  115. res = self.col.search([vec], 'embeddings', self.search_params, topk,
  116. output_fields=output_fields,
  117. expr=None)
  118. result_list = []
  119. for hit in res:
  120. for row in hit:
  121. row = row.to_dict()
  122. code = row.get('entity', {}).get('code', '')
  123. cls_name = row.get('entity', {}).get('class_name', '')
  124. if not public:
  125. code = row.get('entity', {}).get('private_code', '')
  126. if code == 'null':
  127. code = row.get('entity', {}).get('code', '')
  128. cls_name = self.get_name(code)
  129. while code[-1] == '0' and code[-2] == '0':
  130. code = code[:-1]
  131. code = code[:-1]
  132. explain = row.get('entity', {}).get('explain', '')
  133. root = self.get_root_zc(code)
  134. if not root:
  135. root = row.get('entity', {}).get('root', '')
  136. vec_cls = row.get('entity', {}).get('embeddings', [])
  137. sim_res = self.sim(vec, vec_cls)
  138. result_list.append((cls_name, code, sim_res, root, explain))
  139. result_list = sorted(result_list, key=lambda x: x[2], reverse=True)
  140. return result_list
  141. except Exception as e:
  142. print('统计局分类错误:', e)
  143. return []
  144. def search_china(self, vec,base=None,topk=7):
  145. """
  146. 查询统计局分类函数
  147. vec : 标的物转成的向量
  148. base:标的物分类
  149. """
  150. try:
  151. res = self.col.search([vec], 'embeddings', self.search_params, topk,
  152. output_fields=['code','class_name','embeddings','explain','root'],
  153. )#expr=f'baseclass=="{base}"'
  154. result_list = []
  155. for hit in res:
  156. # print(hit)
  157. for row in hit:
  158. row = row.to_dict()
  159. code = row.get('entity',{}).get('code','')
  160. while code[-1] == '0' and code[-2] == '0':
  161. code = code[:-1]
  162. code = code[:-1]
  163. ids = row['id']
  164. explain = row.get('entity', {}).get('explain', '')
  165. root = self.get_root_zc(code)
  166. if not root:
  167. root = row.get('entity', {}).get('root', '')
  168. vec_cls = row.get('entity', {}).get('embeddings', [])
  169. sim_res = self.sim(vec, vec_cls)
  170. cls_name = row.get('entity', {}).get('class_name', '')
  171. result_list.append((cls_name, code, sim_res, root, explain))
  172. result_list = sorted(result_list, key=lambda x: x[2], reverse=True)
  173. return result_list
  174. except Exception as e:
  175. print('统计局分类错误:', e)
  176. return []
  177. def update(self, data):
  178. self.col.insert(data)
  179. def create():
  180. onn = connections.connect(db_name="classify", host="192.168.3.109", port=19530)
  181. fields = [
  182. FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
  183. FieldSchema(name="class_name", dtype=DataType.VARCHAR, max_length=100),
  184. FieldSchema(name='code', dtype=DataType.VARCHAR, max_length=16),
  185. FieldSchema(name="p_name", dtype=DataType.VARCHAR, max_length=100),
  186. FieldSchema(name='p_code', dtype=DataType.VARCHAR, max_length=16),
  187. # FieldSchema(name='baseclass', dtype=DataType.VARCHAR, max_length=10),
  188. FieldSchema(name='explain', dtype=DataType.VARCHAR, max_length=500),
  189. # FieldSchema(name='root', dtype=DataType.VARCHAR, max_length=300),
  190. FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=768),
  191. ]
  192. schema = CollectionSchema(fields, description='场景分类')
  193. milvus_conn = Collection('scene', schema=schema)
  194. index = {"index_type": "IVF_FLAT", "metric_type": "L2", "params":{"nlist":128}}
  195. milvus_conn.create_index("embeddings", index)
  196. if __name__ == '__main__':
  197. # create()
  198. # pass
  199. from request_fun import text_to_vector
  200. from config import milvus_config
  201. text = '施工人员派遣'
  202. vector = text_to_vector(text)
  203. col = Milvus('jianyu_code',**milvus_config)
  204. print(col.search_industry(vector, ['code', 'class_name', 'embeddings', 'explain', 'root', 'private_code'],
  205. industry_list=['物业']))