Ver código fonte

添加新过滤器

dongzhaorui 2 anos atrás
pai
commit
f70a7ef53b

+ 21 - 22
zgztb_cookie/FworkSpider/feapder/dedup/__init__.py

@@ -9,36 +9,24 @@ Created on 2018-12-13 21:08
 """
 
 import copy
-from typing import Any, List, Union, Optional, Tuple, Callable
+from typing import Any, List, Union, Tuple, Callable, Optional
 
 from feapder.utils.tools import get_md5
 from .bloomfilter import BloomFilter, ScalableBloomFilter
 from .expirefilter import ExpireFilter
+from .litefilter import LiteFilter
+from .redisfilter import RedisFilter, MRedisFilter
 
 
 class Dedup:
     BloomFilter = 1
     MemoryFilter = 2
     ExpireFilter = 3
+    LiteFilter = 4
+    RedisFilter = 5
+    MRedisFilter = 6
 
     def __init__(self, filter_type: int = BloomFilter, to_md5: bool = True, **kwargs):
-        """
-        去重过滤器 集成BloomFilter、MemoryFilter、ExpireFilter
-        Args:
-            filter_type: 过滤器类型 BloomFilter
-            name: 过滤器名称 该名称会默认以dedup作为前缀 dedup:expire_set:[name]/dedup:bloomfilter:[name]。 默认ExpireFilter name=过期时间; BloomFilter name=dedup:bloomfilter:bloomfilter
-            absolute_name: 过滤器绝对名称 不会加dedup前缀,当此值不为空时name参数无效
-            expire_time: ExpireFilter的过期时间 单位为秒,其他两种过滤器不用指定
-            error_rate: BloomFilter/MemoryFilter的误判率 默认为0.00001
-            to_md5: 去重前是否将数据转为MD5,默认是
-            redis_url: redis://[[username]:[password]]@localhost:6379/0
-                       BloomFilter 与 ExpireFilter 使用
-                       默认会读取setting中的redis配置,若无setting,则需要专递redis_url
-            initial_capacity: 单个布隆过滤器去重容量 默认100000000,当布隆过滤器容量满时会扩展下一个布隆过滤器
-            error_rate:布隆过滤器的误判率 默认0.00001
-            **kwargs:
-        """
-
         if filter_type == Dedup.ExpireFilter:
             try:
                 expire_time = kwargs["expire_time"]
@@ -56,13 +44,22 @@ class Dedup:
                 expire_time_record_key=expire_time_record_key,
                 redis_url=kwargs.get("redis_url"),
             )
-
+        elif filter_type == Dedup.RedisFilter:
+            self.dedup = RedisFilter(
+                ip_ports=kwargs.get("ip_ports"),
+                user_pass=kwargs.get("user_pass", ""),
+                redis_url=kwargs.get("redis_url"),
+                expire_time=kwargs.get("expire_time")
+            )
+        elif filter_type == Dedup.MRedisFilter:
+            self.dedup = MRedisFilter(
+                redis_conf=kwargs.get("redis_conf"),
+                expire_time=kwargs.get("expire_time")
+            )
         else:
             initial_capacity = kwargs.get("initial_capacity", 100000000)
             error_rate = kwargs.get("error_rate", 0.00001)
-            name = kwargs.get("absolute_name") or "dedup:bloomfilter:" + kwargs.get(
-                "name", "bloomfilter"
-            )
+            name = kwargs.get("absolute_name") or "dedup:bloomfilter:" + kwargs.get("name", "bloomfilter")
             if filter_type == Dedup.BloomFilter:
                 self.dedup = ScalableBloomFilter(
                     name=name,
@@ -78,6 +75,8 @@ class Dedup:
                     error_rate=error_rate,
                     bitarray_type=ScalableBloomFilter.BASE_MEMORY,
                 )
+            elif filter_type == Dedup.LiteFilter:
+                self.dedup = LiteFilter()
             else:
                 raise ValueError(
                     "filter_type 类型错误,仅支持 Dedup.BloomFilter、Dedup.MemoryFilter、Dedup.ExpireFilter"

+ 41 - 0
zgztb_cookie/FworkSpider/feapder/dedup/basefilter.py

@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2022/9/21 11:17 AM
+---------
+@summary:
+---------
+@author: Boris
+@email: boris_liu@foxmail.com
+"""
+import abc
+from typing import List, Union
+
+
+class BaseFilter:
+    """
+    Common interface for the dedup filters: ``add`` records keys, ``get`` checks them.
+
+    NOTE(review): the class does not use ``abc.ABCMeta``, so the
+    ``abc.abstractmethod`` markers below are advisory only; nothing prevents
+    instantiating this class or a subclass that leaves a method out.
+    """
+
+    @abc.abstractmethod
+    def add(
+        self, keys: Union[List[str], str], *args, **kwargs
+    ) -> Union[List[bool], bool]:
+        """
+        Add one key or a list of keys.
+
+        Args:
+            keys: a list of keys, or a single key
+            *args:
+            **kwargs:
+
+        Returns:
+            a list of flags, or a single flag (0 if the key already existed,
+            otherwise 1; i.e. whether the key was newly added)
+        """
+        return None
+
+    @abc.abstractmethod
+    def get(self, keys: Union[List[str], str]) -> Union[List[bool], bool]:
+        """
+        Check whether the given data already exists.
+
+        Args:
+            keys: a list of keys, or a single key
+
+        Returns:
+            a list of flags, or a single flag (1 if the key already exists,
+            otherwise 0)
+        """
+        return None

