dongzhaorui@topnet.net.cn 3 жил өмнө
parent
commit
dd15fb7a97
1 өөрчлөгдсөн 95 нэмэгдсэн , 33 устгасан
  1. 95 33
      ybw/utils/databases.py

+ 95 - 33
ybw/utils/databases.py

@@ -1,47 +1,109 @@
-from typing import Optional
-
+import bson
 import pymongo
 import redis
-from pymongo.collection import Collection
-from pymongo.database import Database
+import requests
+from elasticsearch import Elasticsearch
+
+from config.load import mongo_conf, redis_conf, es_conf, analyze_url
+
+
+# ---------------------------------- mongo ----------------------------------
+def mongo_client(cfg=None):
+    if cfg is None:
+        cfg = mongo_conf
+    return pymongo.MongoClient(host=cfg['host'], port=cfg['port'])
+
+
+def mongo_database(db: str):
+    client = mongo_client()
+    return client[db]
+
+
+def mongo_table(db: str, coll: str):
+    client = mongo_client()
+    return client[db][coll]
+
+
+def int2long(param: int):
+    """int 转换成 long """
+    return bson.int64.Int64(param)
 
-from config.load import mongo_conf, redis_conf
 
-__all__ = ['MongoDBS', 'RedisDBS']
+def object_id(_id: str):
+    return bson.objectid.ObjectId(_id)
 
 
-class MongoDBS:
-    """ Mongo """
+# ---------------------------------- es ----------------------------------
+def es_client(cfg=None):
+    if cfg is None:
+        cfg = es_conf
+    return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])
 
-    def __init__(self, db: str, collection: str, cfg: dict = mongo_conf):
-        self.client = pymongo.MongoClient(host=cfg['host'], port=cfg['port'])
-        self.db: Database = self.client[db]
-        self.coll: Collection = self.db[collection]
 
-    def __enter__(self):
-        return self.coll
+def es_participles_service(text: str):
+    """
+    获取文本的分词列表
 
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        # 上下文管理器,实例调用完毕后,关闭客户端连接
-        self.client.close()
+    :param text: 需要分词的文本
+    :return: 分词列表
+    """
+    result = []
+    params = {"text": text, "analyzer": "ik_smart"}
+    res = requests.get(analyze_url, params=params, timeout=60)
+    if res.status_code == 200:
+        tokens = res.json().get('tokens', [])
+        for x in tokens:
+            if x["token"].encode('utf-8').isalpha():
+                continue
+            result.append(x["token"])
+    return result
 
-    def __del__(self):
-        # 实例调用完毕后,关闭客户端连接
-        self.client.close()
 
+def es_query(title: str, publish_time: int):
+    """
+    查询es
 
-class RedisDBS:
-    """ redis """
+    :param title: 标题
+    :param publish_time: 发布时间
+    :return:
+    """
+    client = es_client()
+    stime = publish_time - 432000  # 往前推5天
+    etime = publish_time + 432000
+    conditions = []
+    participles = es_participles_service(title)
+    for word in participles:
+        conditions.append({
+            "multi_match": {
+                "query": word,
+                "type": "phrase",
+                "fields": ["title"]
+            }
+        })
+    conditions.append({
+        "range": {"publishtime": {"from": stime, "to": etime}}
+    })
+    query = {
+        "query": {
+            "bool": {
+                "must": conditions,
+                "minimum_should_match": 1
+            }
+        }
+    }
+    result = client.search(index='bidding', body=query, request_timeout=100)
+    count = len(result['hits']['hits'])
+    return count
 
-    def __init__(self, cfg: Optional[dict] = redis_conf):
-        pool = redis.ConnectionPool(
-            host=cfg['host'],
-            port=cfg['port'],
-            password=cfg['pwd'],
-            db=cfg['db']
-        )
-        self.__r = redis.Redis(connection_pool=pool, decode_responses=True)
 
-    @property
-    def redis(self):
-        return self.__r
+# ---------------------------------- redis ----------------------------------
+def redis_client(cfg=None):
+    if cfg is None:
+        cfg = redis_conf
+    pool = redis.ConnectionPool(
+        host=cfg['host'],
+        port=cfg['port'],
+        password=cfg['pwd'],
+        db=cfg['db']
+    )
+    return redis.Redis(connection_pool=pool, decode_responses=True)