123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- # coding:utf-8
- import re
- from hashlib import md5
- from jy_utils.mongodb_utils import MongoInterface
- from jy_utils.task_manage import AsyncTaskScheduler
- from jy_utils.tools import add_logger_file
- from bson import ObjectId
- import threading
- from utils.request_fun import text_to_vector
- from utils.title_ner import title_topic_process
# Route this script's log output to ./logs_vector.
add_logger_file("./logs_vector")

# Not referenced by any other code visible in this file.
SAVE_MAX_id = 'save_id_max'

# NOTE(review): connection credentials are hard-coded in source; consider
# loading them from the environment or a secrets store.
# Source collection: qfw.bidding.
# Alternative values left commented out by the original author:
#   ip_port "192.168.3.71:29099", db "re4art".
MongoConfig = {
    "ip_port": "172.17.189.140:27080",
    "user": "JSYJZ_RWBidAi_ProG",
    "password": "JSLi@20LiefK3d",
    "db": "qfw",
    "col": "bidding",
}

# Vector collection: ai.vector_file. Same server and credentials as
# MongoConfig; only the database and collection differ.
MongoConfig2 = dict(MongoConfig, db="ai", col="vector_file")

# Settings handed to the producer/consumer task scheduler.
AsyncConfig = dict(
    max_queue_size=5000,
    producer_interval=10,
    consumer_interval=2,
    run_status=True,
)

at = AsyncTaskScheduler(AsyncConfig)   # producer/consumer scheduler
mg = MongoInterface(MongoConfig)       # handle built from the source config
mg2 = MongoInterface(MongoConfig2)     # handle built from the vector config
# Load the stop-phrase list once at import time. Each entry keeps its
# trailing newline; stop_content() strips it before use.
with open('./data/stoptext.txt', 'r', encoding='utf-8') as f:
    stopcontent = list(f)
def producer_handle(data):
    """
    Decide whether a record should be put on the work queue.

    Records whose ``toptype`` is one of the two excluded categories
    (proposed-construction and property-rights) are rejected.

    :param data: record dict; only its ``toptype`` key is inspected
    :return: ``(enqueue, data)`` -- ``enqueue`` is True when the record should
        be queued; ``data`` is the same object that was passed in
    """
    excluded_types = ('拟建', '产权')
    should_enqueue = data.get('toptype', '') not in excluded_types
    return should_enqueue, data
def stop_content(text: str):
    """
    Remove every configured stop phrase from ``text``.

    Used for fixed phrases that must be cut out but that a word-segmentation
    tool might split incorrectly (e.g. "re-procurement", "re-tender").
    Phrases come from the module-level ``stopcontent`` list and are applied
    in list order, each one operating on the result of the previous removal.

    :param text: the string to clean
    :return: ``text`` with all occurrences of every stop phrase removed
    """
    for raw_line in stopcontent:
        # Entries are raw file lines, so drop the newline before matching.
        phrase = raw_line.replace('\n', '')
        if phrase not in text:
            continue
        text = text.replace(phrase, '')
    return text
def re_tract(title):
    """
    Regex shortcut for titles, used to speed up topic extraction.

    Each pattern is tried in order; the captured groups of all of its matches
    are concatenated, and the first non-empty concatenation is returned.

    :param title: title string to search
    :return: the concatenated captured text, or None when no pattern yields
        a non-empty capture
    """
    for regex in ('.*关于(.*?)的网上超市.*',):
        # Joining every capture equals joining only the non-empty ones, so a
        # single emptiness test on the joined string is sufficient.
        captured = ''.join(re.findall(regex, title))
        if captured:
            return captured
    return None
def topic_trace(title, projectname):
    """
    Extract the core topic phrase for a record.

    Resolution order:
      1. If both the project name and the title contain the
         procurement-intention keyword, the title is returned unchanged with
         flag ``'title'``.
      2. If the title regex shortcut (``re_tract``) matches the title, or
         failing that the project name, that match is returned with flag
         ``'re'``.
      3. Otherwise ``title_topic_process`` runs on the stop-phrase-cleaned
         project name or title. If it yields nothing, the raw title is used
         with flag ``'title'``. Characters other than word characters and
         whitespace are then stripped.

    :param title: record title
    :param projectname: record project name
    :return: ``(topic, flag)`` tuple
    """
    intent_kw = '采购意向'

    if intent_kw in projectname and intent_kw in title:
        return title, 'title'

    # Regex shortcut: title first, then project name.
    for candidate in (title, projectname):
        matched = re_tract(candidate)
        if matched:
            return matched, 're'

    title_flagged = intent_kw in title or '...' in title
    if title_flagged and intent_kw not in projectname:
        # Title carries the keyword or an ellipsis: use the project name.
        topic, flag = title_topic_process(stop_content(projectname))
    else:
        topic, flag = title_topic_process(stop_content(title))
        # Retry with the project name when the title only produced the
        # 'title' flag and a project name is available.
        if flag == 'title' and projectname:
            topic, flag = title_topic_process(stop_content(projectname))

    if not topic:
        topic, flag = title, 'title'

    return re.sub(r'[^\w\s]', '', topic), flag
@at.many_thread(num=2)
@at.consumer
def consumer_handle(*args, **kwargs):
    """
    Process one queued record: extract its topic phrase and persist it.

    The topic is written back to the source collection under ``topic_test``.
    Unless the extraction flag is ``'title'``, the topic text, its MD5 hex
    digest and its stringified vector are also written to the vector
    collection, matched on ``hash_id``.

    :param args: positional arguments (not used by this function)
    :param kwargs: must carry the record dict under the ``data`` key
    :return: None
    """
    record = kwargs.get("data")
    record_id = record.get('_id', '')
    project_name = record.get('projectname', '')
    record_title = record.get('title', '')

    # Core topic extraction.
    topic, flag = topic_trace(record_title, project_name)
    # Drop double quotes and backslashes before storing.
    topic = topic.replace('"', '').replace('\\', '')

    # Store the topic phrase on the source (bidding) record.
    mg.update_one_by_field(MongoConfig.get('col', ''), {'_id': record_id}, {'topic_test': topic})

    if flag == 'title':
        return

    digest = md5(topic.encode('utf-8')).hexdigest()
    vector = text_to_vector(topic)
    vector_doc = {
        'topic_name': topic,
        'hash_id': digest,
        'vector': str(vector),
    }
    # Per the original author's note, the trailing True makes the call insert
    # a new document when no entry with this hash_id exists yet.
    mg2.update_one_by_field(MongoConfig2.get('col', ''), {'hash_id': digest}, vector_doc, True)
if __name__ == '__main__':
    # Fields requested for every record read from the source collection.
    fields = [
        '_id', 'detail', 'title', 'projectname', 'toptype', 'topic_word',
        'rate', 'tag_subinformation_ai', 'tag_topinformation_ai',
    ]
    # Lowest to highest possible 24-hex-digit ObjectId.
    full_id_range = [ObjectId('0' * 24), ObjectId("f" * 24)]
    record_iterator = mg.bigdata_iterator(
        MongoConfig["col"],
        fields,
        id_range=full_id_range,
        reverse=True,
    )

    # Producer: runs in a background thread, feeding records through
    # producer_handle.
    producer_thread = threading.Thread(
        target=at.producer,
        args=(record_iterator, producer_handle),
    )
    producer_thread.start()

    # Consumer: started from the main thread.
    consumer_handle()
    producer_thread.join()
|