# databases.py
  1. import bson
  2. import pymongo
  3. import redis
  4. import requests
  5. from elasticsearch import Elasticsearch
  6. from pymongo.collection import Collection
  7. from config.load import mongo_conf, redis_conf, es_conf, analyze_url
  8. from common.log import logger
# ---------------------------------- mongo ----------------------------------
  10. MONGO_URI_CLIENTS = {} # a dictionary hold all client with uri as key
  11. def mongo_client(cfg=None, host=None, port=None, fork=False, **kwargs):
  12. if host is not None and port is not None:
  13. uri = f'mongodb://{host}:{port}'
  14. else:
  15. _cfg = (cfg or mongo_conf)
  16. uri = f'mongodb://{_cfg["host"]}:{_cfg["port"]}'
  17. if fork:
  18. return pymongo.MongoClient(uri, **kwargs)
  19. global MONGO_URI_CLIENTS
  20. matched_client = MONGO_URI_CLIENTS.get(uri)
  21. if matched_client is None:
  22. new_client = pymongo.MongoClient(uri, **kwargs)
  23. if new_client is not None:
  24. MONGO_URI_CLIENTS[uri] = new_client
  25. return new_client
  26. return matched_client
  27. def mongo_database(name: str, **kw):
  28. client = mongo_client(**kw)
  29. return client.get_database(name)
  30. def mongo_table(db: str, name: str, **kw):
  31. database = mongo_database(db, **kw)
  32. return database.get_collection(name)
  33. def int2long(param: int):
  34. """int 转换成 long """
  35. return bson.int64.Int64(param)
  36. def object_id(_id: str):
  37. return bson.objectid.ObjectId(_id)
  38. def update_one(collection: Collection, item):
  39. collection.update_one({'_id': item['_id']}, {'$set': item})
  40. def insert_one(collection: Collection, item):
  41. collection.insert_one(item)
  42. logger.info(f'[入库成功 - {collection.name}]{item["_id"]}')
  43. def insert_many(collection: Collection, items):
  44. for item in items:
  45. insert_one(collection, item)
# ---------------------------------- es ----------------------------------
  47. def es_client(cfg=None):
  48. if cfg is None:
  49. cfg = es_conf
  50. return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])
  51. def es_participles_service(text: str):
  52. """
  53. 获取文本的分词列表
  54. :param text: 需要分词的文本
  55. :return: 分词列表
  56. """
  57. result = []
  58. params = {"text": text, "analyzer": "ik_smart"}
  59. res = requests.get(analyze_url, params=params, timeout=60)
  60. if res.status_code == 200:
  61. tokens = res.json().get('tokens', [])
  62. for x in tokens:
  63. if x["token"].encode('utf-8').isalpha():
  64. continue
  65. result.append(x["token"])
  66. return result
  67. def es_query(title: str, publish_time: int):
  68. """
  69. 查询es
  70. :param title: 标题
  71. :param publish_time: 发布时间
  72. :return:
  73. """
  74. client = es_client()
  75. stime = publish_time - 432000 # 往前推5天
  76. etime = publish_time + 432000
  77. conditions = []
  78. participles = es_participles_service(title)
  79. for word in participles:
  80. conditions.append({
  81. "multi_match": {
  82. "query": word,
  83. "type": "phrase",
  84. "fields": ["title"]
  85. }
  86. })
  87. conditions.append({
  88. "range": {"publishtime": {"from": stime, "to": etime}}
  89. })
  90. query = {
  91. "query": {
  92. "bool": {
  93. "must": conditions,
  94. "minimum_should_match": 1
  95. }
  96. }
  97. }
  98. result = client.search(index='bidding', body=query, request_timeout=100)
  99. count = len(result['hits']['hits'])
  100. return count
# ---------------------------------- redis ----------------------------------
  102. def redis_client(cfg=None, host=None, port=None, db=None, password=None):
  103. _cfg = (cfg or redis_conf)
  104. host = (host or _cfg['host'])
  105. port = (port or _cfg['port'])
  106. password = (password or _cfg['pwd'])
  107. db = (db or _cfg['db'])
  108. pool = redis.ConnectionPool(
  109. host=host,
  110. port=port,
  111. password=password,
  112. db=db
  113. )
  114. return redis.Redis(connection_pool=pool, decode_responses=True)