import bson
import pymongo
import redis
import requests
from elasticsearch import Elasticsearch

from config.load import mongo_conf, redis_conf, es_conf, analyze_url


# ---------------------------------- mongo ----------------------------------
def mongo_client(cfg=None):
    """Create a new MongoDB client.

    :param cfg: mapping with ``host`` and ``port`` keys; falls back to
        ``mongo_conf`` when omitted.
    :return: a ``pymongo.MongoClient`` instance (a new one on every call).
    """
    if cfg is None:
        cfg = mongo_conf
    return pymongo.MongoClient(host=cfg['host'], port=cfg['port'])


def mongo_database(db: str):
    """Return the database named ``db`` from a client built with ``mongo_conf``."""
    return mongo_client()[db]


def mongo_table(db: str, coll: str):
    """Return collection ``coll`` of database ``db`` from a default client."""
    return mongo_client()[db][coll]


def int2long(param: int):
    """Convert a Python int to a BSON ``Int64``."""
    return bson.int64.Int64(param)


def object_id(_id: str):
    """Build a BSON ``ObjectId`` from its string form."""
    return bson.objectid.ObjectId(_id)


# ---------------------------------- es ----------------------------------
def es_client(cfg=None):
    """Create an Elasticsearch client.

    :param cfg: mapping with ``host`` and ``port`` keys; falls back to
        ``es_conf`` when omitted.
    :return: an ``Elasticsearch`` client instance.
    """
    if cfg is None:
        cfg = es_conf
    return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])


def es_participles_service(text: str):
    """Get the list of word segments (tokens) for a piece of text.

    Sends the text to ``analyze_url`` with the ``ik_smart`` analyzer and
    collects the ``token`` field of every returned entry.

    :param text: the text to segment
    :return: list of tokens; empty when the response status is not 200
    """
    params = {"text": text, "analyzer": "ik_smart"}
    res = requests.get(analyze_url, params=params, timeout=60)
    if res.status_code != 200:
        return []
    tokens = res.json().get('tokens', [])
    # bytes.isalpha() is ASCII-only, so this drops tokens made up purely of
    # ASCII letters and keeps everything else (e.g. non-ASCII words, digits).
    return [
        item["token"]
        for item in tokens
        if not item["token"].encode('utf-8').isalpha()
    ]


def es_query(title: str, publish_time: int):
    """Query Elasticsearch for documents matching a title near a publish time.

    Every token of ``title`` becomes a required phrase match on the ``title``
    field, combined with a ``publishtime`` range of +/- 432000 around
    ``publish_time``.

    :param title: the title to segment and search for
    :param publish_time: publish time; presumably a Unix timestamp in
        seconds (432000 s = 5 days) -- TODO confirm
    :return: number of hits contained in the returned page of results
    """
    client = es_client()
    stime = publish_time - 432000  # go back 5 days
    etime = publish_time + 432000  # go forward 5 days

    conditions = [
        {
            "multi_match": {
                "query": word,
                "type": "phrase",
                "fields": ["title"]
            }
        }
        for word in es_participles_service(title)
    ]
    conditions.append({
        "range": {"publishtime": {"from": stime, "to": etime}}
    })

    # NOTE(review): there are no ``should`` clauses here, so
    # ``minimum_should_match`` looks redundant; its effect without ``should``
    # clauses may depend on the Elasticsearch version -- confirm.
    query = {
        "query": {
            "bool": {
                "must": conditions,
                "minimum_should_match": 1
            }
        }
    }
    result = client.search(index=es_conf['db'], body=query, request_timeout=100)
    # NOTE(review): this counts the hits in the returned page, not the total
    # number of matches (no ``size`` is set in the request) -- confirm that
    # this is what callers expect.
    return len(result['hits']['hits'])


# ---------------------------------- redis ----------------------------------
def redis_client(cfg=None):
    """Create a Redis client backed by its own connection pool.

    ``decode_responses=True`` is set on the connection pool: when an explicit
    ``connection_pool`` is handed to ``redis.Redis``, connection options given
    to ``redis.Redis`` itself are not applied, so the flag has to live on the
    pool for responses to be decoded to ``str``.

    :param cfg: mapping with ``host``, ``port``, ``pwd`` and ``db`` keys;
        falls back to ``redis_conf`` when omitted.
    :return: a ``redis.Redis`` client whose responses are decoded to ``str``.
    """
    if cfg is None:
        cfg = redis_conf
    pool = redis.ConnectionPool(
        host=cfg['host'],
        port=cfg['port'],
        password=cfg['pwd'],
        db=cfg['db'],
        decode_responses=True,
    )
    return redis.Redis(connection_pool=pool)