123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- import bson
- import pymongo
- import redis
- import requests
- from elasticsearch import Elasticsearch
- from pymongo.collection import Collection
- from pymongo.uri_parser import SCHEME
- from config.load import mongo_conf, redis_conf, es_conf, analyze_url
# ---------------------------------- mongo ----------------------------------
# Process-wide cache of pymongo clients, keyed by connection URI. It is filled
# by mongo_client() unless that function is called with fork=True.
MONGO_URI_CLIENTS = dict()
def mongo_client(host=None, port=None, fork=False, **kwargs):
    """Return a ``pymongo.MongoClient`` for a host/port pair or a full URI.

    Unless *fork* is set, clients are cached in ``MONGO_URI_CLIENTS`` keyed by
    the connection URI, so repeated calls for the same target share one client.

    :param host: Hostname, or a complete connection string starting with
        ``mongodb://`` or ``mongodb+srv://``. Defaults to ``mongo_conf["host"]``.
    :param port: Port used only when *host* is not a URI. Defaults to
        ``mongo_conf["port"]``.
    :param fork: When true, always build and return a new, uncached client.
    :param kwargs: Extra options forwarded to ``pymongo.MongoClient``.
        NOTE: on a cache hit these are ignored; the cached client keeps the
        options it was first created with.
    :return: A ``pymongo.MongoClient`` instance.
    """
    if host is None:
        host = mongo_conf["host"]
    if port is None:
        port = mongo_conf["port"]

    # SCHEME is "mongodb://". SRV connection strings use a different prefix
    # and must also be passed through untouched instead of being wrapped
    # into "mongodb://mongodb+srv://...:port".
    if host is not None and host.startswith((SCHEME, "mongodb+srv://")):
        uri = host
    else:
        uri = f'mongodb://{host}:{port}'

    if fork:
        return pymongo.MongoClient(uri, **kwargs)

    client = MONGO_URI_CLIENTS.get(uri)
    if client is None:
        client = pymongo.MongoClient(uri, **kwargs)
        MONGO_URI_CLIENTS[uri] = client
    return client
def mongo_database(name: str, **kw):
    """Return the database *name* from the client that ``mongo_client`` provides.

    :param name: Database name.
    :param kw: Forwarded unchanged to ``mongo_client`` (host, port, fork, ...).
    :return: The database handle returned by ``client.get_database(name)``.
    """
    return mongo_client(**kw).get_database(name)
def mongo_table(db: str, name: str, **kw):
    """Return the collection *name* inside database *db*.

    :param db: Database name.
    :param name: Collection name.
    :param kw: Forwarded unchanged to ``mongo_database`` / ``mongo_client``.
    :return: The collection handle returned by ``get_collection(name)``.
    """
    return mongo_database(db, **kw).get_collection(name)
def int2long(param: int):
    """Convert an int to a BSON ``Int64`` (a "long").

    :param param: Integer value to wrap.
    :return: ``bson.int64.Int64(param)``.
    """
    long_value = bson.int64.Int64(param)
    return long_value
def object_id(_id: str):
    """Build a ``bson.objectid.ObjectId`` from *_id*.

    :param _id: Identifier to convert (annotated as ``str``).
    :return: The corresponding ``ObjectId``.
    """
    oid = bson.objectid.ObjectId(_id)
    return oid
def update_one(collection: Collection, item):
    """``$set`` every field of *item* on the document whose ``_id`` is ``item['_id']``.

    No ``upsert`` is requested, so nothing is inserted when no document matches.

    :param collection: Target collection.
    :param item: Document containing at least an ``_id`` key.
    """
    selector = {'_id': item['_id']}
    collection.update_one(selector, {'$set': item})
def update_one_by_domain(collection: "Collection", item):
    """Upsert the document for ``item['domain']``, merging its ``origin`` list.

    ``item['origin']`` is a single origin value.

    * If a document with the same ``domain`` exists, that value is prepended
      to the stored ``origin`` list unless it is already present.
      ``item['create_at']`` is written as ``update_at`` only, so the stored
      ``create_at`` is left untouched.
    * Otherwise a document is upserted with ``origin = [item['origin']]`` and
      ``update_at`` equal to ``create_at``.

    The caller's *item* is not modified. Calling this again with the same dict
    (e.g. a retry after a failed write) therefore behaves the same way.

    :param collection: Target collection (anything with ``find_one`` and
        ``update_one`` like ``pymongo.collection.Collection``).
    :param item: Document with at least ``domain``, ``origin`` and ``create_at``.
    """
    doc = dict(item)  # work on a copy: never pop/overwrite the caller's keys
    origin = doc['origin']
    existing = collection.find_one({'domain': doc['domain']})

    if existing is None:
        doc['update_at'] = doc['create_at']
        merged = [origin]
    else:
        # Don't overwrite the stored creation time; only refresh update_at.
        doc['update_at'] = doc.pop('create_at')
        # Tolerate stored documents that have no (or an empty) origin field.
        stored = existing.get('origin') or []
        # New origins go first, followed by the already-recorded ones.
        merged = list(stored) if origin in stored else [origin, *stored]

    doc['origin'] = merged
    collection.update_one(
        {'domain': doc['domain']},
        {'$set': doc},
        upsert=True
    )
