# coding:utf-8
import re
import threading
from hashlib import md5

from bson import ObjectId
from jy_utils.mongodb_utils import MongoInterface
from jy_utils.task_manage import AsyncTaskScheduler
from jy_utils.tools import add_logger_file
from utils.request_fun import text_to_vector
from utils.title_ner import title_topic_process

add_logger_file("./logs_vector")

SAVE_MAX_id = 'save_id_max'  # currently unused in this script

# Source MongoDB: bidding documents to read and write topics back to.
MongoConfig = {
    "ip_port": "172.17.189.140:27080",
    # "ip_port": "192.168.3.71:29099",
    "user": "JSYJZ_RWBidAi_ProG",
    "password": "JSLi@20LiefK3d",
    "db": "qfw",
    # "db": "re4art",
    "col": "bidding",
}

# Target MongoDB: topic-vector collection (one doc per topic hash).
MongoConfig2 = {
    "ip_port": "172.17.189.140:27080",
    # "ip_port": "192.168.3.71:29099",
    "user": "JSYJZ_RWBidAi_ProG",
    "password": "JSLi@20LiefK3d",
    "db": "ai",
    # "db": "re4art",
    "col": "vector_file",
}

# Producer/consumer scheduler settings.
AsyncConfig = {
    "max_queue_size": 5000,
    "producer_interval": 10,
    "consumer_interval": 2,
    "run_status": True,
}

at = AsyncTaskScheduler(AsyncConfig)
mg = MongoInterface(MongoConfig)
mg2 = MongoInterface(MongoConfig2)

# Stop phrases, one per line; stripped from text before topic extraction.
with open('./data/stoptext.txt', 'r', encoding='utf-8') as f:
    stopcontent = f.readlines()


def producer_handle(data):
    if data.get('toptype', '') in ['拟建', '产权']:  # exclude 拟建 (proposed) and 产权 (property-rights) records
        return False, data
    return True, data  # True means the record is enqueued; data is the payload pushed onto the queue


def stop_content(text: str):
    """
    Strip stop phrases: fixed phrases that must be removed but that the
    tokenizer may split incorrectly, e.g. 重采购, 重招标.
    :param text:
    :return:
    """
    for sw in stopcontent:
        sw = sw.replace('\n', '')
        if sw in text:
            text = text.replace(sw, '')
    return text


def re_tract(title):
    """
    Regex fast path for extracting the topic directly from the title.
    :param title:
    :return:
    """
    patterns = ['.*关于(.*?)的网上超市.*']
    for pattern in patterns:
        text = [i for i in re.findall(pattern, title) if i]
        if text:
            return ''.join(text)


def topic_trace(title, projectname):
    """
    Extract the core topic phrase from the title / project name.
    Returns (topic, flag), where flag records how the topic was obtained
    ('re' for the regex fast path, 'title' as fallback, or the flag from
    title_topic_process).
    """
    if '采购意向' in projectname and '采购意向' in title:
        return title, 'title'
    title_topic = re_tract(title)
    if title_topic:
        return title_topic, 're'
    title_topic = re_tract(projectname)
    if title_topic:
        return title_topic, 're'
    if ('采购意向' in title or '...' in title) and '采购意向' not in projectname:
        title_topic, flag = title_topic_process(stop_content(projectname))
    else:
        title_topic, flag = title_topic_process(stop_content(title))
    if flag == 'title' and projectname:
        title_topic, flag = title_topic_process(stop_content(projectname))
    if not title_topic:
        title_topic = title
        flag = 'title'
    title_topic = re.sub(r'[^\w\s]', '', title_topic)
    return title_topic, flag


@at.many_thread(num=2)
@at.consumer
def consumer_handle(*args, **kwargs):
    '''
    Consumer logic: extract the topic for one document, write it back to the
    bidding collection, and upsert the topic vector.
    :param data:
    :return:
    '''
    row = kwargs.get("data")
    ids = row.get('_id', '')
    projectname = row.get('projectname', '')
    title = row.get('title', '')
    title_topic, flag = topic_trace(title, projectname)  # topic extraction
    title_topic = title_topic.replace('"', '').replace('\\', '')
    mg.update_one_by_field(MongoConfig.get('col', ''), {'_id': ids},
                           {'topic_test': title_topic})  # write the topic back to bidding
    if flag != 'title':
        topic_hash = md5(title_topic.encode('utf-8')).hexdigest()
        vector = text_to_vector(title_topic)
        mg2.update_one_by_field(MongoConfig2.get('col', ''), {'hash_id': topic_hash},
                                {'topic_name': title_topic, 'hash_id': topic_hash,
                                 'vector': str(vector)},
                                True)  # upsert into the vector collection; insert a new doc if hash_id does not exist


if __name__ == '__main__':
    fields = ['_id', 'detail', 'title', 'projectname', 'toptype', 'topic_word',
              'rate', 'tag_subinformation_ai', 'tag_topinformation_ai']
    incremental_iterator = mg.bigdata_iterator(MongoConfig["col"], fields,
                                               id_range=[ObjectId('0' * 24), ObjectId("f" * 24)],
                                               reverse=True)
    t = threading.Thread(target=at.producer, args=(incremental_iterator, producer_handle))  # producer
    t.start()
    consumer_handle()  # consumers (2 threads, see decorators)
    t.join()
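

# --- Illustrative sketch (not part of the original pipeline) ----------------
# A minimal, standalone example of how topic_trace() is expected to behave,
# kept for reference only and never called by the pipeline above. The sample
# title and projectname strings are hypothetical, not real bidding documents.
def _demo_topic_trace():
    sample_title = '关于办公设备的网上超市采购公告'  # hypothetical title
    sample_projectname = '某单位办公设备采购项目'    # hypothetical project name
    topic, flag = topic_trace(sample_title, sample_projectname)
    # The '关于(...)的网上超市' pattern in re_tract() matches this title, so
    # flag is expected to be 're' and topic roughly '办公设备'.
    print(topic, flag)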