+ 14 - 20
zgztb_cookie/FworkSpider/feapder/dedup/bitarray.py

@@ -14,7 +14,7 @@ import threading
 import time
 from struct import unpack, pack
 
-from feapder.db.redisdb import RedisDB
+from feapder.dedup.basefilter import BaseFilter
 from feapder.utils.redis_lock import RedisLock
 from . import bitarray
 
@@ -146,24 +146,18 @@ class BloomFilter(object):
         比较耗时 半小时检查一次
         @return:
         """
-        # if self._is_at_capacity:
-        #     return self._is_at_capacity
-        #
-        # if not self._check_capacity_time or time.time() - self._check_capacity_time > 1800:
-        #     bit_count = self.bitarray.count()
-        #     if bit_count and bit_count / self.num_bits > 0.5:
-        #         self._is_at_capacity = True
-        #
-        #     self._check_capacity_time = time.time()
-        #
-        # return self._is_at_capacity
-
         if self._is_at_capacity:
             return self._is_at_capacity
 
-        bit_count = self.bitarray.count()
-        if bit_count and bit_count / self.num_bits > 0.5:
-            self._is_at_capacity = True
+        if (
+            not self._check_capacity_time
+            or time.time() - self._check_capacity_time > 1800
+        ):
+            bit_count = self.bitarray.count()
+            if bit_count and bit_count / self.num_bits > 0.5:
+                self._is_at_capacity = True
+
+            self._check_capacity_time = time.time()
 
         return self._is_at_capacity
 
@@ -174,8 +168,8 @@ class BloomFilter(object):
         @param keys: list or one key
         @return:
         """
-        if self.is_at_capacity:
-            raise IndexError("BloomFilter is at capacity")
+        # if self.is_at_capacity:
+        #     raise IndexError("BloomFilter is at capacity")
 
         is_list = isinstance(keys, list)
 
@@ -197,7 +191,7 @@ class BloomFilter(object):
         return is_added if is_list else is_added[0]
 
 
-class ScalableBloomFilter(object):
+class ScalableBloomFilter(BaseFilter):
     """
     自动扩展空间的bloomfilter, 当一个filter满一半的时候,创建下一个
     """
@@ -273,7 +267,7 @@ class ScalableBloomFilter(object):
                     if self.name
                     else "ScalableBloomFilter"
                 )
-                with RedisLock(key=key) as lock:
+                with RedisLock(key=key, redis_url=self.redis_url) as lock:
                     if lock.locked:
                         while True:
                             if self.filters[-1].is_at_capacity:

+ 13 - 2
zgztb_cookie/FworkSpider/feapder/dedup/expirefilter.py

@@ -11,9 +11,10 @@ Created on 2018/12/13 9:44 PM
 import time
 
 from feapder.db.redisdb import RedisDB
