databases.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import bson
  2. import pymongo
  3. from elasticsearch import Elasticsearch
  4. import setting
  5. from utils.title_participle import get_should
  6. # ---------------------------------- mongo ----------------------------------
MONGO_URI_CLIENTS = {}  # cache of pymongo.MongoClient instances created by mongo_client(), keyed by connection URI
  8. def mongo_client(cfg=None, host=None, port=None, fork=False, **kwargs):
  9. if host is not None and port is not None:
  10. uri = f'mongodb://{host}:{port}'
  11. else:
  12. _cfg = (cfg or {'host': setting.MONGO_IP, 'port': setting.MONGO_PORT})
  13. uri = f'mongodb://{_cfg["host"]}:{_cfg["port"]}'
  14. if fork:
  15. return pymongo.MongoClient(uri, **kwargs)
  16. global MONGO_URI_CLIENTS
  17. matched_client = MONGO_URI_CLIENTS.get(uri)
  18. if matched_client is None:
  19. new_client = pymongo.MongoClient(uri, **kwargs)
  20. if new_client is not None:
  21. MONGO_URI_CLIENTS[uri] = new_client
  22. return new_client
  23. return matched_client
  24. def mongo_database(name: str, **kwargs):
  25. client = mongo_client(**kwargs)
  26. return client.get_database(name)
  27. def mongo_table(db: str, name: str, **kwargs):
  28. database = mongo_database(db, **kwargs)
  29. return database.get_collection(name)
  30. def int2long(param: int):
  31. """int 转换成 long """
  32. return bson.int64.Int64(param)
  33. def object_id(_id: str):
  34. return bson.objectid.ObjectId(_id)
  35. # ---------------------------------- es ----------------------------------
  36. def es_client(cfg=None):
  37. if cfg is None:
  38. cfg = {
  39. 'host': setting.ES_IP,
  40. 'username': setting.ES_USERNAME,
  41. 'pwd': setting.ES_PASSWORD,
  42. 'port': setting.ES_PORT,
  43. }
  44. hosts = [{'host': cfg['host'], 'port': cfg['port']}]
  45. auth = (cfg['username'], cfg['pwd'])
  46. return Elasticsearch(hosts, http_auth=auth)
  47. def es_query(title: str, publish_time: int):
  48. """
  49. 查询es
  50. :param title: 标题
  51. :param publish_time: 发布时间
  52. :return:
  53. """
  54. client = es_client()
  55. stime = publish_time - 432000 # 往前推5天
  56. etime = publish_time + 432000
  57. time_limit = {"range": {'publishtime': {"from": stime, "to": etime}}}
  58. should_list = get_should(title) # 对标题进行分词组合query语句
  59. # 通过发布标题和发布时间范围查询
  60. query = {
  61. "query": {
  62. "bool": {
  63. "must": [time_limit],
  64. "should": should_list,
  65. "minimum_should_match": "10<90%",
  66. }
  67. }
  68. }
  69. result = client.search(index=setting.ES_INDEX, body=query, request_timeout=100)
  70. total = int(result['hits']['total']['value'])
  71. return total