import bson import pymongo import redis from elasticsearch import Elasticsearch from pymongo.uri_parser import SCHEME from rediscluster import RedisCluster from config.load import mongo_conf, redis_conf, es_conf, redis_startup_nodes # ---------------------------------- mongo ---------------------------------- MONGO_URI_CLIENTS = {} # a dictionary hold all client with uri as key def mongo_client(host=None, port=None, fork=False, **kwargs): if host is None: host = mongo_conf["host"] if port is None: port = mongo_conf["port"] if host is not None and host.startswith(SCHEME): uri = host else: uri = f'mongodb://{host}:{port}' if fork: return pymongo.MongoClient(uri, **kwargs) global MONGO_URI_CLIENTS matched_client = MONGO_URI_CLIENTS.get(uri) if matched_client is None: new_client = pymongo.MongoClient(uri, **kwargs) if new_client is not None: MONGO_URI_CLIENTS[uri] = new_client return new_client return matched_client def mongo_database(name: str, **kw): client = mongo_client(**kw) return client.get_database(name) def mongo_table(db: str, name: str, **kw): database = mongo_database(db, **kw) return database.get_collection(name) def int2long(param: int): """int 转换成 long """ return bson.int64.Int64(param) def object_id(_id: str): return bson.objectid.ObjectId(_id) # ---------------------------------- es ---------------------------------- def es_client(cfg=None): if cfg is None: cfg = es_conf return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}]) def es_query(title: str, publish_time: int): """ 查询es :param title: 标题 :param publish_time: 发布时间 :return: """ client = es_client() stime = publish_time - 432000 # 往前推5天 etime = publish_time + 432000 # 通过发布标题和发布时间范围查询 query = { "query": { "bool": { "must": [ { "multi_match": { "query": title, "type": "phrase", "fields": ["title"] } }, {"range": {'publishtime': {"from": stime, "to": etime}}} ] } } } result = client.search(index=es_conf['db'], body=query, request_timeout=100) total = int(result['hits']['total']) return total # ---------------------------------- redis ---------------------------------- def redis_client(cfg=None): if cfg is None: cfg = redis_conf pool = redis.ConnectionPool( host=cfg['host'], port=cfg['port'], password=cfg['pwd'], db=cfg['db'] ) return redis.Redis(connection_pool=pool, decode_responses=True) def redis_cluster(): startup_nodes = redis_startup_nodes return RedisCluster(startup_nodes=startup_nodes, decode_responses=True)