123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969 |
- # -*- coding: utf-8 -*-
- """
- Created on 2016-11-16 16:25
- ---------
- @summary: 操作redis数据库
- ---------
- @author: Boris
- """
- import os
- import time
- from typing import Union, List
- import redis
- from redis.connection import Encoder as _Encoder
- from redis.exceptions import ConnectionError, TimeoutError
- from redis.exceptions import DataError
- from redis.sentinel import Sentinel
- import setting
- from common.log import logger as log
- class Encoder(_Encoder):
- def encode(self, value):
- "Return a bytestring or bytes-like representation of the value"
- if isinstance(value, (bytes, memoryview)):
- return value
- # elif isinstance(value, bool):
- # # special case bool since it is a subclass of int
- # raise DataError(
- # "Invalid input of type: 'bool'. Convert to a "
- # "bytes, string, int or float first."
- # )
- elif isinstance(value, float):
- value = repr(value).encode()
- elif isinstance(value, int):
- # python 2 repr() on longs is '123L', so use str() instead
- value = str(value).encode()
- elif isinstance(value, (list, dict, tuple)):
- value = str(value)
- elif not isinstance(value, str):
- # a value we don't know how to deal with. throw an error
- typename = type(value).__name__
- raise DataError(
- "Invalid input of type: '%s'. Convert to a "
- "bytes, string, int or float first." % typename
- )
- if isinstance(value, str):
- value = value.encode(self.encoding, self.encoding_errors)
- return value
- redis.connection.Encoder = Encoder
- class RedisDB:
- def __init__(
- self,
- ip_ports=None,
- db=None,
- user_pass=None,
- url=None,
- decode_responses=True,
- service_name=None,
- max_connections=1000,
- **kwargs,
- ):
- """
- redis的封装
- Args:
- ip_ports: ip:port 多个可写为列表或者逗号隔开 如 ip1:port1,ip2:port2 或 ["ip1:port1", "ip2:port2"]
- db:
- user_pass:
- url:
- decode_responses:
- service_name: 适用于redis哨兵模式
- max_connections: 同一个redis对象使用的并发数(连接池的最大连接数),超过这个数量会抛出redis.ConnectionError
- """
- # 可能会改setting中的值,所以此处不能直接赋值为默认值,需要后加载赋值
- if ip_ports is None:
- ip = setting.redis_conf['host']
- port = setting.redis_conf['port']
- ip_ports = f'{ip}:{port}'
- if db is None:
- db = setting.redis_conf['db']
- if user_pass is None:
- user_pass = setting.redis_conf['pwd']
- if service_name is None:
- service_name = setting.redis_conf['service_name']
- if kwargs is None:
- kwargs = setting.redis_conf['redis_kwargs']
- self._is_redis_cluster = False
- self.__redis = None
- self._url = url
- self._ip_ports = ip_ports
- self._db = db
- self._user_pass = user_pass
- self._decode_responses = decode_responses
- self._service_name = service_name
- self._max_connections = max_connections
- self._kwargs = kwargs
- self.get_connect()
- def __repr__(self):
- if self._url:
- return "<Redisdb url:{}>".format(self._url)
- return "<Redisdb ip_ports: {} db:{} user_pass:{}>".format(
- self._ip_ports, self._db, self._user_pass
- )
- @property
- def _redis(self):
- try:
- if not self.__redis.ping():
- raise ConnectionError("unable to connect to redis")
- except:
- self._reconnect()
- return self.__redis
- @_redis.setter
- def _redis(self, val):
- self.__redis = val
- def get_connect(self):
- # 获取数据库连接
- try:
- if not self._url:
- if not self._ip_ports:
- raise ConnectionError("未设置 redis 连接信息")
- ip_ports = (
- self._ip_ports
- if isinstance(self._ip_ports, list)
- else self._ip_ports.split(",")
- )
- if len(ip_ports) > 1:
- startup_nodes = []
- for ip_port in ip_ports:
- ip, port = ip_port.split(":")
- startup_nodes.append({"host": ip, "port": port})
- if self._service_name:
- # log.debug("使用redis哨兵模式")
- hosts = [(node["host"], node["port"]) for node in startup_nodes]
- sentinel = Sentinel(hosts, socket_timeout=3, **self._kwargs)
- self._redis = sentinel.master_for(
- self._service_name,
- password=self._user_pass,
- db=self._db,
- redis_class=redis.StrictRedis,
- decode_responses=self._decode_responses,
- max_connections=self._max_connections,
- **self._kwargs,
- )
- else:
- try:
- from rediscluster import RedisCluster
- except ModuleNotFoundError as e:
- log.error('请安装 pip install "feapder[all]"')
- os._exit(0)
- # log.debug("使用redis集群模式")
- self._redis = RedisCluster(
- startup_nodes=startup_nodes,
- decode_responses=self._decode_responses,
- password=self._user_pass,
- max_connections=self._max_connections,
- **self._kwargs,
- )
- self._is_redis_cluster = True
- else:
- ip, port = ip_ports[0].split(":")
- self._redis = redis.StrictRedis(
- host=ip,
- port=port,
- db=self._db,
- password=self._user_pass,
- decode_responses=self._decode_responses,
- max_connections=self._max_connections,
- **self._kwargs,
- )
- self._is_redis_cluster = False
- else:
- self._redis = redis.StrictRedis.from_url(
- self._url, decode_responses=self._decode_responses, **self._kwargs
- )
- self._is_redis_cluster = False
- except Exception as e:
- raise e
- # 不要写成self._redis.ping() 否则循环调用了
- return self.__redis.ping()
- @classmethod
- def from_url(cls, url):
- """
- Args:
- url: redis://[[username]:[password]]@[host]:[port]/[db]
- Returns:
- """
- return cls(url=url)
- def sadd(self, table, values):
- """
- @summary: 使用无序set集合存储数据, 去重
- ---------
- @param table:
- @param values: 值; 支持list 或 单个值
- ---------
- @result: 若库中存在 返回0,否则入库,返回1。 批量添加返回None
- """
- if isinstance(values, list):
- pipe = self._redis.pipeline()
- if not self._is_redis_cluster:
- pipe.multi()
- for value in values:
- pipe.sadd(table, value)
- pipe.execute()
- else:
- return self._redis.sadd(table, values)
- def sget(self, table, count=1, is_pop=True):
- """
- 返回 list 如 ['1'] 或 []
- @param table:
- @param count:
- @param is_pop:
- @return:
- """
- datas = []
- if is_pop:
- count = count if count <= self.sget_count(table) else self.sget_count(table)
- if count:
- if count > 1:
- pipe = self._redis.pipeline()
- if not self._is_redis_cluster:
- pipe.multi()
- while count:
- pipe.spop(table)
- count -= 1
- datas = pipe.execute()
- else:
- datas.append(self._redis.spop(table))
- else:
- datas = self._redis.srandmember(table, count)
- return datas
- def srem(self, table, values):
- """
- @summary: 移除集合中的指定元素
- ---------
- @param table:
- @param values: 一个或者列表
- ---------
- @result:
- """
- if isinstance(values, list):
- pipe = self._redis.pipeline()
- if not self._is_redis_cluster:
- pipe.multi()
- for value in values:
- pipe.srem(table, value)
- pipe.execute()
- else:
- self._redis.srem(table, values)
- def sget_count(self, table):
- return self._redis.scard(table)
- def sdelete(self, table):
- """
- @summary: 删除set集合的大键(数据量大的表)
- 删除大set键,使用sscan命令,每次扫描集合中500个元素,再用srem命令每次删除一个键
- 若直接用delete命令,会导致Redis阻塞,出现故障切换和应用程序崩溃的故障。
- ---------
- @param table:
- ---------
- @result:
- """
- # 当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束
- cursor = "0"
- while cursor != 0:
- cursor, data = self._redis.sscan(table, cursor=cursor, count=500)
- for item in data:
- # pipe.srem(table, item)
- self._redis.srem(table, item)
- # pipe.execute()
- def sismember(self, table, key):
- "Return a boolean indicating if ``value`` is a member of set ``name``"
- return self._redis.sismember(table, key)
- def zadd(self, table, values, prioritys=0):
- """
- @summary: 使用有序set集合存储数据, 去重(值存在更新)
- ---------
- @param table:
- @param values: 值; 支持list 或 单个值
- @param prioritys: 优先级; double类型,支持list 或 单个值。 根据此字段的值来排序, 值越小越优先。 可不传值,默认value的优先级为0
- ---------
- @result:若库中存在 返回0,否则入库,返回1。 批量添加返回 [0, 1 ...]
- """
- if isinstance(values, list):
- if not isinstance(prioritys, list):
- prioritys = [prioritys] * len(values)
- else:
- assert len(values) == len(prioritys), "values值要与prioritys值一一对应"
- pipe = self._redis.pipeline()
- if not self._is_redis_cluster:
- pipe.multi()
- for value, priority in zip(values, prioritys):
- pipe.execute_command(
- "ZADD", table, priority, value
- ) # 为了兼容2.x与3.x版本的redis
- return pipe.execute()
- else:
- return self._redis.execute_command(
- "ZADD", table, prioritys, values
- ) # 为了兼容2.x与3.x版本的redis
- def zget(self, table, count=1, is_pop=True):
- """
- @summary: 从有序set集合中获取数据 优先返回分数小的(优先级高的)
- ---------
- @param table:
- @param count: 数量 -1 返回全部数据
- @param is_pop:获取数据后,是否在原set集合中删除,默认是
- ---------
- @result: 列表
- """
- start_pos = 0 # 包含
- end_pos = count - 1 if count > 0 else count
- pipe = self._redis.pipeline()
- if not self._is_redis_cluster:
- pipe.multi() # 标记事务的开始 参考 http://www.runoob.com/redis/redis-transactions.html
- pipe.zrange(table, start_pos, end_pos) # 取值
- if is_pop:
- pipe.zremrangebyrank(table, start_pos, end_pos) # 删除
- results, *count = pipe.execute()
- return results
- def zremrangebyscore(self, table, priority_min, priority_max):
- """
- 根据分数移除成员 闭区间
- @param table:
- @param priority_min:
- @param priority_max:
- @return: 被移除的成员个数
- """
- return self._redis.zremrangebyscore(table, priority_min, priority_max)
- def zrangebyscore(self, table, priority_min, priority_max, count=None, is_pop=True):
- """
- @summary: 返回指定分数区间的数据 闭区间
- ---------
- @param table:
- @param priority_min: 优先级越小越优先
- @param priority_max:
- @param count: 获取的数量,为空则表示分数区间内的全部数据
- @param is_pop: 是否删除
- ---------
- @result:
- """
- # 使用lua脚本, 保证操作的原子性
- lua = """
- -- local key = KEYS[1]
- local min_score = ARGV[2]
- local max_score = ARGV[3]
- local is_pop = ARGV[4]
- local count = ARGV[5]
- -- 取值
- local datas = nil
- if count then
- datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
- else
- datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
- end
- -- 删除redis中刚取到的值
- if (is_pop=='True' or is_pop=='1') then
- for i=1, #datas do
- redis.call('zrem', KEYS[1], datas[i])
- end
- end
- return datas
- """
- cmd = self._redis.register_script(lua)
- if count:
- res = cmd(
- keys=[table], args=[table, priority_min, priority_max, is_pop, count]
- )
- else:
- res = cmd(keys=[table], args=[table, priority_min, priority_max, is_pop])
- return res
- def zrangebyscore_increase_score(
- self, table, priority_min, priority_max, increase_score, count=None
- ):
- """
- @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
- ---------
- @param table:
- @param priority_min: 最小分数
- @param priority_max: 最大分数
- @param increase_score: 分数值增量 正数则在原有的分数上叠加,负数则相减
- @param count: 获取的数量,为空则表示分数区间内的全部数据
- ---------
- @result:
- """
- # 使用lua脚本, 保证操作的原子性
- lua = """
- -- local key = KEYS[1]
- local min_score = ARGV[1]
- local max_score = ARGV[2]
- local increase_score = ARGV[3]
- local count = ARGV[4]
- -- 取值
- local datas = nil
- if count then
- datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
- else
- datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
- end
- --修改优先级
- for i=1, #datas do
- redis.call('zincrby', KEYS[1], increase_score, datas[i])
- end
- return datas
- """
- cmd = self._redis.register_script(lua)
- if count:
- res = cmd(
- keys=[table], args=[priority_min, priority_max, increase_score, count]
- )
- else:
- res = cmd(keys=[table], args=[priority_min, priority_max, increase_score])
- return res
- def zrangebyscore_set_score(
- self, table, priority_min, priority_max, score, count=None
- ):
- """
- @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
- ---------
- @param table:
- @param priority_min: 最小分数
- @param priority_max: 最大分数
- @param score: 分数值
- @param count: 获取的数量,为空则表示分数区间内的全部数据
- ---------
- @result:
- """
- # 使用lua脚本, 保证操作的原子性
- lua = """
- -- local key = KEYS[1]
- local min_score = ARGV[1]
- local max_score = ARGV[2]
- local set_score = ARGV[3]
- local count = ARGV[4]
- -- 取值
- local datas = nil
- if count then
- datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores','limit', 0, count)
- else
- datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores')
- end
- local real_datas = {} -- 数据
- --修改优先级
- for i=1, #datas, 2 do
- local data = datas[i]
- local score = datas[i+1]
- table.insert(real_datas, data) -- 添加数据
- redis.call('zincrby', KEYS[1], set_score - score, datas[i])
- end
- return real_datas
- """
- cmd = self._redis.register_script(lua)
- if count:
- res = cmd(keys=[table], args=[priority_min, priority_max, score, count])
- else:
- res = cmd(keys=[table], args=[priority_min, priority_max, score])
- return res
- def zincrby(self, table, amount, value):
- return self._redis.zincrby(table, amount, value)
- def zget_count(self, table, priority_min=None, priority_max=None):
- """
- @summary: 获取表数据的数量
- ---------
- @param table:
- @param priority_min:优先级范围 最小值(包含)
- @param priority_max:优先级范围 最大值(包含)
- ---------
- @result:
- """
- if priority_min != None and priority_max != None:
- return self._redis.zcount(table, priority_min, priority_max)
- else:
- return self._redis.zcard(table)
- def zrem(self, table, values):
- """
- @summary: 移除集合中的指定元素
- ---------
- @param table:
- @param values: 一个或者列表
- ---------
- @result:
- """
- if isinstance(values, list):
- self._redis.zrem(table, *values)
- else:
- self._redis.zrem(table, values)
- def zexists(self, table, values):
- """
- 利用zscore判断某元素是否存在
- @param values:
- @return:
- """
- is_exists = []
- if isinstance(values, list):
- pipe = self._redis.pipeline()
- pipe.multi()
- for value in values:
- pipe.zscore(table, value)
- is_exists_temp = pipe.execute()
- for is_exist in is_exists_temp:
- if is_exist != None:
- is_exists.append(1)
- else:
- is_exists.append(0)
- else:
- is_exists = self._redis.zscore(table, values)
- is_exists = 1 if is_exists != None else 0
- return is_exists
- def lpush(self, table, values):
- if isinstance(values, list):
- pipe = self._redis.pipeline()
- if not self._is_redis_cluster:
- pipe.multi()
- for value in values:
- pipe.lpush(table, value)
- pipe.execute()
- else:
- return self._redis.lpush(table, values)
- def lpop(self, table, count=1):
- """
- @summary:
- ---------
- @param table:
- @param count:
- ---------
- @result: count>1时返回列表
- """
- datas = None
- lcount = self.lget_count(table)
- count = count if count <= lcount else lcount
- if count:
- if count > 1:
- pipe = self._redis.pipeline()
- if not self._is_redis_cluster:
- pipe.multi()
- while count:
- pipe.lpop(table)
- count -= 1
- datas = pipe.execute()
- else:
- datas = self._redis.lpop(table)
- return datas
- def rpoplpush(self, from_table, to_table=None):
- """
- 将列表 from_table 中的最后一个元素(尾元素)弹出,并返回给客户端。
- 将 from_table 弹出的元素插入到列表 to_table ,作为 to_table 列表的的头元素。
- 如果 from_table 和 to_table 相同,则列表中的表尾元素被移动到表头,并返回该元素,可以把这种特殊情况视作列表的旋转(rotation)操作
- @param from_table:
- @param to_table:
- @return:
- """
- if not to_table:
- to_table = from_table
- return self._redis.rpoplpush(from_table, to_table)
- def lget_count(self, table):
- return self._redis.llen(table)
- def lrem(self, table, value, num=0):
- """
- @summary:
- 删除value
- ---------
- @param table:
- @param value:
- @param num:
- ---------
- @result: 删除的条数
- """
- return self._redis.lrem(table, num, value)
- def lrange(self, table, start=0, end=-1):
- return self._redis.lrange(table, start, end)
- def hset(self, table, key, value):
- """
- @summary:
- 如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。
- 如果域 field 已经存在于哈希表中,旧值将被覆盖
- ---------
- @param table:
- @param key:
- @param value:
- ---------
- @result: 1 新插入; 0 覆盖
- """
- return self._redis.hset(table, key, value)
- def hset_batch(self, table, datas):
- """
- 批量插入
- Args:
- datas:
- [[key, value]]
- Returns:
- """
- pipe = self._redis.pipeline()
- if not self._is_redis_cluster:
- pipe.multi()
- for key, value in datas:
- pipe.hset(table, key, value)
- return pipe.execute()
- def hincrby(self, table, key, increment):
- return self._redis.hincrby(table, key, increment)
- def hget(self, table, key, is_pop=False):
- if not is_pop:
- return self._redis.hget(table, key)
- else:
- lua = """
- -- local key = KEYS[1]
- local field = ARGV[1]
- -- 取值
- local datas = redis.call('hget', KEYS[1], field)
- -- 删除值
- redis.call('hdel', KEYS[1], field)
- return datas
- """
- cmd = self._redis.register_script(lua)
- res = cmd(keys=[table], args=[key])
- return res
- def hgetall(self, table):
- return self._redis.hgetall(table)
- def hexists(self, table, key):
- return self._redis.hexists(table, key)
- def hdel(self, table, *keys):
- """
- @summary: 删除对应的key 可传多个
- ---------
- @param table:
- @param *keys:
- ---------
- @result:
- """
- self._redis.hdel(table, *keys)
- def hget_count(self, table):
- return self._redis.hlen(table)
- def hkeys(self, table):
- return self._redis.hkeys(table)
- def hvals(self, key):
- return self._redis.hvals(key)
- def setbit(
- self, table, offsets: Union[int, List[int]], values: Union[int, List[int]]
- ):
- """
- 设置字符串数组某一位的值,返回之前的值
- @param table: Redis key
- @param offsets: 支持列表或单个值
- @param values: 支持列表或单个值
- @return: list / 单个值
- """
- if isinstance(offsets, list):
- if isinstance(values, int):
- # 使用lua脚本,数据是一起传给redis的,降低了网络开销,但redis会阻塞
- script = """
- local value = table.remove(ARGV, 1)
- local offsets = ARGV
- local results = {}
- for i, offset in ipairs(offsets) do
- results[i] = redis.call('SETBIT', KEYS[1], offset, value)
- end
- return results
- """
- return self._redis.eval(script, 1, table, values, *offsets)
- else:
- assert len(offsets) == len(values), "offsets值要与values值一一对应"
- pipe = self._redis.pipeline()
- pipe.multi()
- for offset, value in zip(offsets, values):
- pipe.setbit(table, offset, value)
- return pipe.execute()
- else:
- return self._redis.setbit(table, offsets, values)
- def getbit(self, table, offsets):
- """
- 取字符串数组某一位的值
- @param table:
- @param offsets: 支持列表
- @return: list / 单个值
- """
- if isinstance(offsets, list):
- pipe = self._redis.pipeline()
- pipe.multi()
- for offset in offsets:
- pipe.getbit(table, offset)
- return pipe.execute()
- else:
- return self._redis.getbit(table, offsets)
- def bitcount(self, table):
- return self._redis.bitcount(table)
- def strset(self, table, value, **kwargs):
- """
- 设置键值
- Args:
- table:
- value:
- **kwargs:
- ex: Union[None, int, timedelta] = ..., 设置键的过期时间为 second 秒
- px: Union[None, int, timedelta] = ..., 设置键的过期时间为 millisecond 毫秒
- nx: bool = ..., 只有键不存在时,才对键进行设置操作
- xx: bool = ..., 只有键已经存在时,才对键进行设置操作
- keepttl: bool = ..., 保留键的过期时间
- Returns:
- """
- return self._redis.set(table, value, **kwargs)
- def str_incrby(self, table, value):
- return self._redis.incrby(table, value)
- def strget(self, table):
- return self._redis.get(table)
- def strlen(self, table):
- return self._redis.strlen(table)
- def str_getkeys(self, regex, count=500):
- cursor = '0'
- while True:
- cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
- if len(keys) > 0:
- yield from keys
- if cursor == 0:
- break
- def str_exists(self, regex, count=500):
- """
- @summary: 大量键模糊判重
- 大量键模糊检索,使用scan命令,每次扫描集合中500个元素,
- 若直接用KEYS命令,会导致Redis阻塞,或者返回大量的键,出现数据安全问题等。
- ---------
- @param regex: redis模糊查询规则
- @param count: 每轮扫描条数,默认:500
- ---------
- @result:
- """
- cursor = '0'
- while True:
- cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
- if len(keys) > 0:
- return True
- if cursor == 0:
- break
- return False
- def str_delete(self, regex=None, datas=None, count=500):
- """
- @summary: 删除大量键
- 删除大量键,使用scan命令,每次扫描集合中500个元素,再用unlink命令异步批量删除键
- 若直接用delete命令,会导致Redis阻塞,出现故障切换和应用程序崩溃的故障。
- ---------
- @param regex: redis模糊查询规则
- @param count: 每轮扫描条数,默认:500
- @param datas: 直接使用unlink命令异步删除数据集合
- ---------
- @result:
- """
- if datas is not None:
- if isinstance(datas, list) and len(datas) > 0:
- self._redis.unlink(*datas)
- else:
- # 当 SCAN 命令的游标参数被设置为 0 时,服务器将开始一次新的迭代,而当服务器向用户返回值为 0 的游标时,表示迭代已结束
- batch_size = 500
- cursor = '0'
- while True:
- cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
- # print(cursor, len(keys))
- if len(keys) > 0:
- self._redis.unlink(*keys[:batch_size])
- # 如果游标为 0 且仍有键,则再次遍历
- if cursor == 0 and len(keys) > 0:
- cursor = '0'
- continue
- # 如果游标为 0 且没有键,则退出循环
- if cursor == 0 and len(keys) == 0:
- break
- def str_count(self, regex, count=500):
- length = 0
- for _ in self.str_getkeys(regex, count):
- length += 1
- # cursor = '0'
- # while True:
- # cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
- # if cursor == 0:
- # length += len(keys)
- # break
- #
- # length += len(keys)
- return length
- def getkeys(self, regex):
- return self._redis.keys(regex)
- def exists_key(self, key):
- return self._redis.exists(key)
- def set_expire(self, key, seconds):
- """
- @summary: 设置过期时间
- ---------
- @param key:
- @param seconds: 秒
- ---------
- @result:
- """
- self._redis.expire(key, seconds)
- def get_expire(self, key):
- """
- @summary: 查询过期时间
- ---------
- @param key:
- ---------
- @result:
- """
- return self._redis.ttl(key)
- def clear(self, table):
- try:
- self._redis.delete(table)
- except Exception as e:
- log.error(e)
- def get_redis_obj(self):
- return self._redis
- def _reconnect(self):
- """检测连接状态, 当数据库重启或设置 timeout 导致断开连接时自动重连"""
- retry_count = 0
- while True:
- try:
- retry_count += 1
- log.error(f"redis 连接断开, 重新连接 {retry_count}")
- if self.get_connect():
- log.info(f"redis 连接成功")
- return True
- except (ConnectionError, TimeoutError) as e:
- log.error(f"连接失败 e: {e}")
- time.sleep(2)
- def __getattr__(self, name):
- return getattr(self._redis, name)
|