dzr 8 месяцев назад
Родитель
Сommit
3b96aac8c8
6 измененных файлов с 981 добавлено и 55 удалено
  1. 5 4
      base_server.py
  2. 0 48
      common/databases.py
  3. 969 0
      common/redisdb.py
  4. 2 0
      conf/dev.yaml
  5. 2 0
      conf/test.yaml
  6. 3 3
      services/proxy.py

+ 5 - 4
base_server.py

@@ -7,7 +7,8 @@ Created on 2023-05-09
 @author: Dzr
 """
 import common.utils as tools
-from common.databases import mongo_table, redis_client
+from common.databases import mongo_table
+from common.redisdb import RedisDB
 
 
 class BaseServer:
@@ -22,7 +23,7 @@ class BaseServer:
 
         self.task_fingerprint = f"{label}_task_fingerprint"
         self.request_tasks = f"{label}_task_list"
-        self.redis_db = redis_client(cfg=kwargs.pop('redis_cfg', None))
+        self.redis_db = RedisDB(kwargs.pop('redis_cfg', None))
 
         self._unique_key = ('_id',)
 
@@ -61,14 +62,14 @@ class BaseServer:
         self.redis_db.hdel(self.task_fingerprint, fingerprint)
 
     def rpush(self, task: str):
-        self.redis_db.rpush(self.request_tasks, task)
+        self.redis_db.get_redis_obj().rpush(self.request_tasks, task)
 
     def lpop(self):
         return self.redis_db.lpop(self.request_tasks)
 
     @property
     def task_total(self):
-        return self.redis_db.llen(self.request_tasks)
+        return self.redis_db.get_redis_obj().llen(self.request_tasks)
 
     def upload_data_to_mongodb(self, data, bulk_size=100, **kwargs):
         db = kwargs.pop('db', None) or self.mgo_db_name

+ 0 - 48
common/databases.py

@@ -1,11 +1,6 @@
 import bson
 import pymongo
-import redis
-from redis._compat import unicode, long, basestring
-from redis.connection import Encoder as RedisEncoder
-from redis.exceptions import DataError
 
-# import config.load as settings
 import setting as settings
 
 # ---------------------------------- mongo ----------------------------------
@@ -48,46 +43,3 @@ def int2long(param: int):
 
 def object_id(_id: str):
     return bson.objectid.ObjectId(_id)
-
-
-# ---------------------------------- redis ----------------------------------
-def redis_client(cfg=None):
-
-    class Encoder(RedisEncoder):
-
-        def encode(self, value):
-            "Return a bytestring or bytes-like representation of the value"
-            if isinstance(value, (bytes, memoryview)):
-                return value
-            # elif isinstance(value, bool):
-            #     # special case bool since it is a subclass of int
-            #     raise DataError(
-            #         "Invalid input of type: 'bool'. Convert to a "
-            #         "bytes, string, int or float first."
-            #     )
-            elif isinstance(value, float):
-                value = repr(value).encode()
-            elif isinstance(value, (int, long)):
-                # python 2 repr() on longs is '123L', so use str() instead
-                value = str(value).encode()
-            elif isinstance(value, (list, dict, tuple)):
-                value = unicode(value)
-            elif not isinstance(value, basestring):
-                # a value we don't know how to deal with. throw an error
-                typename = type(value).__name__
-                raise DataError(
-                    "Invalid input of type: '%s'. Convert to a "
-                    "bytes, string, int or float first." % typename
-                )
-            if isinstance(value, unicode):
-                value = value.encode(self.encoding, self.encoding_errors)
-            return value
-
-    redis.connection.Encoder = Encoder
-
-    if cfg is None:
-        cfg = settings.redis_conf
-
-    return redis.StrictRedis(host=cfg['host'], port=cfg['port'],
-                             password=cfg['pwd'], db=cfg['db'],
-                             decode_responses=True)

+ 969 - 0
common/redisdb.py

@@ -0,0 +1,969 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2016-11-16 16:25
+---------
+@summary: 操作redis数据库
+---------
+@author: Boris
+"""
+
+import os
+import time
+from typing import Union, List
+
+import redis
+from redis.connection import Encoder as _Encoder
+from redis.exceptions import ConnectionError, TimeoutError
+from redis.exceptions import DataError
+from redis.sentinel import Sentinel
+
+import setting
+from common.log import logger as log
+
+
+class Encoder(_Encoder):
+    def encode(self, value):
+        "Return a bytestring or bytes-like representation of the value"
+        if isinstance(value, (bytes, memoryview)):
+            return value
+        # elif isinstance(value, bool):
+        #     # special case bool since it is a subclass of int
+        #     raise DataError(
+        #         "Invalid input of type: 'bool'. Convert to a "
+        #         "bytes, string, int or float first."
+        #     )
+        elif isinstance(value, float):
+            value = repr(value).encode()
+        elif isinstance(value, int):
+            # python 2 repr() on longs is '123L', so use str() instead
+            value = str(value).encode()
+        elif isinstance(value, (list, dict, tuple)):
+            value = str(value)
+        elif not isinstance(value, str):
+            # a value we don't know how to deal with. throw an error
+            typename = type(value).__name__
+            raise DataError(
+                "Invalid input of type: '%s'. Convert to a "
+                "bytes, string, int or float first." % typename
+            )
+        if isinstance(value, str):
+            value = value.encode(self.encoding, self.encoding_errors)
+        return value
+
+
+redis.connection.Encoder = Encoder
+
+
+class RedisDB:
+    def __init__(
+        self,
+        ip_ports=None,
+        db=None,
+        user_pass=None,
+        url=None,
+        decode_responses=True,
+        service_name=None,
+        max_connections=1000,
+        **kwargs,
+    ):
+        """
+        redis的封装
+        Args:
+            ip_ports: ip:port 多个可写为列表或者逗号隔开 如 ip1:port1,ip2:port2 或 ["ip1:port1", "ip2:port2"]
+            db:
+            user_pass:
+            url:
+            decode_responses:
+            service_name: 适用于redis哨兵模式
+            max_connections: 同一个redis对象使用的并发数(连接池的最大连接数),超过这个数量会抛出redis.ConnectionError
+        """
+
+        # 可能会改setting中的值,所以此处不能直接赋值为默认值,需要后加载赋值
+        if ip_ports is None:
+            ip = setting.redis_conf['host']
+            port = setting.redis_conf['port']
+            ip_ports = f'{ip}:{port}'
+        if db is None:
+            db = setting.redis_conf['db']
+        if user_pass is None:
+            user_pass = setting.redis_conf['pwd']
+        if service_name is None:
+            service_name = setting.redis_conf['service_name']
+        if kwargs is None:
+            kwargs = setting.redis_conf['redis_kwargs']
+
+        self._is_redis_cluster = False
+
+        self.__redis = None
+        self._url = url
+        self._ip_ports = ip_ports
+        self._db = db
+        self._user_pass = user_pass
+        self._decode_responses = decode_responses
+        self._service_name = service_name
+        self._max_connections = max_connections
+        self._kwargs = kwargs
+        self.get_connect()
+
+    def __repr__(self):
+        if self._url:
+            return "<Redisdb url:{}>".format(self._url)
+
+        return "<Redisdb ip_ports: {} db:{} user_pass:{}>".format(
+            self._ip_ports, self._db, self._user_pass
+        )
+
+    @property
+    def _redis(self):
+        try:
+            if not self.__redis.ping():
+                raise ConnectionError("unable to connect to redis")
+        except:
+            self._reconnect()
+
+        return self.__redis
+
+    @_redis.setter
+    def _redis(self, val):
+        self.__redis = val
+
+    def get_connect(self):
+        # 获取数据库连接
+        try:
+            if not self._url:
+                if not self._ip_ports:
+                    raise ConnectionError("未设置 redis 连接信息")
+
+                ip_ports = (
+                    self._ip_ports
+                    if isinstance(self._ip_ports, list)
+                    else self._ip_ports.split(",")
+                )
+                if len(ip_ports) > 1:
+                    startup_nodes = []
+                    for ip_port in ip_ports:
+                        ip, port = ip_port.split(":")
+                        startup_nodes.append({"host": ip, "port": port})
+
+                    if self._service_name:
+                        # log.debug("使用redis哨兵模式")
+                        hosts = [(node["host"], node["port"]) for node in startup_nodes]
+                        sentinel = Sentinel(hosts, socket_timeout=3, **self._kwargs)
+                        self._redis = sentinel.master_for(
+                            self._service_name,
+                            password=self._user_pass,
+                            db=self._db,
+                            redis_class=redis.StrictRedis,
+                            decode_responses=self._decode_responses,
+                            max_connections=self._max_connections,
+                            **self._kwargs,
+                        )
+
+                    else:
+                        try:
+                            from rediscluster import RedisCluster
+                        except ModuleNotFoundError as e:
+                            log.error('请安装 pip install "feapder[all]"')
+                            os._exit(0)
+
+                        # log.debug("使用redis集群模式")
+                        self._redis = RedisCluster(
+                            startup_nodes=startup_nodes,
+                            decode_responses=self._decode_responses,
+                            password=self._user_pass,
+                            max_connections=self._max_connections,
+                            **self._kwargs,
+                        )
+
+                    self._is_redis_cluster = True
+                else:
+                    ip, port = ip_ports[0].split(":")
+                    self._redis = redis.StrictRedis(
+                        host=ip,
+                        port=port,
+                        db=self._db,
+                        password=self._user_pass,
+                        decode_responses=self._decode_responses,
+                        max_connections=self._max_connections,
+                        **self._kwargs,
+                    )
+                    self._is_redis_cluster = False
+            else:
+                self._redis = redis.StrictRedis.from_url(
+                    self._url, decode_responses=self._decode_responses, **self._kwargs
+                )
+                self._is_redis_cluster = False
+
+        except Exception as e:
+            raise e
+
+        # 不要写成self._redis.ping() 否则循环调用了
+        return self.__redis.ping()
+
+    @classmethod
+    def from_url(cls, url):
+        """
+
+        Args:
+            url: redis://[[username]:[password]]@[host]:[port]/[db]
+
+        Returns:
+
+        """
+        return cls(url=url)
+
+    def sadd(self, table, values):
+        """
+        @summary: 使用无序set集合存储数据, 去重
+        ---------
+        @param table:
+        @param values: 值; 支持list 或 单个值
+        ---------
+        @result: 若库中存在 返回0,否则入库,返回1。 批量添加返回None
+        """
+
+        if isinstance(values, list):
+            pipe = self._redis.pipeline()
+
+            if not self._is_redis_cluster:
+                pipe.multi()
+            for value in values:
+                pipe.sadd(table, value)
+            pipe.execute()
+
+        else:
+            return self._redis.sadd(table, values)
+
+    def sget(self, table, count=1, is_pop=True):
+        """
+        返回 list 如 ['1'] 或 []
+        @param table:
+        @param count:
+        @param is_pop:
+        @return:
+        """
+
+        datas = []
+        if is_pop:
+            count = count if count <= self.sget_count(table) else self.sget_count(table)
+            if count:
+                if count > 1:
+                    pipe = self._redis.pipeline()
+
+                    if not self._is_redis_cluster:
+                        pipe.multi()
+                    while count:
+                        pipe.spop(table)
+                        count -= 1
+                    datas = pipe.execute()
+
+                else:
+                    datas.append(self._redis.spop(table))
+
+        else:
+            datas = self._redis.srandmember(table, count)
+
+        return datas
+
+    def srem(self, table, values):
+        """
+        @summary: 移除集合中的指定元素
+        ---------
+        @param table:
+        @param values: 一个或者列表
+        ---------
+        @result:
+        """
+
+        if isinstance(values, list):
+            pipe = self._redis.pipeline()
+
+            if not self._is_redis_cluster:
+                pipe.multi()
+            for value in values:
+                pipe.srem(table, value)
+            pipe.execute()
+        else:
+            self._redis.srem(table, values)
+
+    def sget_count(self, table):
+        return self._redis.scard(table)
+
+    def sdelete(self, table):
+        """
+        @summary: 删除set集合的大键(数据量大的表)
+        删除大set键,使用sscan命令,每次扫描集合中500个元素,再用srem命令每次删除一个键
+        若直接用delete命令,会导致Redis阻塞,出现故障切换和应用程序崩溃的故障。
+        ---------
+        @param table:
+        ---------
+        @result:
+        """
+
+        # 当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束
+        cursor = "0"
+        while cursor != 0:
+            cursor, data = self._redis.sscan(table, cursor=cursor, count=500)
+            for item in data:
+                # pipe.srem(table, item)
+                self._redis.srem(table, item)
+
+            # pipe.execute()
+
+    def sismember(self, table, key):
+        "Return a boolean indicating if ``value`` is a member of set ``name``"
+        return self._redis.sismember(table, key)
+
+    def zadd(self, table, values, prioritys=0):
+        """
+        @summary: 使用有序set集合存储数据, 去重(值存在更新)
+        ---------
+        @param table:
+        @param values: 值; 支持list 或 单个值
+        @param prioritys: 优先级; double类型,支持list 或 单个值。 根据此字段的值来排序, 值越小越优先。 可不传值,默认value的优先级为0
+        ---------
+        @result:若库中存在 返回0,否则入库,返回1。 批量添加返回 [0, 1 ...]
+        """
+        if isinstance(values, list):
+            if not isinstance(prioritys, list):
+                prioritys = [prioritys] * len(values)
+            else:
+                assert len(values) == len(prioritys), "values值要与prioritys值一一对应"
+
+            pipe = self._redis.pipeline()
+
+            if not self._is_redis_cluster:
+                pipe.multi()
+            for value, priority in zip(values, prioritys):
+                pipe.execute_command(
+                    "ZADD", table, priority, value
+                )  # 为了兼容2.x与3.x版本的redis
+            return pipe.execute()
+
+        else:
+            return self._redis.execute_command(
+                "ZADD", table, prioritys, values
+            )  # 为了兼容2.x与3.x版本的redis
+
+    def zget(self, table, count=1, is_pop=True):
+        """
+        @summary: 从有序set集合中获取数据 优先返回分数小的(优先级高的)
+        ---------
+        @param table:
+        @param count: 数量 -1 返回全部数据
+        @param is_pop:获取数据后,是否在原set集合中删除,默认是
+        ---------
+        @result: 列表
+        """
+
+        start_pos = 0  # 包含
+        end_pos = count - 1 if count > 0 else count
+
+        pipe = self._redis.pipeline()
+
+        if not self._is_redis_cluster:
+            pipe.multi()  # 标记事务的开始 参考 http://www.runoob.com/redis/redis-transactions.html
+        pipe.zrange(table, start_pos, end_pos)  # 取值
+        if is_pop:
+            pipe.zremrangebyrank(table, start_pos, end_pos)  # 删除
+        results, *count = pipe.execute()
+        return results
+
+    def zremrangebyscore(self, table, priority_min, priority_max):
+        """
+        根据分数移除成员 闭区间
+        @param table:
+        @param priority_min:
+        @param priority_max:
+        @return: 被移除的成员个数
+        """
+        return self._redis.zremrangebyscore(table, priority_min, priority_max)
+
+    def zrangebyscore(self, table, priority_min, priority_max, count=None, is_pop=True):
+        """
+        @summary: 返回指定分数区间的数据 闭区间
+        ---------
+        @param table:
+        @param priority_min: 优先级越小越优先
+        @param priority_max:
+        @param count: 获取的数量,为空则表示分数区间内的全部数据
+        @param is_pop: 是否删除
+        ---------
+        @result:
+        """
+
+        # 使用lua脚本, 保证操作的原子性
+        lua = """
+            -- local key = KEYS[1]
+            local min_score = ARGV[2]
+            local max_score = ARGV[3]
+            local is_pop = ARGV[4]
+            local count = ARGV[5]
+
+            -- 取值
+            local datas = nil
+            if count then
+                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
+            else
+                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
+            end
+
+            -- 删除redis中刚取到的值
+            if (is_pop=='True' or is_pop=='1') then
+                for i=1, #datas do
+                    redis.call('zrem', KEYS[1], datas[i])
+                end
+            end
+
+
+            return datas
+
+        """
+        cmd = self._redis.register_script(lua)
+        if count:
+            res = cmd(
+                keys=[table], args=[table, priority_min, priority_max, is_pop, count]
+            )
+        else:
+            res = cmd(keys=[table], args=[table, priority_min, priority_max, is_pop])
+
+        return res
+
+    def zrangebyscore_increase_score(
+        self, table, priority_min, priority_max, increase_score, count=None
+    ):
+        """
+        @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
+        ---------
+        @param table:
+        @param priority_min: 最小分数
+        @param priority_max: 最大分数
+        @param increase_score: 分数值增量 正数则在原有的分数上叠加,负数则相减
+        @param count: 获取的数量,为空则表示分数区间内的全部数据
+        ---------
+        @result:
+        """
+
+        # 使用lua脚本, 保证操作的原子性
+        lua = """
+            -- local key = KEYS[1]
+            local min_score = ARGV[1]
+            local max_score = ARGV[2]
+            local increase_score = ARGV[3]
+            local count = ARGV[4]
+
+            -- 取值
+            local datas = nil
+            if count then
+                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
+            else
+                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
+            end
+
+            --修改优先级
+            for i=1, #datas do
+                redis.call('zincrby', KEYS[1], increase_score, datas[i])
+            end
+
+            return datas
+
+        """
+        cmd = self._redis.register_script(lua)
+        if count:
+            res = cmd(
+                keys=[table], args=[priority_min, priority_max, increase_score, count]
+            )
+        else:
+            res = cmd(keys=[table], args=[priority_min, priority_max, increase_score])
+
+        return res
+
+    def zrangebyscore_set_score(
+        self, table, priority_min, priority_max, score, count=None
+    ):
+        """
+        @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
+        ---------
+        @param table:
+        @param priority_min: 最小分数
+        @param priority_max: 最大分数
+        @param score: 分数值
+        @param count: 获取的数量,为空则表示分数区间内的全部数据
+        ---------
+        @result:
+        """
+
+        # 使用lua脚本, 保证操作的原子性
+        lua = """
+            -- local key = KEYS[1]
+            local min_score = ARGV[1]
+            local max_score = ARGV[2]
+            local set_score = ARGV[3]
+            local count = ARGV[4]
+
+            -- 取值
+            local datas = nil
+            if count then
+                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores','limit', 0, count)
+            else
+                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores')
+            end
+
+            local real_datas = {} -- 数据
+            --修改优先级
+            for i=1, #datas, 2 do
+               local data = datas[i]
+               local score = datas[i+1]
+
+               table.insert(real_datas, data) -- 添加数据
+
+               redis.call('zincrby', KEYS[1], set_score - score, datas[i])
+            end
+
+            return real_datas
+
+        """
+        cmd = self._redis.register_script(lua)
+        if count:
+            res = cmd(keys=[table], args=[priority_min, priority_max, score, count])
+        else:
+            res = cmd(keys=[table], args=[priority_min, priority_max, score])
+
+        return res
+
+    def zincrby(self, table, amount, value):
+        return self._redis.zincrby(table, amount, value)
+
+    def zget_count(self, table, priority_min=None, priority_max=None):
+        """
+        @summary: 获取表数据的数量
+        ---------
+        @param table:
+        @param priority_min:优先级范围 最小值(包含)
+        @param priority_max:优先级范围 最大值(包含)
+        ---------
+        @result:
+        """
+
+        if priority_min != None and priority_max != None:
+            return self._redis.zcount(table, priority_min, priority_max)
+        else:
+            return self._redis.zcard(table)
+
+    def zrem(self, table, values):
+        """
+        @summary: 移除集合中的指定元素
+        ---------
+        @param table:
+        @param values: 一个或者列表
+        ---------
+        @result:
+        """
+
+        if isinstance(values, list):
+            self._redis.zrem(table, *values)
+        else:
+            self._redis.zrem(table, values)
+
+    def zexists(self, table, values):
+        """
+        利用zscore判断某元素是否存在
+        @param values:
+        @return:
+        """
+
+        is_exists = []
+
+        if isinstance(values, list):
+            pipe = self._redis.pipeline()
+            pipe.multi()
+            for value in values:
+                pipe.zscore(table, value)
+            is_exists_temp = pipe.execute()
+            for is_exist in is_exists_temp:
+                if is_exist != None:
+                    is_exists.append(1)
+                else:
+                    is_exists.append(0)
+
+        else:
+            is_exists = self._redis.zscore(table, values)
+            is_exists = 1 if is_exists != None else 0
+
+        return is_exists
+
+    def lpush(self, table, values):
+        if isinstance(values, list):
+            pipe = self._redis.pipeline()
+
+            if not self._is_redis_cluster:
+                pipe.multi()
+            for value in values:
+                pipe.lpush(table, value)
+            pipe.execute()
+
+        else:
+            return self._redis.lpush(table, values)
+
+    def lpop(self, table, count=1):
+        """
+        @summary:
+        ---------
+        @param table:
+        @param count:
+        ---------
+        @result: count>1时返回列表
+        """
+
+        datas = None
+        lcount = self.lget_count(table)
+        count = count if count <= lcount else lcount
+
+        if count:
+            if count > 1:
+                pipe = self._redis.pipeline()
+
+                if not self._is_redis_cluster:
+                    pipe.multi()
+                while count:
+                    pipe.lpop(table)
+                    count -= 1
+                datas = pipe.execute()
+
+            else:
+                datas = self._redis.lpop(table)
+
+        return datas
+
+    def rpoplpush(self, from_table, to_table=None):
+        """
+        将列表 from_table 中的最后一个元素(尾元素)弹出,并返回给客户端。
+        将 from_table 弹出的元素插入到列表 to_table ,作为 to_table 列表的的头元素。
+        如果 from_table 和 to_table 相同,则列表中的表尾元素被移动到表头,并返回该元素,可以把这种特殊情况视作列表的旋转(rotation)操作
+        @param from_table:
+        @param to_table:
+        @return:
+        """
+
+        if not to_table:
+            to_table = from_table
+
+        return self._redis.rpoplpush(from_table, to_table)
+
+    def lget_count(self, table):
+        return self._redis.llen(table)
+
+    def lrem(self, table, value, num=0):
+        """
+        @summary:
+        删除value
+        ---------
+        @param table:
+        @param value:
+        @param num:
+        ---------
+        @result: 删除的条数
+        """
+        return self._redis.lrem(table, num, value)
+
+    def lrange(self, table, start=0, end=-1):
+        return self._redis.lrange(table, start, end)
+
+    def hset(self, table, key, value):
+        """
+        @summary:
+        如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。
+        如果域 field 已经存在于哈希表中,旧值将被覆盖
+        ---------
+        @param table:
+        @param key:
+        @param value:
+        ---------
+        @result: 1 新插入; 0 覆盖
+        """
+        return self._redis.hset(table, key, value)
+
+    def hset_batch(self, table, datas):
+        """
+        批量插入
+        Args:
+            datas:
+                [[key, value]]
+        Returns:
+
+        """
+        pipe = self._redis.pipeline()
+
+        if not self._is_redis_cluster:
+            pipe.multi()
+        for key, value in datas:
+            pipe.hset(table, key, value)
+        return pipe.execute()
+
+    def hincrby(self, table, key, increment):
+        return self._redis.hincrby(table, key, increment)
+
+    def hget(self, table, key, is_pop=False):
+        if not is_pop:
+            return self._redis.hget(table, key)
+        else:
+            lua = """
+                -- local key = KEYS[1]
+                local field = ARGV[1]
+
+                -- 取值
+                local datas = redis.call('hget', KEYS[1], field)
+                -- 删除值
+                redis.call('hdel', KEYS[1], field)
+
+                return datas
+
+                    """
+            cmd = self._redis.register_script(lua)
+            res = cmd(keys=[table], args=[key])
+
+            return res
+
+    def hgetall(self, table):
+        return self._redis.hgetall(table)
+
+    def hexists(self, table, key):
+        return self._redis.hexists(table, key)
+
+    def hdel(self, table, *keys):
+        """
+        @summary: 删除对应的key 可传多个
+        ---------
+        @param table:
+        @param *keys:
+        ---------
+        @result:
+        """
+        self._redis.hdel(table, *keys)
+
+    def hget_count(self, table):
+        return self._redis.hlen(table)
+
+    def hkeys(self, table):
+        return self._redis.hkeys(table)
+
+    def hvals(self, key):
+        return self._redis.hvals(key)
+
+    def setbit(
+        self, table, offsets: Union[int, List[int]], values: Union[int, List[int]]
+    ):
+        """
+        设置字符串数组某一位的值,返回之前的值
+        @param table: Redis key
+        @param offsets: 支持列表或单个值
+        @param values: 支持列表或单个值
+        @return: list / 单个值
+        """
+        if isinstance(offsets, list):
+            if isinstance(values, int):
+                # 使用lua脚本,数据是一起传给redis的,降低了网络开销,但redis会阻塞
+                script = """
+                            local value = table.remove(ARGV, 1)
+                            local offsets = ARGV
+                            local results = {}
+                            for i, offset in ipairs(offsets) do
+                                results[i] = redis.call('SETBIT', KEYS[1], offset, value)
+                            end
+                            return results
+                        """
+                return self._redis.eval(script, 1, table, values, *offsets)
+            else:
+                assert len(offsets) == len(values), "offsets值要与values值一一对应"
+                pipe = self._redis.pipeline()
+                pipe.multi()
+
+                for offset, value in zip(offsets, values):
+                    pipe.setbit(table, offset, value)
+
+                return pipe.execute()
+
+        else:
+            return self._redis.setbit(table, offsets, values)
+
+    def getbit(self, table, offsets):
+        """
+        取字符串数组某一位的值
+        @param table:
+        @param offsets: 支持列表
+        @return: list / 单个值
+        """
+        if isinstance(offsets, list):
+            pipe = self._redis.pipeline()
+            pipe.multi()
+            for offset in offsets:
+                pipe.getbit(table, offset)
+
+            return pipe.execute()
+
+        else:
+            return self._redis.getbit(table, offsets)
+
+    def bitcount(self, table):
+        return self._redis.bitcount(table)
+
+    def strset(self, table, value, **kwargs):
+        """
+        设置键值
+        Args:
+            table:
+            value:
+            **kwargs:
+                ex: Union[None, int, timedelta] = ..., 设置键的过期时间为 second 秒
+                px: Union[None, int, timedelta] = ..., 设置键的过期时间为 millisecond 毫秒
+                nx: bool = ..., 只有键不存在时,才对键进行设置操作
+                xx: bool = ..., 只有键已经存在时,才对键进行设置操作
+                keepttl: bool = ..., 保留键的过期时间
+        Returns:
+
+        """
+        return self._redis.set(table, value, **kwargs)
+
+    def str_incrby(self, table, value):
+        return self._redis.incrby(table, value)
+
+    def strget(self, table):
+        return self._redis.get(table)
+
+    def strlen(self, table):
+        return self._redis.strlen(table)
+
+    def str_getkeys(self, regex, count=500):
+        cursor = '0'
+        while True:
+            cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
+            if len(keys) > 0:
+                yield from keys
+
+            if cursor == 0:
+                break
+
+    def str_exists(self, regex, count=500):
+        """
+        @summary: 大量键模糊判重
+        大量键模糊检索,使用scan命令,每次扫描集合中500个元素,
+        若直接用KEYS命令,会导致Redis阻塞,或者返回大量的键,出现数据安全问题等。
+        ---------
+        @param regex: redis模糊查询规则
+        @param count: 每轮扫描条数,默认:500
+        ---------
+        @result:
+        """
+        cursor = '0'
+        while True:
+            cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
+            if len(keys) > 0:
+                return True
+
+            if cursor == 0:
+                break
+
+        return False
+
+    def str_delete(self, regex=None, datas=None, count=500):
+        """
+        @summary: 删除大量键
+        删除大量键,使用scan命令,每次扫描集合中500个元素,再用unlink命令异步批量删除键
+        若直接用delete命令,会导致Redis阻塞,出现故障切换和应用程序崩溃的故障。
+        ---------
+        @param regex: redis模糊查询规则
+        @param count: 每轮扫描条数,默认:500
+        @param datas: 直接使用unlink命令异步删除数据集合
+        ---------
+        @result:
+        """
+        if datas is not None:
+            if isinstance(datas, list) and len(datas) > 0:
+                self._redis.unlink(*datas)
+        else:
+            # 当 SCAN 命令的游标参数被设置为 0 时,服务器将开始一次新的迭代,而当服务器向用户返回值为 0 的游标时,表示迭代已结束
+            batch_size = 500
+            cursor = '0'
+            while True:
+                cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
+                # print(cursor,  len(keys))
+                if len(keys) > 0:
+                    self._redis.unlink(*keys[:batch_size])
+
+                # 如果游标为 0 且仍有键,则再次遍历
+                if cursor == 0 and len(keys) > 0:
+                    cursor = '0'
+                    continue
+
+                # 如果游标为 0 且没有键,则退出循环
+                if cursor == 0 and len(keys) == 0:
+                    break
+
+    def str_count(self, regex, count=500):
+        length = 0
+        for _ in self.str_getkeys(regex, count):
+            length += 1
+
+        # cursor = '0'
+        # while True:
+        #     cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
+        #     if cursor == 0:
+        #         length += len(keys)
+        #         break
+        #
+        #     length += len(keys)
+        return length
+
+    def getkeys(self, regex):
+        return self._redis.keys(regex)
+
+    def exists_key(self, key):
+        return self._redis.exists(key)
+
+    def set_expire(self, key, seconds):
+        """
+        @summary: 设置过期时间
+        ---------
+        @param key:
+        @param seconds: 秒
+        ---------
+        @result:
+        """
+        self._redis.expire(key, seconds)
+
+    def get_expire(self, key):
+        """
+        @summary: 查询过期时间
+        ---------
+        @param key:
+        ---------
+        @result:
+        """
+        return self._redis.ttl(key)
+
+    def clear(self, table):
+        try:
+            self._redis.delete(table)
+        except Exception as e:
+            log.error(e)
+
+    def get_redis_obj(self):
+        return self._redis
+
+    def _reconnect(self):
+        """检测连接状态, 当数据库重启或设置 timeout 导致断开连接时自动重连"""
+        retry_count = 0
+        while True:
+            try:
+                retry_count += 1
+                log.error(f"redis 连接断开, 重新连接 {retry_count}")
+                if self.get_connect():
+                    log.info(f"redis 连接成功")
+                    return True
+            except (ConnectionError, TimeoutError) as e:
+                log.error(f"连接失败 e: {e}")
+
+            time.sleep(2)
+
+    def __getattr__(self, name):
+        return getattr(self._redis, name)

