databases.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import bson
  2. import pymongo
  3. import redis
  4. from elasticsearch import Elasticsearch
  5. from utils.title_participle import get_should
  6. from config.load import mongo_conf, redis_conf, es_conf
  7. # ---------------------------------- mongo ----------------------------------
  8. MONGO_URI_CLIENTS = {} # a dictionary hold all client with uri as key
  9. def mongo_client(cfg=None, host=None, port=None, fork=False, **kwargs):
  10. if host is not None and port is not None:
  11. uri = f'mongodb://{host}:{port}'
  12. else:
  13. _cfg = (cfg or mongo_conf)
  14. uri = f'mongodb://{_cfg["host"]}:{_cfg["port"]}'
  15. if fork:
  16. return pymongo.MongoClient(uri, **kwargs)
  17. global MONGO_URI_CLIENTS
  18. matched_client = MONGO_URI_CLIENTS.get(uri)
  19. if matched_client is None:
  20. new_client = pymongo.MongoClient(uri, **kwargs)
  21. if new_client is not None:
  22. MONGO_URI_CLIENTS[uri] = new_client
  23. return new_client
  24. return matched_client
  25. def mongo_database(name: str, **kw):
  26. client = mongo_client(**kw)
  27. return client.get_database(name)
  28. def mongo_table(db: str, name: str, **kw):
  29. database = mongo_database(db, **kw)
  30. return database.get_collection(name)
  31. def int2long(param: int):
  32. """int 转换成 long """
  33. return bson.int64.Int64(param)
  34. def object_id(_id: str):
  35. return bson.objectid.ObjectId(_id)
  36. # ---------------------------------- es ----------------------------------
  37. def es_client(cfg=None):
  38. if cfg is None:
  39. cfg = es_conf
  40. return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}],http_auth=(cfg['usename'], cfg['pwd']))
  41. def es_query(title: str, publish_time: int):
  42. """
  43. 查询es
  44. :param title: 标题
  45. :param publish_time: 发布时间
  46. :return:
  47. """
  48. client = es_client()
  49. stime = publish_time - 432000 # 往前推5天
  50. etime = publish_time + 432000
  51. time_limit = {"range": {'publishtime': {"from": stime, "to": etime}}}
  52. should_list = get_should(title) # 对标题进行分词组合query语句
  53. # 通过发布标题和发布时间范围查询
  54. query = {
  55. "query": {
  56. "bool": {
  57. "must": [time_limit],
  58. "should": should_list,
  59. "minimum_should_match": "10<90%",
  60. }
  61. }
  62. }
  63. result = client.search(index=es_conf['db'], body=query, request_timeout=100)
  64. total = int(result['hits']['total']['value'])
  65. return total
  66. # ---------------------------------- redis ----------------------------------
  67. def redis_client(cfg=None):
  68. if cfg is None:
  69. cfg = redis_conf
  70. pool = redis.ConnectionPool(
  71. host=cfg['host'],
  72. port=cfg['port'],
  73. password=cfg['pwd'],
  74. db=cfg['db']
  75. )
  76. return redis.Redis(connection_pool=pool, decode_responses=True)