databases.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import bson
  2. import pymongo
  3. import redis
  4. import requests
  5. from elasticsearch import Elasticsearch
  6. from pymongo.collection import Collection
  7. from pymongo.uri_parser import SCHEME
  8. from config.load import mongo_conf, redis_conf, es_conf, analyze_url
  9. # ---------------------------------- mongo ----------------------------------
  10. MONGO_URI_CLIENTS = {} # a dictionary hold all client with uri as key
  11. def mongo_client(host=None, port=None, fork=False, **kwargs):
  12. if host is None:
  13. host = mongo_conf["host"]
  14. if port is None:
  15. port = mongo_conf["port"]
  16. if host is not None and host.startswith(SCHEME):
  17. uri = host
  18. else:
  19. uri = f'mongodb://{host}:{port}'
  20. if fork:
  21. return pymongo.MongoClient(uri, **kwargs)
  22. global MONGO_URI_CLIENTS
  23. matched_client = MONGO_URI_CLIENTS.get(uri)
  24. if matched_client is None:
  25. new_client = pymongo.MongoClient(uri, **kwargs)
  26. if new_client is not None:
  27. MONGO_URI_CLIENTS[uri] = new_client
  28. return new_client
  29. return matched_client
  30. def mongo_database(name: str, **kw):
  31. client = mongo_client(**kw)
  32. return client.get_database(name)
  33. def mongo_table(db: str, name: str, **kw):
  34. database = mongo_database(db, **kw)
  35. return database.get_collection(name)
  36. def int2long(param: int):
  37. """int 转换成 long """
  38. return bson.int64.Int64(param)
  39. def object_id(_id: str):
  40. return bson.objectid.ObjectId(_id)
  41. def update_one(collection: Collection, item):
  42. collection.update_one({'_id': item['_id']}, {'$set': item})
  43. def update_one_by_domain(collection: Collection, item):
  44. lst = []
  45. res = collection.find_one({'domain': item['domain']})
  46. if res is not None:
  47. item['update_at'] = item.pop('create_at')
  48. if item['origin'] not in res['origin']:
  49. lst.append(item['origin']) # 添加挖掘新成员
  50. lst.extend(res['origin']) # 合并已收录旧成员
  51. else:
  52. lst = res['origin']
  53. else:
  54. lst.append(item['origin'])
  55. item['update_at'] = item['create_at']
  56. item.update({'origin': lst})
  57. collection.update_one(
  58. {'domain': item['domain']},
  59. {'$set': item},
  60. upsert=True
  61. )
  62. def insert_one(collection: Collection, item):
  63. collection.insert_one(item)
  64. def insert_many(collection: Collection, items):
  65. for item in items:
  66. insert_one(collection, item)
  67. # ---------------------------------- es ----------------------------------
  68. def es_client(cfg=None):
  69. if cfg is None:
  70. cfg = es_conf
  71. return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])
  72. def es_participles_service(text: str):
  73. """
  74. 获取文本的分词列表
  75. :param text: 需要分词的文本
  76. :return: 分词列表
  77. """
  78. result = []
  79. params = {"text": text, "analyzer": "ik_smart"}
  80. res = requests.get(analyze_url, params=params, timeout=60)
  81. if res.status_code == 200:
  82. tokens = res.json().get('tokens', [])
  83. for x in tokens:
  84. if x["token"].encode('utf-8').isalpha():
  85. continue
  86. result.append(x["token"])
  87. return result
  88. def es_query(title: str, publish_time: int):
  89. """
  90. 查询es
  91. :param title: 标题
  92. :param publish_time: 发布时间
  93. :return:
  94. """
  95. client = es_client()
  96. stime = publish_time - 432000 # 往前推5天
  97. etime = publish_time + 432000
  98. conditions = []
  99. participles = es_participles_service(title)
  100. for word in participles:
  101. conditions.append({
  102. "multi_match": {
  103. "query": word,
  104. "type": "phrase",
  105. "fields": ["title"]
  106. }
  107. })
  108. conditions.append({"range": {"publishtime": {"from": stime, "to": etime}}})
  109. query = {
  110. "query": {
  111. "bool": {
  112. "must": conditions,
  113. "minimum_should_match": 1
  114. }
  115. }
  116. }
  117. result = client.search(index=es_conf['db'], body=query, request_timeout=100)
  118. count = len(result['hits']['hits'])
  119. return count
  120. # ---------------------------------- redis ----------------------------------
  121. def redis_client(cfg=None, host=None, port=None, db=None, password=None):
  122. _cfg = (cfg or redis_conf)
  123. host = (host or _cfg['host'])
  124. port = (port or _cfg['port'])
  125. password = (password or _cfg['pwd'])
  126. db = (db or _cfg['db'])
  127. return redis.StrictRedis(host=host,
  128. port=port,
  129. password=password,
  130. db=db, decode_responses=True)