+from feapder.dedup.basefilter import BaseFilter
 
 
-class ExpireFilter:
+class ExpireFilter(BaseFilter):
     redis_db = None
 
     def __init__(
@@ -55,7 +56,17 @@ class ExpireFilter:
         return is_added
 
     def get(self, keys):
-        return self.redis_db.zexists(self.name, keys)
+        is_exist = self.redis_db.zexists(self.name, keys)
+        if isinstance(keys, list):
+            # Also flag keys that repeat within the batch itself: every
+            # occurrence after the first is reported as existing
+            temp_set = set()
+            for i, key in enumerate(keys):
+                if key in temp_set:
+                    is_exist[i] = 1
+                else:
+                    temp_set.add(key)
+
+        return is_exist
 
     def del_expire_key(self):
         self.redis_db.zremrangebyscore(

+ 70 - 0
zgztb_cookie/FworkSpider/feapder/dedup/litefilter.py

@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2022/9/21 11:28 AM
+---------
+@summary:
+---------
+@author: Boris
+@email: boris_liu@foxmail.com
+"""
+from typing import List, Union, Set
+
+from feapder.dedup.basefilter import BaseFilter
+
+
+class LiteFilter(BaseFilter):
+    """In-process filter that keeps every recorded key in a Python set."""
+
+    def __init__(self):
+        self.datas: Set[str] = set()
+
+    def add(
+        self, keys: Union[List[str], str], *args, **kwargs
+    ) -> Union[List[int], int]:
+        """
+        Record one key or a list of keys.
+
+        Args:
+            keys: a list of keys, or a single key
+            *args:
+            **kwargs:
+
+        Returns:
+            a list of flags, or a single flag (0 if the key already existed,
+            otherwise 1; i.e. whether the key was newly added)
+        """
+        if not isinstance(keys, list):
+            if keys in self.datas:
+                return 0
+            self.datas.add(keys)
+            return 1
+
+        flags = []
+        for key in keys:
+            is_new = key not in self.datas
+            if is_new:
+                self.datas.add(key)
+            flags.append(int(is_new))
+        return flags
+
+    def get(self, keys: Union[List[str], str]) -> Union[List[int], int]:
+        """
+        Check whether the given data already exists.
+
+        Args:
+            keys: a list of keys, or a single key
+
+        Returns:
+            a list of flags, or a single flag (1 if the key already exists,
+            otherwise 0)
+        """
+        if not isinstance(keys, list):
+            return int(keys in self.datas)
+
+        seen_in_batch = set()
+        flags = []
+        for key in keys:
+            # A key counts as existing when it repeats inside the batch or
+            # is already stored in the filter
+            duplicated = key in seen_in_batch or key in self.datas
+            if not duplicated:
+                seen_in_batch.add(key)
+            flags.append(int(duplicated))
+        return flags

+ 131 - 0
zgztb_cookie/FworkSpider/feapder/dedup/redisfilter.py

@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-03-01
+---------
+@summary: redis 过滤器
+---------
+@author: dzr
+"""
+from feapder.db.redisdb import RedisDB
+from feapder.dedup.basefilter import BaseFilter
+
+
+class RedisFilter(BaseFilter):
+    """
+    Redis-backed dedup filter: every key is stored as its own Redis key
+    (prefixed with ``pylist_``) with an expiry of ``expire_time`` seconds.
+    """
+
+    # Held on the class, so each construction rebinds the connection that
+    # all instances of the same class see.
+    redis_db = None
+
+    def __init__(self, ip_ports=None, user_pass=None, redis_url=None, expire_time=None):
+        """
+        Args:
+            ip_ports: forwarded to RedisDB together with user_pass (cluster
+                mode); takes precedence over redis_url
+            user_pass: forwarded to RedisDB, only used together with ip_ports
+            redis_url: forwarded to RedisDB (standalone mode) when ip_ports
+                is empty
+            expire_time: key lifetime in seconds; any falsy value falls back
+                to 2 years
+
+        When neither ip_ports nor redis_url is given, RedisDB() is created
+        with its own defaults.
+        """
+        if ip_ports:
+            self.__class__.redis_db = RedisDB(
+                ip_ports=ip_ports,
+                user_pass=user_pass,
+                decode_responses=True,
+            )  # cluster
+        elif redis_url:
+            self.__class__.redis_db = RedisDB(redis_url=redis_url)  # standalone
+        else:
+            self.__class__.redis_db = RedisDB()
+
+        self._ex = expire_time or 86400 * 365 * 2  # 2 years = 86400 * 365 * 2
+        self._prefix1 = 'list_'
+        self._prefix2 = 'pylist_'
+
+    def __repr__(self):
+        return "<RedisFilter: {}>".format(self.redis_db)
+
+    def exists(self, key):
+        """Full lookup (python): True if ``pylist_`` + key exists in redis."""
+        return self.redis_db.exists(self._prefix2 + key) > 0
+
+    def add(self, keys, *args, **kwargs):
+        """
+        Add data.
+        @param keys: a single key or a list of keys to record in redis
+        @return: list / single value (False if the key already exists,
+                 otherwise True; i.e. whether the key was newly added)
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+
+        is_added = []
+        for key in keys:
+            if self.exists(key):
+                is_added.append(False)
+                continue
+
+            # nx=True makes the write itself the atomic "first writer wins"
+            # check: if another worker stored the key between exists() and
+            # set(), set() returns None and the key is reported as not added.
+            created = self.redis_db.set(
+                self._prefix2 + key, 1, ex=self._ex, nx=True
+            )
+            is_added.append(bool(created))
+
+        return is_added if is_list else is_added[0]
+
+    def get(self, keys):
+        """
+        Check whether the given data already exists.
+        @param keys: list / single value
+        @return: list / single value (True if it exists, otherwise False);
+                 a key that repeats within the list is reported as existing
+                 from its second occurrence on
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+
+        seen = set()
+        is_exist = []
+        for key in keys:
+            if key in seen:
+                # Repeated inside the batch: no redis round trip needed
+                is_exist.append(True)
+            else:
+                seen.add(key)
+                is_exist.append(self.exists(key))
+
+        return is_exist if is_list else is_exist[0]
+
+
+class MRedisFilter(RedisFilter):
+    """
+    RedisFilter variant that looks keys up across several redis connections,
+    one per key prefix, and writes new keys through the connection registered
+    under the ``pylist_`` prefix.
+    """
+
+    # prefix -> RedisDB. Held on the class: entries registered by one
+    # instance stay visible to every other instance.
+    redis_dbs = {}
+
+    def __init__(self, redis_conf=None, **kwargs):
+        """
+        Args:
+            redis_conf: dict mapping a key prefix to a dict with the entries
+                'redisdb_ip_port', 'redisdb_user_pass' and 'redisdb_db'.
+                When empty, a default RedisDB() is registered under the
+                ``pylist_`` prefix. When given, ``add`` only works if one of
+                its prefixes is ``pylist_``.
+            **kwargs: forwarded to RedisFilter.__init__
+
+        Raises:
+            ValueError: redis_conf is given but is not a dict
+        """
+        super().__init__(**kwargs)
+        if not redis_conf:
+            self.__class__.redis_dbs[self._prefix2] = RedisDB()
+        else:
+            if not isinstance(redis_conf, dict):
+                raise ValueError("redis_conf 必须是一个 dict")
+
+            for prefix, conf in redis_conf.items():
+                self.__class__.redis_dbs[prefix] = RedisDB(
+                    ip_ports=conf['redisdb_ip_port'],
+                    user_pass=conf['redisdb_user_pass'],
+                    db=conf['redisdb_db']
+                )
+
+    def __repr__(self):
+        return "<MRedisFilter: {}>".format(self.redis_dbs)
+
+    def exists(self, key):
+        """
+        Incremental lookup (lua / python): True if prefix + key exists in the
+        connection registered for any prefix.
+        """
+        for prefix, redis_db in self.redis_dbs.items():
+            if redis_db.exists(prefix + key) > 0:
+                return True
+        return False
+
+    def add(self, keys, *args, **kwargs):
+        """
+        Add data.
+        @param keys: a single key or a list of keys to record in redis
+        @return: list / single value (False if the key already exists under
+                 any registered prefix, otherwise True; i.e. whether the key
+                 was newly added)
+        @raise KeyError: no connection is registered for the ``pylist_``
+                 write prefix
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+
+        try:
+            redis_db = self.redis_dbs[self._prefix2]
+        except KeyError:
+            # Same exception type as before, but with a message that says
+            # what is missing instead of a bare KeyError: 'pylist_'
+            raise KeyError(
+                "MRedisFilter has no redis connection for write prefix {!r}; "
+                "add an entry for it to redis_conf".format(self._prefix2)
+            ) from None
+
+        is_added = []
+        for key in keys:
+            if self.exists(key):
+                is_added.append(False)
+                continue
+
+            # nx=True keeps the write atomic: if another worker stored the
+            # key between exists() and set(), set() returns None and the key
+            # is reported as not added.
+            created = redis_db.set(self._prefix2 + key, 1, ex=self._ex, nx=True)
+            is_added.append(bool(created))
+
+        return is_added if is_list else is_added[0]