databases.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import bson
  2. import pymongo
  3. import redis
  4. import requests
  5. from elasticsearch import Elasticsearch
  6. from config.load import mongo_conf, redis_conf, es_conf, analyze_url
  7. # ---------------------------------- mongo ----------------------------------
  8. MONGO_URI_CLIENTS = {} # a dictionary hold all client with uri as key
  9. def mongo_client(cfg=None, host=None, port=None, fork=False, **kwargs):
  10. if host is not None and port is not None:
  11. uri = f'mongodb://{host}:{port}'
  12. else:
  13. _cfg = (cfg or mongo_conf)
  14. uri = f'mongodb://{_cfg["host"]}:{_cfg["port"]}'
  15. if fork:
  16. return pymongo.MongoClient(uri, **kwargs)
  17. global MONGO_URI_CLIENTS
  18. matched_client = MONGO_URI_CLIENTS.get(uri)
  19. if matched_client is None:
  20. new_client = pymongo.MongoClient(uri, **kwargs)
  21. if new_client is not None:
  22. MONGO_URI_CLIENTS[uri] = new_client
  23. return new_client
  24. return matched_client
  25. def mongo_database(name: str, **kw):
  26. client = mongo_client(**kw)
  27. return client.get_database(name)
  28. def mongo_table(db: str, name: str, **kw):
  29. database = mongo_database(db, **kw)
  30. return database.get_collection(name)
  31. def int2long(param: int):
  32. """int 转换成 long """
  33. return bson.int64.Int64(param)
  34. def object_id(_id: str):
  35. return bson.objectid.ObjectId(_id)
  36. # ---------------------------------- es ----------------------------------
  37. def es_client(cfg=None):
  38. if cfg is None:
  39. cfg = es_conf
  40. return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])
  41. def es_participles_service(text: str):
  42. """
  43. 获取文本的分词列表
  44. :param text: 需要分词的文本
  45. :return: 分词列表
  46. """
  47. result = []
  48. params = {"text": text, "analyzer": "ik_smart"}
  49. res = requests.get(analyze_url, params=params, timeout=60)
  50. if res.status_code == 200:
  51. tokens = res.json().get('tokens', [])
  52. for x in tokens:
  53. if x["token"].encode('utf-8').isalpha():
  54. continue
  55. result.append(x["token"])
  56. return result
  57. def es_query(title: str, publish_time: int):
  58. """
  59. 查询es
  60. :param title: 标题
  61. :param publish_time: 发布时间
  62. :return:
  63. """
  64. client = es_client()
  65. stime = publish_time - 432000 # 往前推5天
  66. etime = publish_time + 432000
  67. conditions = []
  68. participles = es_participles_service(title)
  69. for word in participles:
  70. conditions.append({
  71. "multi_match": {
  72. "query": word,
  73. "type": "phrase",
  74. "fields": ["title"]
  75. }
  76. })
  77. conditions.append({
  78. "range": {"publishtime": {"from": stime, "to": etime}}
  79. })
  80. query = {
  81. "query": {
  82. "bool": {
  83. "must": conditions,
  84. "minimum_should_match": 1
  85. }
  86. }
  87. }
  88. result = client.search(index='bidding', body=query, request_timeout=100)
  89. count = len(result['hits']['hits'])
  90. return count
  91. # ---------------------------------- redis ----------------------------------
  92. def redis_client(cfg=None, host=None, port=None, db=None, password=None):
  93. if all([host is not None, port is not None, db is not None]):
  94. host, port, pwd, db = host, port, password, db
  95. else:
  96. _cfg = (cfg or redis_conf)
  97. host, port, pwd, db = _cfg['host'], _cfg['port'], _cfg['pwd'], _cfg['db']
  98. pool = redis.ConnectionPool(
  99. host=host,
  100. port=port,
  101. password=pwd,
  102. db=db
  103. )
  104. return redis.Redis(connection_pool=pool, decode_responses=True)