+ 2 - 0
conf/dev.yaml

@@ -15,6 +15,8 @@ redis:
   port: !!int 7361
   pwd: "k5ZJR5KV4q7DRZ92DQ"
   db: !!int 1
+  service_name: null
+  kwargs: {}
 
 redis2:
   host: 172.17.162.28

+ 2 - 0
conf/test.yaml

@@ -15,6 +15,8 @@ redis:
   port: !!int 8165
   pwd: "top@123"
   db: !!int 1
+  service_name: null
+  kwargs: {}
 
 redis2:
   host: 192.168.3.165

+ 3 - 3
services/proxy.py

@@ -154,11 +154,11 @@ class ProxyPoolServer(BaseProxyPool, threading.Thread):
         n1 = 0
         pk = proxy['pk']
         if pk == 1:
-            n1 = self.redis_db.lrem(self.lst_name_1, 0, host)
+            n1 = self.redis_db.lrem(self.lst_name_1, host, 0)
         elif pk == 2:
-            n1 = self.redis_db.lrem(self.lst_name_2, 0, host)
+            n1 = self.redis_db.lrem(self.lst_name_2, host, 0)
 
-        n0 = self.redis_db.lrem(self.lst_name, 0, host)  # 移除所有匹配的元素
+        n0 = self.redis_db.lrem(self.lst_name, host, 0)  # 移除所有匹配的元素
         self.redis_db.hdel(self.tab_name, host)
         logger.info(f"移除代理|{host}|{n1 + n0}")