def insert_one(collection: Collection, item):
    """Insert *item* into *collection* as a new document.

    :param collection: Target collection.
    :param item: Document to insert. pymongo adds an ``_id`` to this dict
        in place when it has none.
    """
    collection.insert_one(item)
def insert_many(collection: Collection, items):
    """Insert every document of *items*, one ``insert_one`` call each, in order.

    The first exception propagates immediately. Documents inserted before it
    remain in the collection.

    :param collection: Target collection.
    :param items: Iterable of documents.
    """
    for document in items:
        insert_one(collection, document)
# ---------------------------------- es ----------------------------------
def es_client(cfg=None):
    """Create an ``Elasticsearch`` client pointing at a single node.

    :param cfg: Mapping with ``host`` and ``port`` keys; ``es_conf`` when None.
    :return: A newly constructed ``Elasticsearch`` instance (no caching).
    """
    settings = es_conf if cfg is None else cfg
    node = {"host": settings['host'], "port": settings['port']}
    return Elasticsearch([node])
def es_participles_service(text: str):
    """Tokenize *text* with the ``ik_smart`` analyzer behind ``analyze_url``.

    Tokens made up solely of ASCII letters are dropped. ``bytes.isalpha`` only
    recognises ASCII letters, so any token containing a non-ASCII character
    (e.g. CJK text), a digit, or punctuation is kept.

    :param text: Text to tokenize.
    :return: List of token strings; an empty list when the service does not
        answer with HTTP 200.
    """
    response = requests.get(
        analyze_url,
        params={"text": text, "analyzer": "ik_smart"},
        timeout=60,
    )
    if response.status_code != 200:
        return []
    return [
        entry["token"]
        for entry in response.json().get('tokens', [])
        if not entry["token"].encode('utf-8').isalpha()
    ]
def es_query(title: str, publish_time: int, window: int = 432000):
    """Count indexed documents whose title resembles *title* around *publish_time*.

    *title* is tokenized with ``es_participles_service``. A document matches
    when its ``title`` contains every token as a phrase and its
    ``publishtime`` lies in ``[publish_time - window, publish_time + window]``.

    :param title: Title text to look up.
    :param publish_time: Publish time in the same unit as the ``publishtime``
        field. The default window of 432000 = 5 * 24 * 3600 suggests seconds;
        confirm against the index mapping.
    :param window: Half-width of the time window (default: 5 days in seconds).
    :return: Number of matching documents, or 0 when *title* yields no tokens.
    """
    participles = es_participles_service(title)
    if not participles:
        # With no title terms the query would match every document in the
        # time window, which says nothing about title similarity.
        return 0

    phrase_clauses = [
        {"multi_match": {"query": word, "type": "phrase", "fields": ["title"]}}
        for word in participles
    ]
    query = {
        # Only the total is needed, so don't fetch any documents.
        "size": 0,
        "query": {
            "bool": {
                "must": phrase_clauses,
                # Time window as a non-scoring filter. gte/lte are equivalent to
                # the deprecated inclusive from/to.
                "filter": [
                    {"range": {"publishtime": {"gte": publish_time - window,
                                               "lte": publish_time + window}}}
                ],
            }
        },
    }

    client = es_client()
    result = client.search(index=es_conf['db'], body=query, request_timeout=100)
    # len(hits.hits) would be capped by the page size (10 by default). Use the
    # reported total instead: an int on ES 6, {"value": n, "relation": ...} on
    # ES 7+. On ES 7+ the total is exact up to 10,000 by default.
    total = result['hits']['total']
    return total['value'] if isinstance(total, dict) else total
# ---------------------------------- redis ----------------------------------
def redis_client(cfg=None, host=None, port=None, db=None, password=None):
    """Create a ``redis.StrictRedis`` client with ``decode_responses=True``.

    Each of *host*, *port*, *db* and *password* overrides the matching config
    entry (``host``, ``port``, ``db``, ``pwd``) whenever it is not None.
    Explicit falsy values such as ``db=0`` are honoured rather than replaced
    by the configured value.

    :param cfg: Connection mapping; ``redis_conf`` is used when *cfg* is falsy.
    :param host: Redis host override.
    :param port: Redis port override.
    :param db: Database index override (``0`` is a valid choice).
    :param password: Password override. When None, the config's ``pwd`` entry
        is used, or no password if the config has none.
    :return: A new ``redis.StrictRedis`` instance (not cached).
    """
    _cfg = cfg or redis_conf
    if host is None:
        host = _cfg['host']
    if port is None:
        port = _cfg['port']
    if db is None:
        db = _cfg['db']
    if password is None:
        # Passwordless setups may omit 'pwd' entirely.
        password = _cfg.get('pwd')
    return redis.StrictRedis(host=host,
                             port=port,
                             password=password,
                             db=db, decode_responses=True)
|