
update: add a standalone Redis dedup filter

dongzhaorui, 2 years ago
commit c432c4c4d1

+ 19 - 1
FworkSpider/feapder/dedup/README.md

@@ -95,7 +95,7 @@ def test_filter():
 from feapder.dedup import Dedup
 
 def test_filter():
-    dedup = Dedup(Dedup.RedisClusterFilter, redis_url=["192.168.3.207:2179", "192.168.3.166:2379"], expire_time=20)
+    dedup = Dedup(Dedup.RedisClusterFilter, redis_url=["192.168.3.207:2179", "192.168.3.166:2379"], expire_time=100)
 
     # create existing data
     datas = ["xxx", "bbb"]
@@ -107,3 +107,21 @@ def test_filter():
     print(ss)
     assert datas == ["ccc"]
 ```
+
+```python
+# redis dedup
+from feapder.dedup import Dedup
+
+def test_filter():
+    dedup = Dedup(Dedup.RedisFilter, expire_time=100)
+
+    # create existing data
+    datas = ["xxx", "bbb"]
+    dedup.add(datas)
+
+    # filter out existing data "xxx", "bbb"
+    datas = ["xxx", "bbb", "ccc"]
+    ss = dedup.filter_exist_data(datas)
+    print(ss)
+    assert datas == ["ccc"]
+```

+ 7 - 0
FworkSpider/feapder/dedup/__init__.py

@@ -16,6 +16,7 @@ from .bloomfilter import BloomFilter, ScalableBloomFilter
 from .expirefilter import ExpireFilter
 from .litefilter import LiteFilter
 from .redisclusterfilter import RedisClusterFilter
+from .redisfilter import RedisFilter


 class Dedup:
@@ -24,6 +25,7 @@ class Dedup:
     ExpireFilter = 3
     LiteFilter = 4
     RedisClusterFilter = 5
+    RedisFilter = 6

     def __init__(self, filter_type: int = BloomFilter, to_md5: bool = True, **kwargs):
         if filter_type == Dedup.ExpireFilter:
@@ -48,6 +50,11 @@ class Dedup:
                 redis_url=kwargs.get("redis_url"),
                 expire_time=kwargs.get("expire_time")
             )
+        elif filter_type == Dedup.RedisFilter:
+            self.dedup = RedisFilter(
+                redis_url=kwargs.get("redis_url"),
+                expire_time=kwargs.get("expire_time")
+            )
         else:
             initial_capacity = kwargs.get("initial_capacity", 100000000)
             error_rate = kwargs.get("error_rate", 0.00001)
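
For reference, a minimal sketch (not part of the commit) of how the new branch is reached through the `Dedup` facade. `Dedup.RedisFilter` and the literal `filter_type=6` are interchangeable; omitting `redis_url` assumes, as in the README example above, that `RedisDB` falls back to the framework's Redis settings.

```python
# Minimal sketch: both spellings select the new standalone Redis filter.
from feapder.dedup import Dedup

dedup_a = Dedup(Dedup.RedisFilter, expire_time=100)  # attribute form
dedup_b = Dedup(6, expire_time=100)                  # same thing via filter_type=6

datas = ["xxx", "bbb", "ccc"]
dedup_a.add(["xxx", "bbb"])        # seed existing keys
dedup_a.filter_exist_data(datas)   # filters the list in place
print(datas)                       # ["ccc"], as in the README test above
```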

+ 101 - 0
FworkSpider/feapder/dedup/redisfilter.py

@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-03-01
+---------
+@summary: standalone (single-node) redis dedup filter
+---------
+@author: dzr
+@email: dongzhaorui@topnet.net.cn
+"""
+import copy
+
+from feapder.db.redisdb import RedisDB
+from feapder.dedup.basefilter import BaseFilter
+from feapder.utils.tools import get_sha256
+
+
+class RedisFilter(BaseFilter):
+    redis_db = None
+
+    def __init__(self, redis_url=None, to_sha256: bool = True, expire_time=None):
+        self._url = redis_url
+        if not self.__class__.redis_db:
+            self.__class__.redis_db = RedisDB(url=redis_url)
+
+        self._ex = expire_time or 86400 * 365 * 2  # 2 years = 86400 * 365 * 2
+        self._prefix1 = 'list_'
+        self._prefix2 = 'pylist_'
+
+        self._to_sha256 = to_sha256
+
+    def __repr__(self):
+        return "<RedisDB: {}>".format(self.redis_db)
+
+    def _deal_datas(self, datas):
+        if self._to_sha256:
+            if isinstance(datas, list):
+                keys = [get_sha256(data) for data in datas]
+            else:
+                keys = get_sha256(datas)
+        else:
+            keys = copy.deepcopy(datas)
+
+        return keys
+
+    def _exists(self, key):
+        return self.redis_db.exists(key)
+
+    def exists(self, key):
+        """全量检索/lua增量检索/python增量检索"""
+        if (
+                self._exists(key) > 0
+                or self._exists(self._prefix1 + key) > 0
+                or self._exists(self._prefix2 + key) > 0
+        ):
+            return True
+        return False
+
+    def add(self, keys, *args, **kwargs):
+        """
+        Add data
+        @param keys: data to add / deduplicate; a list is treated as a batch
+        @return: list / single value (False if the data already exists, otherwise True, i.e. whether the add succeeded)
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+        encrypt_keys = self._deal_datas(keys)
+
+        is_added = []
+        for key in encrypt_keys:
+            if not self.exists(key):
+                is_added.append(
+                    self.redis_db.set(self._prefix2 + key, 1, ex=self._ex)
+                )
+            else:
+                is_added.append(False)
+
+        return is_added if is_list else is_added[0]
+
+    def get(self, keys):
+        """
+        Check whether data exists
+        @param keys: list / single value
+        @return: list / single value (True if it exists, False otherwise)
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+        encrypt_keys = self._deal_datas(keys)
+
+        is_exist = []
+        for key in encrypt_keys:
+            is_exist.append(self.exists(key))
+
+        # also mark duplicates within the input batch itself
+        temp_set = set()
+        for i, key in enumerate(encrypt_keys):
+            if key in temp_set:
+                is_exist[i] = True
+            else:
+                temp_set.add(key)
+
+        return is_exist if is_list else is_exist[0]
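
To make the new class's behaviour concrete, a hedged usage sketch of `RedisFilter` on its own. It assumes `RedisDB` can reach a Redis instance when `redis_url` is omitted (the README example above relies on the same fallback); the literal keys are illustrative only.

```python
# Hedged sketch: exercising RedisFilter directly.
from feapder.dedup import RedisFilter

f = RedisFilter(expire_time=3600)   # keys written by add() expire after 1 hour

print(f.add(["a", "b"]))            # first add -> [True, True]
print(f.add("a"))                   # already stored under the 'pylist_' prefix -> False
print(f.get(["a", "c", "c"]))       # [True, False, True]: the second "c" is an in-batch duplicate
```

The triple check in `exists()` (raw key, `list_` key, `pylist_` key) apparently keeps the standalone filter compatible with keys written by the earlier full and incremental dedup paths, while new keys are always written under the `pylist_` prefix.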

+ 1 - 1
FworkSpider/setting.py

@@ -68,7 +68,7 @@ PROXY_ENABLE = True
 # item dedup
 ITEM_FILTER_ENABLE = True  # item dedup
 ITEM_FILTER_SETTING = dict(
-    filter_type=5,  # permanent dedup (BloomFilter) = 1; in-memory dedup (MemoryFilter) = 2; temporary dedup (ExpireFilter) = 3; lightweight dedup (LiteFilter) = 4; cluster dedup (RedisClusterFilter) = 5
+    filter_type=5,  # permanent dedup (BloomFilter) = 1; in-memory dedup (MemoryFilter) = 2; temporary dedup (ExpireFilter) = 3; lightweight dedup (LiteFilter) = 4; redis cluster dedup (RedisClusterFilter) = 5; redis dedup (RedisFilter) = 6
     expire_time=63072000,  # expiry: 2 years
     redis_url=["172.17.4.239:2479", "172.17.4.240:2579", "172.17.4.84:2379"],  # cluster nodes
 )
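
The committed setting keeps `filter_type=5` (the Redis cluster filter). Below is a hedged sketch of what the item-dedup block might look like when switching to the new standalone filter; whether omitting `redis_url` is sufficient depends on the fork's `RedisDB` defaults, which is an assumption here.

```python
# Hedged sketch (not part of the commit): item dedup via the standalone RedisFilter.
ITEM_FILTER_ENABLE = True  # item dedup
ITEM_FILTER_SETTING = dict(
    filter_type=6,           # redis dedup (RedisFilter), added by this commit
    expire_time=63072000,    # 2 years, same as the cluster configuration
    # redis_url omitted: assumes RedisDB falls back to the framework's Redis settings
)
```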