databases.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import bson
  2. import pymongo
  3. import redis
  4. import requests
  5. from elasticsearch import Elasticsearch
  6. from config.load import mongo_conf, redis_conf, es_conf, analyze_url
  7. # ---------------------------------- mongo ----------------------------------
  8. def mongo_client(cfg=None, host=None, port=None):
  9. if cfg is None:
  10. if host is not None and port is not None:
  11. cfg = {'host': host, 'port': port}
  12. else:
  13. cfg = mongo_conf
  14. return pymongo.MongoClient(host=cfg['host'], port=cfg['port'])
  15. def mongo_database(db: str, **kw):
  16. client = mongo_client(**kw)
  17. return client[db]
  18. def mongo_table(db: str, coll: str, **kw):
  19. client = mongo_client(**kw)
  20. return client[db][coll]
  21. def int2long(param: int):
  22. """int 转换成 long """
  23. return bson.int64.Int64(param)
  24. def object_id(_id: str):
  25. return bson.objectid.ObjectId(_id)
  26. # ---------------------------------- es ----------------------------------
  27. def es_client(cfg=None):
  28. if cfg is None:
  29. cfg = es_conf
  30. return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])
  31. def es_participles_service(text: str):
  32. """
  33. 获取文本的分词列表
  34. :param text: 需要分词的文本
  35. :return: 分词列表
  36. """
  37. result = []
  38. params = {"text": text, "analyzer": "ik_smart"}
  39. res = requests.get(analyze_url, params=params, timeout=60)
  40. if res.status_code == 200:
  41. tokens = res.json().get('tokens', [])
  42. for x in tokens:
  43. if x["token"].encode('utf-8').isalpha():
  44. continue
  45. result.append(x["token"])
  46. return result
  47. def es_query(title: str, publish_time: int):
  48. """
  49. 查询es
  50. :param title: 标题
  51. :param publish_time: 发布时间
  52. :return:
  53. """
  54. client = es_client()
  55. stime = publish_time - 432000 # 往前推5天
  56. etime = publish_time + 432000
  57. conditions = []
  58. participles = es_participles_service(title)
  59. for word in participles:
  60. conditions.append({
  61. "multi_match": {
  62. "query": word,
  63. "type": "phrase",
  64. "fields": ["title"]
  65. }
  66. })
  67. conditions.append({
  68. "range": {"publishtime": {"from": stime, "to": etime}}
  69. })
  70. query = {
  71. "query": {
  72. "bool": {
  73. "must": conditions,
  74. "minimum_should_match": 1
  75. }
  76. }
  77. }
  78. result = client.search(index='bidding', body=query, request_timeout=100)
  79. count = len(result['hits']['hits'])
  80. return count
  81. # ---------------------------------- redis ----------------------------------
  82. def redis_client(cfg=None):
  83. if cfg is None:
  84. cfg = redis_conf
  85. pool = redis.ConnectionPool(
  86. host=cfg['host'],
  87. port=cfg['port'],
  88. password=cfg['pwd'],
  89. db=cfg['db']
  90. )
  91. return redis.Redis(connection_pool=pool, decode_responses=True)