Prechádzať zdrojové kódy

update:添加 多台单机 redis 连接方式

dongzhaorui 2 rokov pred
rodič
commit
dfc5c6c6d8

+ 29 - 0
FworkSpider/feapder/dedup/README.md

@@ -124,4 +124,33 @@ def test_filter():
     ss = dedup.filter_exist_data(datas)
     print(ss)
     assert datas == ["ccc"]
+```
+
+```python
+# redis 多实例去重
+from feapder.dedup import Dedup
+
+def test_filter():
+    redis_conf = dict(
+        pylist_=dict(
+            redisdb_ip_port="192.168.3.71:8371",
+            redisdb_user_pass="top@123",
+            redisdb_db=0
+        ),
+        list_=dict(
+            redisdb_ip_port="192.168.3.165:8165",
+            redisdb_user_pass="",
+            redisdb_db=0
+        )
+    )
+    
+    dedup = Dedup(filter_type=6, to_md5=False, redis_conf=redis_conf, expire_time=60)
+    datas = ["xxx", "bbb"]
+    dedup.add(datas)
+    
+    # 过滤掉已存在数据 "xxx", "bbb"
+    datas = ["xxx", "bbb", "ccc"]
+    dedup.filter_exist_data(datas)
+    print(datas)
+    assert datas == ["ccc"]
 ```

+ 8 - 1
FworkSpider/feapder/dedup/__init__.py

@@ -15,7 +15,7 @@ from feapder.utils.tools import get_md5
 from .bloomfilter import BloomFilter, ScalableBloomFilter
 from .expirefilter import ExpireFilter
 from .litefilter import LiteFilter
-from .redisfilter import RedisFilter
+from .redisfilter import RedisFilter, MRedisFilter
 
 
 class Dedup:
@@ -24,6 +24,7 @@ class Dedup:
     ExpireFilter = 3
     LiteFilter = 4
     RedisFilter = 5
+    MRedisFilter = 6
 
     def __init__(self, filter_type: int = BloomFilter, to_md5: bool = True, **kwargs):
         if filter_type == Dedup.ExpireFilter:
@@ -46,9 +47,15 @@ class Dedup:
         elif filter_type == Dedup.RedisFilter:
             self.dedup = RedisFilter(
                 ip_ports=kwargs.get("ip_ports"),
+                user_pass=kwargs.get("user_pass", ""),
                 redis_url=kwargs.get("redis_url"),
                 expire_time=kwargs.get("expire_time")
             )
+        elif filter_type == Dedup.MRedisFilter:
+            self.dedup = MRedisFilter(
+                redis_conf=kwargs.get("redis_conf"),
+                expire_time=kwargs.get("expire_time")
+            )
         else:
             initial_capacity = kwargs.get("initial_capacity", 100000000)
             error_rate = kwargs.get("error_rate", 0.00001)

+ 66 - 6
FworkSpider/feapder/dedup/redisfilter.py

@@ -2,7 +2,7 @@
 """
 Created on 2023-03-01
 ---------
-@summary: redis集群/单机过滤器
+@summary: 集群/单机/多台单机 redis 过滤器
 ---------
 @author: dzr
 @email: dongzhaorui@topnet.net.cn
@@ -16,17 +16,15 @@ import feapder.utils.tools as tools
 class RedisFilter(BaseFilter):
     redis_db = None
 
-    def __init__(self, ip_ports=None, redis_url=None, expire_time=None):
-        if isinstance(ip_ports, list) and len(ip_ports) > 1:
+    def __init__(self, ip_ports=None, user_pass=None, redis_url=None, expire_time=None):
+        if ip_ports:
             self.__class__.redis_db = RedisDB(
                 ip_ports=ip_ports,
-                user_pass='',
+                user_pass=user_pass,
                 decode_responses=True,
             )  # 集群
         elif redis_url:
             self.__class__.redis_db = RedisDB(redis_url=redis_url)  # 单机
-        else:
-            self.__class__.redis_db = RedisDB(ip_ports=ip_ports)  # 单机
 
         self._ex = expire_time or 86400 * 365 * 2  # 2年 = 86400 * 365 * 2
         self._prefix1 = 'list_'
@@ -99,3 +97,65 @@ class RedisFilter(BaseFilter):
                 temp_set.add(key)
 
         return is_exist if is_list else is_exist[0]
+
+
+class MRedisFilter(RedisFilter):
+    redis_dbs = {}
+
+    def __init__(self, redis_conf=None, **kwargs):
+        super(MRedisFilter, self).__init__(**kwargs)
+        if not redis_conf:
+            self.__class__.redis_dbs[self._prefix2] = RedisDB()
+        else:
+            if not isinstance(redis_conf, dict):
+                raise ValueError("redis_conf 必须是一个 dict")
+
+            for prefix, conf in redis_conf.items():
+                self.__class__.redis_dbs[prefix] = RedisDB(
+                    ip_ports=conf['redisdb_ip_port'],
+                    user_pass=conf['redisdb_user_pass'],
+                    db=conf['redisdb_db']
+                )
+
+    def __repr__(self):
+        return "<MRedisFilter: {}>".format(self.redis_dbs)
+
+    def exists(self, key):
+        """lua增量检索/python增量检索"""
+        if '&&' in key:
+            md5, sha256 = key.split("&&")
+            mixture = tools.get_sha256(md5)
+        else:
+            mixture = sha256 = key
+
+        for prefix, redis_db in self.redis_dbs.items():
+            if any([
+                redis_db.exists(prefix + sha256) > 0,
+                redis_db.exists(prefix + mixture) > 0
+            ]):
+                return True
+        return False
+
+    def add(self, keys, *args, **kwargs):
+        """
+        添加数据
+        @param keys: 检查关键词在 redis 中是否存在,支持列表批量
+        @return: list / 单个值(如果数据已存在 返回 False 否则返回 True, 可以理解为是否添加成功)
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+
+        redis_db = self.redis_dbs[self._prefix2]
+
+        is_added = []
+        for key in keys:
+            if not self.exists(key):
+                if '&&' in key:
+                    md5, sha256 = key.split("&&")
+                else:
+                    sha256 = key
+                is_added.append(redis_db.set(self._prefix2 + sha256, 1, ex=self._ex))
+            else:
+                is_added.append(False)
+
+        return is_added if is_list else is_added[0]