databases.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import bson
  2. import pymongo
  3. import redis
  4. from elasticsearch import Elasticsearch
  5. from pymongo.uri_parser import SCHEME
  6. from rediscluster import RedisCluster
  7. from config.load import mongo_conf, redis_conf, es_conf, redis_startup_nodes
  8. # ---------------------------------- mongo ----------------------------------
# Module-level cache of MongoClient instances, keyed by connection URI.
MONGO_URI_CLIENTS = {}
  10. def mongo_client(host=None, port=None, fork=False, **kwargs):
  11. if host is None:
  12. host = mongo_conf["host"]
  13. if port is None:
  14. port = mongo_conf["port"]
  15. if host is not None and host.startswith(SCHEME):
  16. uri = host
  17. else:
  18. uri = f'mongodb://{host}:{port}'
  19. if fork:
  20. return pymongo.MongoClient(uri, **kwargs)
  21. global MONGO_URI_CLIENTS
  22. matched_client = MONGO_URI_CLIENTS.get(uri)
  23. if matched_client is None:
  24. new_client = pymongo.MongoClient(uri, **kwargs)
  25. if new_client is not None:
  26. MONGO_URI_CLIENTS[uri] = new_client
  27. return new_client
  28. return matched_client
  29. def mongo_database(name: str, **kw):
  30. client = mongo_client(**kw)
  31. return client.get_database(name)
  32. def mongo_table(db: str, name: str, **kw):
  33. database = mongo_database(db, **kw)
  34. return database.get_collection(name)
  35. def int2long(param: int):
  36. """int 转换成 long """
  37. return bson.int64.Int64(param)
  38. def object_id(_id: str):
  39. return bson.objectid.ObjectId(_id)
  40. # ---------------------------------- es ----------------------------------
  41. def es_client(cfg=None):
  42. if cfg is None:
  43. cfg = es_conf
  44. return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])
  45. def es_query(title: str, publish_time: int):
  46. """
  47. 查询es
  48. :param title: 标题
  49. :param publish_time: 发布时间
  50. :return:
  51. """
  52. client = es_client()
  53. stime = publish_time - 432000 # 往前推5天
  54. etime = publish_time + 432000
  55. # 通过发布标题和发布时间范围查询
  56. query = {
  57. "query": {
  58. "bool": {
  59. "must": [
  60. {
  61. "multi_match": {
  62. "query": title,
  63. "type": "phrase",
  64. "fields": ["title"]
  65. }
  66. },
  67. {"range": {'publishtime': {"from": stime, "to": etime}}}
  68. ]
  69. }
  70. }
  71. }
  72. result = client.search(index=es_conf['db'], body=query, request_timeout=100)
  73. total = int(result['hits']['total'])
  74. return total
  75. # ---------------------------------- redis ----------------------------------
  76. def redis_client(cfg=None):
  77. if cfg is None:
  78. cfg = redis_conf
  79. pool = redis.ConnectionPool(
  80. host=cfg['host'],
  81. port=cfg['port'],
  82. password=cfg['pwd'],
  83. db=cfg['db']
  84. )
  85. return redis.Redis(connection_pool=pool, decode_responses=True)
  86. def redis_cluster():
  87. startup_nodes = redis_startup_nodes
  88. return RedisCluster(startup_nodes=startup_nodes, decode_responses=True)