import bson
import pymongo
import redis
import requests
from elasticsearch import Elasticsearch
from pymongo.collection import Collection
from pymongo.uri_parser import SCHEME

from config.load import mongo_conf, redis_conf, es_conf, analyze_url

# ---------------------------------- mongo ----------------------------------
# Cache of MongoClient instances, keyed by connection URI.
MONGO_URI_CLIENTS = {}

# URI prefixes that mean `host` is already a complete connection string.
_MONGO_URI_SCHEMES = (SCHEME, "mongodb+srv://")


def mongo_client(host=None, port=None, fork=False, **kwargs):
    """Return a ``pymongo.MongoClient``, reusing one cached client per URI.

    :param host: host name or a full connection URI; defaults to
        ``mongo_conf["host"]``
    :param port: port number; defaults to ``mongo_conf["port"]`` (ignored
        when ``host`` is already a full URI)
    :param fork: when true, always build a fresh client and do not cache it
    :param kwargs: forwarded to ``pymongo.MongoClient`` when a client is
        created; ignored when a cached client for the same URI is returned
    :return: a ``pymongo.MongoClient``
    """
    if host is None:
        host = mongo_conf["host"]
    if port is None:
        port = mongo_conf["port"]

    # A host that already carries a URI scheme is used verbatim.
    if host is not None and host.startswith(_MONGO_URI_SCHEMES):
        uri = host
    else:
        uri = f'mongodb://{host}:{port}'

    if fork:
        return pymongo.MongoClient(uri, **kwargs)

    client = MONGO_URI_CLIENTS.get(uri)
    if client is None:
        client = pymongo.MongoClient(uri, **kwargs)
        MONGO_URI_CLIENTS[uri] = client
    return client


def mongo_database(name: str, **kw):
    """Return the database ``name``; ``kw`` is forwarded to ``mongo_client``."""
    client = mongo_client(**kw)
    return client.get_database(name)


def mongo_table(db: str, name: str, **kw):
    """Return collection ``name`` of database ``db``.

    ``kw`` is forwarded to ``mongo_client`` through ``mongo_database``.
    """
    database = mongo_database(db, **kw)
    return database.get_collection(name)


def int2long(param: int):
    """Convert an int to a BSON ``Int64`` (long)."""
    return bson.int64.Int64(param)


def object_id(_id: str):
    """Build a BSON ``ObjectId`` from its string form."""
    return bson.objectid.ObjectId(_id)


def update_one(collection: Collection, item):
    """``$set`` every field of ``item`` on the document with the same ``_id``."""
    collection.update_one({'_id': item['_id']}, {'$set': item})


def update_one_by_domain(collection: Collection, item):
    """Upsert ``item`` by ``domain``, merging its origin into the stored list.

    ``item['origin']`` (a single value) is replaced in place by the merged
    list of origins before the document is written, so the caller's dict is
    mutated.

    :param collection: target collection
    :param item: document holding at least ``domain`` and ``origin``
    """
    origin = item['origin']
    existing = collection.find_one({'domain': item['domain']})

    # Origins already recorded for this domain (none for a new domain).
    recorded = []
    if existing is not None:
        recorded = existing.get('origin') or []

    if origin in recorded:
        # Already recorded: keep the stored list instead of truncating it
        # to this single origin.
        origins = list(recorded)
    else:
        # Newly discovered origin goes first, followed by the recorded ones.
        origins = [origin, *recorded]

    item.update({'origin': origins})
    collection.update_one(
        {'domain': item['domain']},
        {'$set': item},
        upsert=True
    )


def insert_one(collection: Collection, item):
    """Insert a single document."""
    collection.insert_one(item)


def insert_many(collection: Collection, items):
    """Insert documents one at a time, in iteration order."""
    for item in items:
        insert_one(collection, item)
# ---------------------------------- es ----------------------------------
def es_client(cfg=None):
    """Create an Elasticsearch client.

    :param cfg: mapping with ``host`` and ``port`` keys; defaults to ``es_conf``
    :return: a new ``Elasticsearch`` instance
    """
    if cfg is None:
        cfg = es_conf
    return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])


def es_participles_service(text: str):
    """Return the word segmentation of ``text``.

    Sends ``text`` to ``analyze_url`` with the ``ik_smart`` analyzer and
    collects the returned tokens.

    :param text: text to segment
    :return: list of tokens; empty when the service does not answer 200
    """
    params = {"text": text, "analyzer": "ik_smart"}
    res = requests.get(analyze_url, params=params, timeout=60)
    if res.status_code != 200:
        return []

    tokens = res.json().get('tokens', [])
    # bytes.isalpha() is true only for pure ASCII-letter tokens; those are
    # dropped and every other token is kept.
    return [
        entry["token"]
        for entry in tokens
        if not entry["token"].encode('utf-8').isalpha()
    ]


def es_query(title: str, publish_time: int):
    """Query Elasticsearch for documents matching ``title`` near ``publish_time``.

    :param title: title to segment and match phrase-by-phrase on ``title``
    :param publish_time: publish time; the search window is this value
        minus/plus 432000
    :return: number of hits contained in the search response
    """
    client = es_client()
    window = 432000  # five days, assuming publish_time is in seconds
    stime = publish_time - window
    etime = publish_time + window

    # One phrase match per token, plus the publish-time range.
    conditions = [
        {
            "multi_match": {
                "query": word,
                "type": "phrase",
                "fields": ["title"]
            }
        }
        for word in es_participles_service(title)
    ]
    conditions.append(
        {"range": {"publishtime": {"from": stime, "to": etime}}}
    )

    query = {
        "query": {
            "bool": {
                "must": conditions,
                "minimum_should_match": 1
            }
        }
    }
    result = client.search(index=es_conf['db'], body=query, request_timeout=100)
    # Counts the hits returned in this response, not a total across pages.
    return len(result['hits']['hits'])


# ---------------------------------- redis ----------------------------------
def redis_client(cfg=None, host=None, port=None, db=None, password=None):
    """Create a ``redis.StrictRedis`` client with decoded (str) responses.

    Explicit arguments take precedence over the config mapping.

    :param cfg: mapping with ``host``, ``port``, ``pwd`` and ``db`` keys;
        defaults to ``redis_conf``
    :param host: server host; falls back to ``cfg['host']`` when falsy
    :param port: server port; falls back to ``cfg['port']`` when falsy
    :param db: database index; falls back to ``cfg['db']`` only when ``None``
    :param password: password; falls back to ``cfg['pwd']`` when falsy
    :return: a new ``redis.StrictRedis`` instance
    """
    _cfg = cfg or redis_conf
    host = host or _cfg['host']
    port = port or _cfg['port']
    password = password or _cfg['pwd']
    # 0 is a valid database index, so only None means "not given"; a plain
    # `or` would silently replace an explicit db=0 with the config value.
    if db is None:
        db = _cfg['db']
    return redis.StrictRedis(host=host,
                             port=port,
                             password=password,
                             db=db,
                             decode_responses=True)