12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845 |
- # -*- coding: utf-8 -*-
- """
- Created on 2016-11-16 16:25
- ---------
- @summary: 操作redis数据库
- ---------
- @author: Boris
- """
- import math
- import time
- import uuid
- import redis
- from loguru import logger
- from redis._compat import unicode, long, basestring
- from redis.connection import Encoder as _Encoder
- from redis.exceptions import ConnectionError, TimeoutError
- from redis.exceptions import DataError
- import setting
# Acquire a lock (counterpart of release_lock)
def acquire_lock_with_timeout(conn, lockname, acquire_timeout=5, lock_timeout=10):
    """Try to take the lock ``lock:<lockname>`` until ``acquire_timeout`` elapses.

    Args:
        conn: Redis client exposing ``set``, ``ttl`` and ``expire``.
        lockname: Logical lock name; the key used is ``"lock:" + lockname``.
        acquire_timeout: Seconds to keep retrying before giving up.
        lock_timeout: Seconds after which the lock expires on its own.
            Rounded up to a whole number because it is sent as a
            seconds-granularity expiry.

    Returns:
        The random identifier stored in the lock key (pass it to
        ``release_lock``) on success, or ``False`` when the lock could not
        be obtained in time.
    """
    # Random UUID4 string identifying this holder
    identifier = str(uuid.uuid4())
    lockname = "lock:" + lockname
    lock_timeout = int(math.ceil(lock_timeout))  # expiry must be an integer

    deadline = time.time() + acquire_timeout
    while time.time() < deadline:
        # SET ... NX EX creates the key and its expiry in one atomic command,
        # so a crash can no longer leave a lock behind without a timeout.
        if conn.set(lockname, identifier, nx=True, ex=lock_timeout):
            return identifier

        # Repair a lock that exists without an expiry. TTL reports that case
        # as -1 (some legacy clients map it to None); a plain truthiness test
        # would miss it because -1 is truthy.
        remaining = conn.ttl(lockname)
        if remaining is None or remaining == -1:
            conn.expire(lockname, lock_timeout)

        time.sleep(0.001)

    return False
# Release a lock (optimistic: WATCH + MULTI/EXEC)
def release_lock(conn, lockname, identifier):
    """Delete ``lock:<lockname>`` only if it is still held by ``identifier``.

    Args:
        conn: Redis client exposing ``pipeline``.
        lockname: Logical lock name; the key used is ``"lock:" + lockname``.
        identifier: Value returned by ``acquire_lock_with_timeout``.

    Returns:
        ``True`` when the lock was held by ``identifier`` and was deleted,
        ``False`` when the key is missing or belongs to another holder.
    """
    lockname = "lock:" + lockname
    with conn.pipeline(True) as pipe:
        while True:
            try:
                pipe.watch(lockname)
                value = pipe.get(lockname)
                # Clients created with decode_responses=False return bytes,
                # which never compare equal to the str identifier.
                if isinstance(value, bytes):
                    value = value.decode()

                # Only the current holder may delete the key
                if value is not None and value == identifier:
                    pipe.multi()
                    pipe.delete(lockname)
                    pipe.execute()
                    return True

                # Held by someone else (or gone): stop watching and give up
                pipe.unwatch()
                break
            except redis.exceptions.WatchError:
                # The key changed between WATCH and EXEC; check it again
                pass
    return False
class Encoder(_Encoder):
    """Encoder override that accepts more Python types than it rejects.

    bool is deliberately not refused: being a subclass of int it goes through
    the integer branch and is sent as ``b"True"`` / ``b"False"``.
    list, dict and tuple values are sent as their ``str()`` text.
    """

    def encode(self, value):
        """Return a bytestring or bytes-like representation of the value.

        Raises:
            DataError: for any type other than bytes, memoryview, float,
                int (including bool), list, dict, tuple or str.
        """
        # Already bytes-like: pass through untouched
        if isinstance(value, (bytes, memoryview)):
            return value

        # repr() is used for floats, str() for ints
        if isinstance(value, float):
            return repr(value).encode()
        if isinstance(value, int):
            return str(value).encode()

        if isinstance(value, (list, dict, tuple)):
            # Containers are stored as their textual form
            value = str(value)
        elif not isinstance(value, str):
            # a value we don't know how to deal with. throw an error
            typename = type(value).__name__
            raise DataError(
                "Invalid input of type: '%s'. Convert to a "
                "bytes, string, int or float first." % typename
            )

        # Only text reaches this point; encode it with the configured codec
        return value.encode(self.encoding, self.encoding_errors)


