text_vector_pulish.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # coding:utf-8
  2. import re
  3. from hashlib import md5
  4. from jy_utils.mongodb_utils import MongoInterface
  5. from jy_utils.task_manage import AsyncTaskScheduler
  6. from jy_utils.tools import add_logger_file
  7. from bson import ObjectId
  8. import threading
  9. from utils.request_fun import text_to_vector
  10. from utils.title_ner import title_topic_process
# Set up logging through the project helper; "./logs_vector" is presumably the
# log output location — confirm against jy_utils.tools.add_logger_file.
add_logger_file("./logs_vector")
  12. SAVE_MAX_id = 'save_id_max'
  13. MongoConfig = {
  14. "ip_port": "172.17.189.140:27080",
  15. # "ip_port": "192.168.3.71:29099",
  16. "user": "JSYJZ_RWBidAi_ProG",
  17. "password": "JSLi@20LiefK3d",
  18. "db": "qfw",
  19. # "db": "re4art",
  20. "col": "bidding",
  21. }
  22. MongoConfig2 = {
  23. "ip_port": "172.17.189.140:27080",
  24. # "ip_port": "192.168.3.71:29099",
  25. "user": "JSYJZ_RWBidAi_ProG",
  26. "password": "JSLi@20LiefK3d",
  27. "db": "ai",
  28. # "db": "re4art",
  29. "col": "vector_file",
  30. }
  31. AsyncConfig = {
  32. "max_queue_size": 5000,
  33. "producer_interval": 10,
  34. "consumer_interval": 2,
  35. "run_status": True,
  36. }
  37. at = AsyncTaskScheduler(AsyncConfig)
  38. mg = MongoInterface(MongoConfig)
  39. mg2 = MongoInterface(MongoConfig2)
  40. with open('./data/stoptext.txt', 'r', encoding='utf-8') as f:
  41. stopcontent = f.readlines()
  42. def producer_handle(data):
  43. if data.get('toptype','') in ['拟建','产权']: # 排除拟建,产权类
  44. return False, data
  45. data = data
  46. return True, data # True 代表入队列,data 代表入队列的数据
  47. def stop_content(text: str):
  48. """
  49. 停用文本--->当一些固定的词需要切除但是可能会被切词工具切错如:重采购,重招标
  50. :param text:
  51. :return:
  52. """
  53. for sw in stopcontent:
  54. sw = sw.replace('\n', '')
  55. if sw in text:
  56. text = text.replace(sw, '')
  57. return text
  58. def re_tract(title):
  59. """
  60. 标题正则,加速抽取
  61. :param title:
  62. :return:
  63. """
  64. patterns = ['.*关于(.*?)的网上超市.*']
  65. for pattern in patterns:
  66. text = [i for i in re.findall(pattern, title) if i]
  67. if text:
  68. return ''.join(text)
def topic_trace(title,projectname):
    """
    Extract the core topic phrase for a record from its title / project name.

    :param title: record title; used in substring tests, so it must be a string
    :param projectname: record project name; same requirement
    :return: tuple ``(topic, flag)``. ``flag`` is ``'title'`` when the title is
        returned as-is or used as the fallback, ``'re'`` when ``re_tract``
        matched, and otherwise whatever ``title_topic_process`` returned.
        Only the final return path strips punctuation from the topic.
    """
    # NOTE(review): the indentation of this file was lost in the copy under
    # review; the nesting below is the most plausible reconstruction. Confirm
    # that the projectname retry belongs to the else branch and that the
    # final re.sub runs unconditionally.

    # Both fields mention 采购意向: return the title untouched.
    if '采购意向' in projectname and '采购意向' in title:
        return title,'title'
    # Regex shortcut: try the title first, then the project name.
    title_topic = re_tract(title)
    if title_topic:
        return title_topic,'re'
    title_topic = re_tract(projectname)
    if title_topic:
        return title_topic,'re'
    # The title mentions 采购意向 or contains '...' while the project name does
    # not mention 采购意向: extract from the project name instead of the title.
    if ('采购意向' in title or '...' in title) and '采购意向' not in projectname:
        title_topic, flag = title_topic_process(stop_content(projectname))
    else:
        title_topic, flag = title_topic_process(stop_content(title))
        # Extraction from the title came back flagged 'title': retry with the
        # project name when one is present.
        if flag == 'title' and projectname:
            title_topic, flag = title_topic_process(stop_content(projectname))
    # Nothing extracted: fall back to the raw title.
    if not title_topic:
        title_topic = title
        flag = 'title'
    # Drop every character that is neither a word character nor whitespace.
    title_topic = re.sub(r'[^\w\s]', '', title_topic)
    return title_topic,flag
  92. @at.many_thread(num=2)
  93. @at.consumer
  94. def consumer_handle(*args, **kwargs):
  95. '''
  96. 处理逻辑
  97. :param data:
  98. :return:
  99. '''
  100. #
  101. row = kwargs.get("data")
  102. ids = row.get('_id', '')
  103. projectname = row.get('projectname', '')
  104. title = row.get('title', '')
  105. title_topic, flag = topic_trace(title, projectname) # 主干抽取
  106. title_topic = title_topic.replace('"', '').replace('\\', '')
  107. mg.update_one_by_field(MongoConfig.get('col', ''), {'_id': ids}, {'topic_test': title_topic}) # 主干词入bidding
  108. if flag != 'title':
  109. topic_hash = md5(title_topic.encode('utf-8')).hexdigest()
  110. vector = text_to_vector(title_topic)
  111. mg2.update_one_by_field(MongoConfig2.get('col', ''),
  112. {'hash_id':topic_hash},{'topic_name':title_topic,
  113. 'hash_id': topic_hash,
  114. 'vector': str(vector)}, True) # 向量存入向量表,hash_id 不存在则插入新id
  115. if __name__ == '__main__':
  116. filed = ['_id', 'detail', 'title', 'projectname','toptype','topic_word',
  117. 'rate','tag_subinformation_ai','tag_topinformation_ai']
  118. incremental_iterator = mg.bigdata_iterator(MongoConfig["col"], filed,
  119. id_range=[ObjectId('0'*24), ObjectId("f"*24 )]
  120. ,reverse=True)
  121. t = threading.Thread(target=at.producer, args=(incremental_iterator, producer_handle)) # 生产者
  122. t.start()
  123. consumer_handle() # 消费者
  124. t.join()