databases.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import bson
  2. import pymongo
  3. import redis
  4. from elasticsearch import Elasticsearch
  5. from config.load import mongo_conf, redis_conf, es_conf
  6. # ---------------------------------- mongo ----------------------------------
MONGO_URI_CLIENTS = {}  # cache of created MongoDB clients, keyed by connection URI
  8. def mongo_client(cfg=None, host=None, port=None, fork=False, **kwargs):
  9. if host is not None and port is not None:
  10. uri = f'mongodb://{host}:{port}'
  11. else:
  12. _cfg = (cfg or mongo_conf)
  13. uri = f'mongodb://{_cfg["host"]}:{_cfg["port"]}'
  14. if fork:
  15. return pymongo.MongoClient(uri, **kwargs)
  16. global MONGO_URI_CLIENTS
  17. matched_client = MONGO_URI_CLIENTS.get(uri)
  18. if matched_client is None:
  19. new_client = pymongo.MongoClient(uri, **kwargs)
  20. if new_client is not None:
  21. MONGO_URI_CLIENTS[uri] = new_client
  22. return new_client
  23. return matched_client
  24. def mongo_database(name: str, **kw):
  25. client = mongo_client(**kw)
  26. return client.get_database(name)
  27. def mongo_table(db: str, name: str, **kw):
  28. database = mongo_database(db, **kw)
  29. return database.get_collection(name)
  30. def int2long(param: int):
  31. """int 转换成 long """
  32. return bson.int64.Int64(param)
  33. def object_id(_id: str):
  34. return bson.objectid.ObjectId(_id)
  35. # ---------------------------------- es ----------------------------------
  36. def es_client(cfg=None):
  37. if cfg is None:
  38. cfg = es_conf
  39. return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])
  40. def es_query(title: str, publish_time: int):
  41. """
  42. 查询es
  43. :param title: 标题
  44. :param publish_time: 发布时间
  45. :return:
  46. """
  47. client = es_client()
  48. stime = publish_time - 432000 # 往前推5天
  49. etime = publish_time + 432000
  50. # 通过发布标题和发布时间范围查询
  51. query = {
  52. "query": {
  53. "bool": {
  54. "must": [
  55. {
  56. "multi_match": {
  57. "query": title,
  58. "type": "phrase",
  59. "fields": ["title","detail"]
  60. }
  61. },
  62. {"range": {'publishtime': {"from": stime, "to": etime}}}
  63. ]
  64. }
  65. }
  66. }
  67. result = client.search(index=es_conf['db'], body=query, request_timeout=100)
  68. total = int(result['hits']['total']['value'])
  69. return total
  70. # ---------------------------------- redis ----------------------------------
  71. def redis_client(cfg=None):
  72. if cfg is None:
  73. cfg = redis_conf
  74. pool = redis.ConnectionPool(
  75. host=cfg['host'],
  76. port=cfg['port'],
  77. password=cfg['pwd'],
  78. db=cfg['db']
  79. )
  80. return redis.Redis(connection_pool=pool, decode_responses=True)