# Rebind the name inside redis.connection so that code resolving
# ``Encoder`` from that module gets the permissive class above.
redis.connection.Encoder = Encoder
class RedisDB:
    """Convenience wrapper around a redis-py ``StrictRedis`` client.

    Every use of the underlying client goes through the ``_redis`` property,
    which pings the server first and reconnects when the ping fails.
    Attributes not defined on this class are forwarded to the underlying
    client (see ``__getattr__``).
    """

    def __init__(
        self,
        ip_port=None,
        db=None,
        user_pass=None,
        url=None,
        decode_responses=True,
        max_connections=32,
        **kwargs,
    ):
        """Store the connection settings and connect immediately.

        Args:
            ip_port: Address as ``"ip:port"``; defaults to
                ``setting.REDISDB_IP_PORT``.
            db: Database index; defaults to ``setting.REDISDB_DB``.
            user_pass: Password; defaults to ``setting.REDISDB_USER_PASS``.
            url: Connection URL. When set it is used instead of
                ``ip_port`` / ``db`` / ``user_pass``.
            decode_responses: Forwarded to the client in both modes.
            max_connections: Forwarded to the client when connecting by
                ``ip_port`` (not used in URL mode).
            **kwargs: Extra keyword arguments forwarded to
                ``redis.StrictRedis`` when connecting by ``ip_port``.
        """
        if ip_port is None:
            ip_port = setting.REDISDB_IP_PORT
        if db is None:
            db = setting.REDISDB_DB
        if user_pass is None:
            user_pass = setting.REDISDB_USER_PASS

        self._is_redis_cluster = False
        self.__redis = None
        self._url = url
        self._ip_port = ip_port
        self._db = db
        self._user_pass = user_pass
        self._decode_responses = decode_responses
        self._max_connections = max_connections
        self._kwargs = kwargs

        self.get_connect()

    def __repr__(self):
        # NOTE(review): the password (and any credentials embedded in the
        # URL) is printed in clear text; be careful when logging instances.
        if self._url:
            return "<Redisdb url:{}>".format(self._url)
        return "<Redisdb ip_ports: {} db:{} user_pass:{}>".format(
            self._ip_port, self._db, self._user_pass
        )

    @property
    def _redis(self):
        """Return the live client, reconnecting first if the ping fails."""
        try:
            if not self.__redis.ping():
                raise ConnectionError("unable to connect to redis")
        except Exception:
            # ``Exception`` rather than a bare except, so KeyboardInterrupt
            # and SystemExit are not turned into an endless reconnect loop.
            self._reconnect()
        return self.__redis

    @_redis.setter
    def _redis(self, val):
        self.__redis = val

    def _open_pipeline(self):
        """Return a new pipeline, with MULTI started unless in cluster mode."""
        pipe = self._redis.pipeline()
        if not self._is_redis_cluster:
            pipe.multi()
        return pipe

    def get_connect(self):
        """Create the client from the stored settings and ping the server.

        Returns:
            The result of ``ping()`` on the new client.

        Raises:
            Exception: when neither a URL nor ``ip_port`` is configured.
        """
        if self._url:
            self._redis = redis.StrictRedis.from_url(
                self._url, decode_responses=self._decode_responses
            )
        else:
            if not self._ip_port:
                raise Exception("未设置 redis 连接信息")
            # Split on the last colon only, so hosts containing colons work
            ip, port = self._ip_port.rsplit(":", 1)
            self._redis = redis.StrictRedis(
                host=ip,
                port=port,
                db=self._db,
                password=self._user_pass,
                decode_responses=self._decode_responses,
                max_connections=self._max_connections,
                **self._kwargs,
            )
        # Use the raw attribute here: going through the ``_redis`` property
        # would ping/reconnect recursively.
        return self.__redis.ping()

    @classmethod
    def from_url(cls, url):
        """Build an instance from a connection URL.

        Args:
            url: redis://[[username]:[password]]@[host]:[port]/[db]

        Returns:
            A connected ``RedisDB``.
        """
        return cls(url=url)

    # ------------------------------------------------------------- sets

    def sadd(self, table, values):
        """Add one value or a list of values to the set ``table``.

        Args:
            table: Key of the set.
            values: A single value or a list of values.

        Returns:
            The client's SADD result for a single value (0 if it already
            existed, 1 if added); ``None`` for a list.
        """
        if not isinstance(values, list):
            return self._redis.sadd(table, values)

        pipe = self._open_pipeline()
        for value in values:
            pipe.sadd(table, value)
        pipe.execute()

    def sget(self, table, count=1, is_pop=True):
        """Fetch up to ``count`` members of the set ``table``.

        Args:
            table: Key of the set.
            count: Number of members wanted.
            is_pop: Remove the returned members (SPOP) when true, otherwise
                sample without removing (SRANDMEMBER).

        Returns:
            A list such as ``['1']``, or ``[]`` when nothing is available.
        """
        if not is_pop:
            return self._redis.srandmember(table, count)

        # Never ask for more than the set holds (size queried once)
        count = min(count, self.sget_count(table))
        if not count:
            return []

        if count > 1:
            pipe = self._open_pipeline()
            for _ in range(count):
                pipe.spop(table)
            return pipe.execute()
        return [self._redis.spop(table)]

    def srem(self, table, values):
        """Remove one value or a list of values from the set ``table``."""
        if isinstance(values, list):
            pipe = self._open_pipeline()
            for value in values:
                pipe.srem(table, value)
            pipe.execute()
        else:
            self._redis.srem(table, values)

    def sget_count(self, table):
        """Return the number of members in the set ``table`` (SCARD)."""
        return self._redis.scard(table)

    def sdelete(self, table):
        """Empty a large set incrementally instead of with one DEL.

        The set is walked with SSCAN in steps of about 500 members and each
        batch is removed with SREM, so the server is not blocked by deleting
        a huge key in a single command.

        Args:
            table: Key of the set.
        """
        # A cursor of 0 starts a scan; a returned cursor of 0 ends it.
        cursor = 0
        while True:
            cursor, data = self._redis.sscan(table, cursor=cursor, count=500)
            if data:
                # One SREM per scanned batch rather than one per member
                self._redis.srem(table, *data)
            if int(cursor) == 0:
                break

    def sismember(self, table, key):
        """Return a boolean indicating if ``key`` is a member of set ``table``."""
        return self._redis.sismember(table, key)

    # ------------------------------------------------------ sorted sets

    def zadd(self, table, values, prioritys=0):
        """Add values to the sorted set ``table`` (existing values are updated).

        Args:
            table: Key of the sorted set.
            values: A single value or a list of values.
            prioritys: Score(s) used for ordering; lower scores come first.
                A single number, or a list matching ``values`` one to one.
                Defaults to 0.

        Returns:
            0 if the value already existed, 1 if inserted; for a list, the
            list of per-value results such as ``[0, 1, ...]``.
        """
        if not isinstance(values, list):
            # Raw command keeps compatibility with redis-py 2.x and 3.x
            return self._redis.execute_command("ZADD", table, prioritys, values)

        if not isinstance(prioritys, list):
            prioritys = [prioritys] * len(values)
        else:
            assert len(values) == len(prioritys), "values值要与prioritys值一一对应"

        pipe = self._open_pipeline()
        for value, priority in zip(values, prioritys):
            # Raw command keeps compatibility with redis-py 2.x and 3.x
            pipe.execute_command("ZADD", table, priority, value)
        return pipe.execute()

    def zget(self, table, count=1, is_pop=True):
        """Return the lowest-scored members of the sorted set ``table``.

        Args:
            table: Key of the sorted set.
            count: Number of members; -1 returns everything, 0 returns
                nothing.
            is_pop: Also remove the returned members (default true).

        Returns:
            A list of members.
        """
        if count == 0:
            # Rank range 0..0 would otherwise return (and pop) one member
            return []

        start_pos = 0  # inclusive
        end_pos = count - 1 if count > 0 else count

        pipe = self._open_pipeline()
        pipe.zrange(table, start_pos, end_pos)  # read
        if is_pop:
            pipe.zremrangebyrank(table, start_pos, end_pos)  # delete
        results, *_ = pipe.execute()
        return results

    def zremrangebyscore(self, table, priority_min, priority_max):
        """Remove members whose score lies in the closed range.

        Returns:
            Number of members removed.
        """
        return self._redis.zremrangebyscore(table, priority_min, priority_max)

    def zrangebyscore(self, table, priority_min, priority_max, count=None, is_pop=True):
        """Return members whose score lies in the closed range.

        Args:
            table: Key of the sorted set.
            priority_min: Lowest score (lower score = higher priority).
            priority_max: Highest score.
            count: Maximum number of members; all matches when empty.
            is_pop: Also remove the returned members.

        Returns:
            A list of members.
        """
        # A Lua script keeps read + delete atomic.
        lua = """
            -- local key = KEYS[1]
            local min_score = ARGV[2]
            local max_score = ARGV[3]
            local is_pop = ARGV[4]
            local count = ARGV[5]

            -- 取值
            local datas = nil
            if count then
                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
            else
                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
            end

            -- 删除redis中刚取到的值
            if (is_pop=='True' or is_pop=='1') then
                for i=1, #datas do
                    redis.call('zrem', KEYS[1], datas[i])
                end
            end

            return datas
        """
        cmd = self._redis.register_script(lua)
        # The script reads its arguments from ARGV[2] onwards, so the table
        # name stays in ARGV[1] as a placeholder.
        args = [table, priority_min, priority_max, is_pop]
        if count:
            args.append(count)
        return cmd(keys=[table], args=args)

    def zrangebyscore_increase_score(
        self, table, priority_min, priority_max, increase_score, count=None
    ):
        """Return members in the closed score range and shift their scores.

        Args:
            table: Key of the sorted set.
            priority_min: Lowest score.
            priority_max: Highest score.
            increase_score: Amount added to each returned member's score
                (negative values subtract).
            count: Maximum number of members; all matches when empty.

        Returns:
            A list of members.
        """
        # A Lua script keeps read + update atomic.
        lua = """
            -- local key = KEYS[1]
            local min_score = ARGV[1]
            local max_score = ARGV[2]
            local increase_score = ARGV[3]
            local count = ARGV[4]

            -- 取值
            local datas = nil
            if count then
                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
            else
                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
            end

            --修改优先级
            for i=1, #datas do
                redis.call('zincrby', KEYS[1], increase_score, datas[i])
            end

            return datas
        """
        cmd = self._redis.register_script(lua)
        args = [priority_min, priority_max, increase_score]
        if count:
            args.append(count)
        return cmd(keys=[table], args=args)

    def zrangebyscore_set_score(
        self, table, priority_min, priority_max, score, count=None
    ):
        """Return members in the closed score range and set their scores.

        Args:
            table: Key of the sorted set.
            priority_min: Lowest score.
            priority_max: Highest score.
            score: New score for every returned member.
            count: Maximum number of members; all matches when empty.

        Returns:
            A list of members.
        """
        # A Lua script keeps read + update atomic.
        lua = """
            -- local key = KEYS[1]
            local min_score = ARGV[1]
            local max_score = ARGV[2]
            local set_score = ARGV[3]
            local count = ARGV[4]

            -- 取值
            local datas = nil
            if count then
                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores','limit', 0, count)
            else
                datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores')
            end

            local real_datas = {} -- 数据
            --修改优先级
            for i=1, #datas, 2 do
                local data = datas[i]
                local score = datas[i+1]

                table.insert(real_datas, data) -- 添加数据

                redis.call('zincrby', KEYS[1], set_score - score, datas[i])
            end

            return real_datas
        """
        cmd = self._redis.register_script(lua)
        args = [priority_min, priority_max, score]
        if count:
            args.append(count)
        return cmd(keys=[table], args=args)

    def zincrby(self, table, amount, value):
        """Forward to the client's ``zincrby(table, amount, value)``."""
        return self._redis.zincrby(table, amount, value)

    def zget_count(self, table, priority_min=None, priority_max=None):
        """Count members of the sorted set ``table``.

        Args:
            table: Key of the sorted set.
            priority_min: Lower score bound (inclusive).
            priority_max: Upper score bound (inclusive).

        Returns:
            ZCOUNT within the bounds when both are given, otherwise ZCARD.
        """
        if priority_min is not None and priority_max is not None:
            return self._redis.zcount(table, priority_min, priority_max)
        return self._redis.zcard(table)

    def zrem(self, table, values):
        """Remove one value or a list of values from the sorted set ``table``."""
        if isinstance(values, list):
            if not values:
                # ZREM without members is rejected by the server
                return
            self._redis.zrem(table, *values)
        else:
            self._redis.zrem(table, values)

    def zexists(self, table, values):
        """Tell whether value(s) exist in the sorted set, using ZSCORE.

        Args:
            table: Key of the sorted set.
            values: A single value or a list of values.

        Returns:
            1/0 for a single value, or a list of 1/0 flags for a list.
        """
        if not isinstance(values, list):
            return 0 if self._redis.zscore(table, values) is None else 1

        pipe = self._open_pipeline()
        for value in values:
            pipe.zscore(table, value)
        return [0 if score is None else 1 for score in pipe.execute()]

    # ------------------------------------------------------------ lists

    def lpush(self, table, values):
        """Append one value or a list of values to the list ``table``.

        NOTE: despite the name this issues RPUSH (append at the tail), so
        together with ``lpop`` the list behaves as a FIFO queue.

        Returns:
            The client's RPUSH result for a single value; ``None`` for a list.
        """
        if not isinstance(values, list):
            return self._redis.rpush(table, values)

        pipe = self._open_pipeline()
        for value in values:
            pipe.rpush(table, value)
        pipe.execute()

    def lpop(self, table, count=1):
        """Pop up to ``count`` items from the head of the list ``table``.

        Args:
            table: Key of the list.
            count: Number of items wanted.

        Returns:
            ``None`` when the list is empty, a single item when one item is
            popped, and a list of items when more than one is popped.
        """
        # Never pop more than the list holds (length queried once)
        count = min(count, self.lget_count(table))
        if not count:
            return None

        if count > 1:
            pipe = self._open_pipeline()
            for _ in range(count):
                pipe.lpop(table)
            return pipe.execute()
        return self._redis.lpop(table)

    def rpoplpush(self, from_table, to_table=None):
        """Pop the tail of ``from_table`` and push it onto the head of ``to_table``.

        When ``to_table`` is omitted (or equals ``from_table``) the tail
        element is moved to the head of the same list, i.e. a rotation.

        Returns:
            The moved element.
        """
        if not to_table:
            to_table = from_table
        return self._redis.rpoplpush(from_table, to_table)

    def lget_count(self, table):
        """Return the length of the list ``table`` (LLEN)."""
        return self._redis.llen(table)

    def lrem(self, table, value, num=0):
        """Remove occurrences of ``value`` from the list ``table``.

        Args:
            table: Key of the list.
            value: Value to remove.
            num: Count argument forwarded to LREM.

        Returns:
            Number of removed items.
        """
        return self._redis.lrem(table, num, value)

    def lrange(self, table, start=0, end=-1):
        """Return the items of ``table`` between ``start`` and ``end``."""
        return self._redis.lrange(table, start, end)

    # ----------------------------------------------------------- hashes

    def hset(self, table, key, value):
        """Set field ``key`` of hash ``table`` to ``value``.

        The hash is created if missing; an existing field is overwritten.

        Returns:
            1 for a new field, 0 for an overwrite.
        """
        return self._redis.hset(table, key, value)

    def hset_batch(self, table, datas):
        """Set many fields of hash ``table`` in one pipeline.

        Args:
            table: Key of the hash.
            datas: Iterable of ``[key, value]`` pairs.

        Returns:
            The list of per-field HSET results.
        """
        pipe = self._open_pipeline()
        for key, value in datas:
            pipe.hset(table, key, value)
        return pipe.execute()

    def hincrby(self, table, key, increment):
        """Forward to the client's ``hincrby(table, key, increment)``."""
        return self._redis.hincrby(table, key, increment)

    def hget(self, table, key, is_pop=False):
        """Read field ``key`` of hash ``table``, optionally deleting it.

        Args:
            table: Key of the hash.
            key: Field name.
            is_pop: When true the field is read and deleted atomically
                through a Lua script.

        Returns:
            The field value.
        """
        if not is_pop:
            return self._redis.hget(table, key)

        lua = """
            -- local key = KEYS[1]
            local field = ARGV[1]

            -- 取值
            local datas = redis.call('hget', KEYS[1], field)
            -- 删除值
            redis.call('hdel', KEYS[1], field)

            return datas
        """
        cmd = self._redis.register_script(lua)
        return cmd(keys=[table], args=[key])

    def hgetall(self, table):
        """Return all fields and values of hash ``table``."""
        return self._redis.hgetall(table)

    def hexists(self, table, key):
        """Return whether field ``key`` exists in hash ``table``."""
        return self._redis.hexists(table, key)

    def hdel(self, table, *keys):
        """Delete one or more fields from hash ``table``."""
        self._redis.hdel(table, *keys)

    def hget_count(self, table):
        """Return the number of fields in hash ``table`` (HLEN)."""
        return self._redis.hlen(table)

    # ---------------------------------------------------------- bitmaps

    def setbit(self, table, offsets, values):
        """Set bit(s) of the string ``table`` and return the previous value(s).

        Args:
            table: Key of the string.
            offsets: A single offset or a list of offsets.
            values: A single bit value, or a list matching ``offsets``.

        Returns:
            A list of previous bits for list input, else a single bit.
        """
        if not isinstance(offsets, list):
            return self._redis.setbit(table, offsets, values)

        if not isinstance(values, list):
            values = [values] * len(offsets)
        else:
            assert len(offsets) == len(values), "offsets值要与values值一一对应"

        pipe = self._open_pipeline()
        for offset, value in zip(offsets, values):
            pipe.setbit(table, offset, value)
        return pipe.execute()

    def getbit(self, table, offsets):
        """Read bit(s) of the string ``table``.

        Args:
            table: Key of the string.
            offsets: A single offset or a list of offsets.

        Returns:
            A list of bits for list input, else a single bit.
        """
        if not isinstance(offsets, list):
            return self._redis.getbit(table, offsets)

        pipe = self._open_pipeline()
        for offset in offsets:
            pipe.getbit(table, offset)
        return pipe.execute()

    def bitcount(self, table):
        """Forward to the client's ``bitcount(table)``."""
        return self._redis.bitcount(table)

    # ---------------------------------------------------------- strings

    def strset(self, table, value, **kwargs):
        """Forward to the client's ``set(table, value, **kwargs)``."""
        return self._redis.set(table, value, **kwargs)

    def str_incrby(self, table, value):
        """Forward to the client's ``incrby(table, value)``."""
        return self._redis.incrby(table, value)

    def strget(self, table):
        """Forward to the client's ``get(table)``."""
        return self._redis.get(table)

    def strlen(self, table):
        """Forward to the client's ``strlen(table)``."""
        return self._redis.strlen(table)

    # ------------------------------------------------------------- keys

    def getkeys(self, regex):
        """Return the keys matching the pattern ``regex`` (KEYS)."""
        return self._redis.keys(regex)

    def exists_key(self, key):
        """Forward to the client's ``exists(key)``."""
        return self._redis.exists(key)

    def set_expire(self, key, seconds):
        """Set the expiry of ``key`` to ``seconds`` seconds."""
        self._redis.expire(key, seconds)

    def get_expire(self, key):
        """Return the remaining time to live of ``key`` (TTL)."""
        return self._redis.ttl(key)

    def clear(self, table):
        """Delete the key ``table``; failures are logged, not raised."""
        try:
            self._redis.delete(table)
        except Exception as e:
            logger.error(e)

    def get_redis_obj(self):
        """Return the underlying client (after the usual ping check)."""
        return self._redis

    def _reconnect(self):
        """Reconnect until the server answers.

        Used when the server restarted or an idle timeout closed the
        connection. Connection and timeout errors are logged and retried
        every 2 seconds; any other error propagates.

        Returns:
            ``True`` once a connection succeeds.
        """
        retry_count = 0
        while True:
            try:
                retry_count += 1
                logger.error(f"redis 连接断开, 重新连接 {retry_count}")
                if self.get_connect():
                    logger.info("redis 连接成功")
                    return True
            except (ConnectionError, TimeoutError) as e:
                logger.error(f"连接失败 e: {e}")

            time.sleep(2)

    def __getattr__(self, name):
        """Forward unknown attributes to the underlying client.

        Raises:
            AttributeError: for dunder lookups, and for any lookup on an
                instance whose ``__init__`` has not run (for example while
                being copied or unpickled); forwarding in that state would
                recurse without end.
        """
        if name.startswith("__") or "_RedisDB__redis" not in self.__dict__:
            raise AttributeError(name)
        return getattr(self._redis, name)
|