databases.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import bson
  2. import pymongo
  3. import redis
  4. import requests
  5. from elasticsearch import Elasticsearch
  6. from pymongo.collection import Collection
  7. from config.load import mongo_conf, redis_conf, es_conf, analyze_url
  8. # ---------------------------------- mongo ----------------------------------
  9. MONGO_URI_CLIENTS = {} # a dictionary hold all client with uri as key
  10. def mongo_client(cfg=None, host=None, port=None, fork=False, **kwargs):
  11. if host is not None and port is not None:
  12. uri = f'mongodb://{host}:{port}'
  13. else:
  14. _cfg = (cfg or mongo_conf)
  15. uri = f'mongodb://{_cfg["host"]}:{_cfg["port"]}'
  16. if fork:
  17. return pymongo.MongoClient(uri, **kwargs)
  18. global MONGO_URI_CLIENTS
  19. matched_client = MONGO_URI_CLIENTS.get(uri)
  20. if matched_client is None:
  21. new_client = pymongo.MongoClient(uri, **kwargs)
  22. if new_client is not None:
  23. MONGO_URI_CLIENTS[uri] = new_client
  24. return new_client
  25. return matched_client
  26. def mongo_database(name: str, **kw):
  27. client = mongo_client(**kw)
  28. return client.get_database(name)
  29. def mongo_table(db: str, name: str, **kw):
  30. database = mongo_database(db, **kw)
  31. return database.get_collection(name)
  32. def int2long(param: int):
  33. """int 转换成 long """
  34. return bson.int64.Int64(param)
  35. def object_id(_id: str):
  36. return bson.objectid.ObjectId(_id)
  37. def update_one(collection: Collection, item):
  38. collection.update_one({'_id': item['_id']}, {'$set': item})
  39. def update_one_by_domain(collection: Collection, item):
  40. lst = []
  41. items = collection.find_one({'domain': item['domain']})
  42. if items is not None:
  43. origin = items.get('origin', [])
  44. if item['origin'] not in origin:
  45. lst.append(item['origin'])
  46. lst.extend(origin)
  47. else:
  48. lst.append(item['origin'])
  49. item.update({'origin': lst})
  50. collection.update_one(
  51. {'domain': item['domain']},
  52. {'$set': item},
  53. upsert=True
  54. )
  55. def insert_one(collection: Collection, item):
  56. collection.insert_one(item)
  57. def insert_many(collection: Collection, items):
  58. for item in items:
  59. insert_one(collection, item)
  60. # ---------------------------------- es ----------------------------------
  61. def es_client(cfg=None):
  62. if cfg is None:
  63. cfg = es_conf
  64. return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])
  65. def es_participles_service(text: str):
  66. """
  67. 获取文本的分词列表
  68. :param text: 需要分词的文本
  69. :return: 分词列表
  70. """
  71. result = []
  72. params = {"text": text, "analyzer": "ik_smart"}
  73. res = requests.get(analyze_url, params=params, timeout=60)
  74. if res.status_code == 200:
  75. tokens = res.json().get('tokens', [])
  76. for x in tokens:
  77. if x["token"].encode('utf-8').isalpha():
  78. continue
  79. result.append(x["token"])
  80. return result
  81. def es_query(title: str, publish_time: int):
  82. """
  83. 查询es
  84. :param title: 标题
  85. :param publish_time: 发布时间
  86. :return:
  87. """
  88. client = es_client()
  89. stime = publish_time - 432000 # 往前推5天
  90. etime = publish_time + 432000
  91. conditions = []
  92. participles = es_participles_service(title)
  93. for word in participles:
  94. conditions.append({
  95. "multi_match": {
  96. "query": word,
  97. "type": "phrase",
  98. "fields": ["title"]
  99. }
  100. })
  101. conditions.append({"range": {"publishtime": {"from": stime, "to": etime}}})
  102. query = {
  103. "query": {
  104. "bool": {
  105. "must": conditions,
  106. "minimum_should_match": 1
  107. }
  108. }
  109. }
  110. result = client.search(index=es_conf['db'], body=query, request_timeout=100)
  111. count = len(result['hits']['hits'])
  112. return count
  113. # ---------------------------------- redis ----------------------------------
  114. def redis_client(cfg=None, host=None, port=None, db=None, password=None):
  115. _cfg = (cfg or redis_conf)
  116. host = (host or _cfg['host'])
  117. port = (port or _cfg['port'])
  118. password = (password or _cfg['pwd'])
  119. db = (db or _cfg['db'])
  120. pool = redis.ConnectionPool(
  121. host=host,
  122. port=port,
  123. password=password,
  124. db=db
  125. )
  126. return redis.Redis(connection_pool=pool, decode_responses=True)