redisdb.py 25 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2016-11-16 16:25
  4. ---------
  5. @summary: 操作redis数据库
  6. ---------
  7. @author: Boris
  8. """
  9. import math
  10. import time
  11. import uuid
  12. import redis
  13. from loguru import logger
  14. from redis._compat import unicode, long, basestring
  15. from redis.connection import Encoder as _Encoder
  16. from redis.exceptions import ConnectionError, TimeoutError
  17. from redis.exceptions import DataError
  18. import setting
  19. # 获取锁(乐观锁)
  20. def acquire_lock_with_timeout(conn, lockname, acquire_timeout=5, lock_timeout=10):
  21. # 128位随机标识符
  22. identifier = str(uuid.uuid4())
  23. lockname = 'lock:' + lockname
  24. lock_timeout = int(math.ceil(lock_timeout)) # 确保传给exprie是整数
  25. end = time.time() + acquire_timeout
  26. while time.time() < end:
  27. if conn.setnx(lockname, identifier):
  28. conn.expire(lockname, lock_timeout)
  29. return identifier
  30. elif not conn.ttl(lockname): # 为没有设置超时时间的锁设置超时时间
  31. conn.expire(lockname, lock_timeout)
  32. time.sleep(0.001)
  33. return False
  34. # 释放锁(乐观锁)
  35. def release_lock(conn, lockname, identifier):
  36. lockname = 'lock:' + lockname
  37. with conn.pipeline(True) as pipe:
  38. while True:
  39. try:
  40. pipe.watch(lockname)
  41. value = pipe.get(lockname)
  42. # 判断标志是否相同
  43. if value is not None and value == identifier:
  44. pipe.multi()
  45. pipe.delete(lockname)
  46. pipe.execute()
  47. return True
  48. # 不同则直接退出 return False
  49. pipe.unwatch()
  50. break
  51. except redis.exceptions.WatchError:
  52. pass
  53. return False
  54. class Encoder(_Encoder):
  55. def encode(self, value):
  56. "Return a bytestring or bytes-like representation of the value"
  57. if isinstance(value, (bytes, memoryview)):
  58. return value
  59. # elif isinstance(value, bool):
  60. # # special case bool since it is a subclass of int
  61. # raise DataError(
  62. # "Invalid input of type: 'bool'. Convert to a "
  63. # "bytes, string, int or float first."
  64. # )
  65. elif isinstance(value, float):
  66. value = repr(value).encode()
  67. elif isinstance(value, (int, long)):
  68. # python 2 repr() on longs is '123L', so use str() instead
  69. value = str(value).encode()
  70. elif isinstance(value, (list, dict, tuple)):
  71. value = unicode(value)
  72. elif not isinstance(value, basestring):
  73. # a value we don't know how to deal with. throw an error
  74. typename = type(value).__name__
  75. raise DataError(
  76. "Invalid input of type: '%s'. Convert to a "
  77. "bytes, string, int or float first." % typename
  78. )
  79. if isinstance(value, unicode):
  80. value = value.encode(self.encoding, self.encoding_errors)
  81. return value
  82. redis.connection.Encoder = Encoder
  83. class RedisDB:
  84. def __init__(
  85. self,
  86. ip_port=None,
  87. db=None,
  88. user_pass=None,
  89. url=None,
  90. decode_responses=True,
  91. max_connections=32,
  92. **kwargs,
  93. ):
  94. """
  95. redis的封装
  96. Args:
  97. ip_port: 如 ip:port
  98. db:
  99. user_pass:
  100. url:
  101. decode_responses:
  102. """
  103. if ip_port is None:
  104. ip_port = setting.REDISDB_IP_PORT
  105. if db is None:
  106. db = setting.REDISDB_DB
  107. if user_pass is None:
  108. user_pass = setting.REDISDB_USER_PASS
  109. self._is_redis_cluster = False
  110. self.__redis = None
  111. self._url = url
  112. self._ip_port = ip_port
  113. self._db = db
  114. self._user_pass = user_pass
  115. self._decode_responses = decode_responses
  116. self._max_connections = max_connections
  117. self._kwargs = kwargs
  118. self.get_connect()
  119. def __repr__(self):
  120. if self._url:
  121. return "<Redisdb url:{}>".format(self._url)
  122. return "<Redisdb ip_ports: {} db:{} user_pass:{}>".format(
  123. self._ip_port, self._db, self._user_pass
  124. )
  125. @property
  126. def _redis(self):
  127. try:
  128. if not self.__redis.ping():
  129. raise ConnectionError("unable to connect to redis")
  130. except:
  131. self._reconnect()
  132. return self.__redis
  133. @_redis.setter
  134. def _redis(self, val):
  135. self.__redis = val
  136. def get_connect(self):
  137. # 获取数据库连接
  138. try:
  139. if not self._url:
  140. if not self._ip_port:
  141. raise Exception("未设置 redis 连接信息")
  142. ip, port = self._ip_port.split(":")
  143. self._redis = redis.StrictRedis(
  144. host=ip,
  145. port=port,
  146. db=self._db,
  147. password=self._user_pass,
  148. decode_responses=self._decode_responses,
  149. max_connections=self._max_connections,
  150. **self._kwargs,
  151. )
  152. else:
  153. self._redis = redis.StrictRedis.from_url(
  154. self._url, decode_responses=self._decode_responses
  155. )
  156. except Exception as e:
  157. raise
  158. # 不要写成self._redis.ping() 否则循环调用了
  159. return self.__redis.ping()
  160. @classmethod
  161. def from_url(cls, url):
  162. """
  163. Args:
  164. url: redis://[[username]:[password]]@[host]:[port]/[db]
  165. Returns:
  166. """
  167. return cls(url=url)
  168. def sadd(self, table, values):
  169. """
  170. @summary: 使用无序set集合存储数据, 去重
  171. ---------
  172. @param table:
  173. @param values: 值; 支持list 或 单个值
  174. ---------
  175. @result: 若库中存在 返回0,否则入库,返回1。 批量添加返回None
  176. """
  177. if isinstance(values, list):
  178. pipe = self._redis.pipeline()
  179. if not self._is_redis_cluster:
  180. pipe.multi()
  181. for value in values:
  182. pipe.sadd(table, value)
  183. pipe.execute()
  184. else:
  185. return self._redis.sadd(table, values)
  186. def sget(self, table, count=1, is_pop=True):
  187. """
  188. 返回 list 如 ['1'] 或 []
  189. @param table:
  190. @param count:
  191. @param is_pop:
  192. @return:
  193. """
  194. datas = []
  195. if is_pop:
  196. count = count if count <= self.sget_count(table) else self.sget_count(table)
  197. if count:
  198. if count > 1:
  199. pipe = self._redis.pipeline()
  200. if not self._is_redis_cluster:
  201. pipe.multi()
  202. while count:
  203. pipe.spop(table)
  204. count -= 1
  205. datas = pipe.execute()
  206. else:
  207. datas.append(self._redis.spop(table))
  208. else:
  209. datas = self._redis.srandmember(table, count)
  210. return datas
  211. def srem(self, table, values):
  212. """
  213. @summary: 移除集合中的指定元素
  214. ---------
  215. @param table:
  216. @param values: 一个或者列表
  217. ---------
  218. @result:
  219. """
  220. if isinstance(values, list):
  221. pipe = self._redis.pipeline()
  222. if not self._is_redis_cluster:
  223. pipe.multi()
  224. for value in values:
  225. pipe.srem(table, value)
  226. pipe.execute()
  227. else:
  228. self._redis.srem(table, values)
  229. def sget_count(self, table):
  230. return self._redis.scard(table)
  231. def sdelete(self, table):
  232. """
  233. @summary: 删除set集合的大键(数据量大的表)
  234. 删除大set键,使用sscan命令,每次扫描集合中500个元素,再用srem命令每次删除一个键
  235. 若直接用delete命令,会导致Redis阻塞,出现故障切换和应用程序崩溃的故障。
  236. ---------
  237. @param table:
  238. ---------
  239. @result:
  240. """
  241. # 当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束
  242. cursor = "0"
  243. while cursor != 0:
  244. cursor, data = self._redis.sscan(table, cursor=cursor, count=500)
  245. for item in data:
  246. # pipe.srem(table, item)
  247. self._redis.srem(table, item)
  248. # pipe.execute()
  249. def sismember(self, table, key):
  250. "Return a boolean indicating if ``value`` is a member of set ``name``"
  251. return self._redis.sismember(table, key)
  252. def zadd(self, table, values, prioritys=0):
  253. """
  254. @summary: 使用有序set集合存储数据, 去重(值存在更新)
  255. ---------
  256. @param table:
  257. @param values: 值; 支持list 或 单个值
  258. @param prioritys: 优先级; double类型,支持list 或 单个值。 根据此字段的值来排序, 值越小越优先。 可不传值,默认value的优先级为0
  259. ---------
  260. @result:若库中存在 返回0,否则入库,返回1。 批量添加返回 [0, 1 ...]
  261. """
  262. if isinstance(values, list):
  263. if not isinstance(prioritys, list):
  264. prioritys = [prioritys] * len(values)
  265. else:
  266. assert len(values) == len(prioritys), "values值要与prioritys值一一对应"
  267. pipe = self._redis.pipeline()
  268. if not self._is_redis_cluster:
  269. pipe.multi()
  270. for value, priority in zip(values, prioritys):
  271. pipe.execute_command(
  272. "ZADD", table, priority, value
  273. ) # 为了兼容2.x与3.x版本的redis
  274. return pipe.execute()
  275. else:
  276. return self._redis.execute_command(
  277. "ZADD", table, prioritys, values
  278. ) # 为了兼容2.x与3.x版本的redis
  279. def zget(self, table, count=1, is_pop=True):
  280. """
  281. @summary: 从有序set集合中获取数据 优先返回分数小的(优先级高的)
  282. ---------
  283. @param table:
  284. @param count: 数量 -1 返回全部数据
  285. @param is_pop:获取数据后,是否在原set集合中删除,默认是
  286. ---------
  287. @result: 列表
  288. """
  289. start_pos = 0 # 包含
  290. end_pos = count - 1 if count > 0 else count
  291. pipe = self._redis.pipeline()
  292. if not self._is_redis_cluster:
  293. pipe.multi() # 标记事务的开始 参考 http://www.runoob.com/redis/redis-transactions.html
  294. pipe.zrange(table, start_pos, end_pos) # 取值
  295. if is_pop:
  296. pipe.zremrangebyrank(table, start_pos, end_pos) # 删除
  297. results, *count = pipe.execute()
  298. return results
  299. def zremrangebyscore(self, table, priority_min, priority_max):
  300. """
  301. 根据分数移除成员 闭区间
  302. @param table:
  303. @param priority_min:
  304. @param priority_max:
  305. @return: 被移除的成员个数
  306. """
  307. return self._redis.zremrangebyscore(table, priority_min, priority_max)
  308. def zrangebyscore(self, table, priority_min, priority_max, count=None, is_pop=True):
  309. """
  310. @summary: 返回指定分数区间的数据 闭区间
  311. ---------
  312. @param table:
  313. @param priority_min: 优先级越小越优先
  314. @param priority_max:
  315. @param count: 获取的数量,为空则表示分数区间内的全部数据
  316. @param is_pop: 是否删除
  317. ---------
  318. @result:
  319. """
  320. # 使用lua脚本, 保证操作的原子性
  321. lua = """
  322. -- local key = KEYS[1]
  323. local min_score = ARGV[2]
  324. local max_score = ARGV[3]
  325. local is_pop = ARGV[4]
  326. local count = ARGV[5]
  327. -- 取值
  328. local datas = nil
  329. if count then
  330. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
  331. else
  332. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
  333. end
  334. -- 删除redis中刚取到的值
  335. if (is_pop=='True' or is_pop=='1') then
  336. for i=1, #datas do
  337. redis.call('zrem', KEYS[1], datas[i])
  338. end
  339. end
  340. return datas
  341. """
  342. cmd = self._redis.register_script(lua)
  343. if count:
  344. res = cmd(
  345. keys=[table], args=[table, priority_min, priority_max, is_pop, count]
  346. )
  347. else:
  348. res = cmd(keys=[table], args=[table, priority_min, priority_max, is_pop])
  349. return res
  350. def zrangebyscore_increase_score(
  351. self, table, priority_min, priority_max, increase_score, count=None
  352. ):
  353. """
  354. @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
  355. ---------
  356. @param table:
  357. @param priority_min: 最小分数
  358. @param priority_max: 最大分数
  359. @param increase_score: 分数值增量 正数则在原有的分数上叠加,负数则相减
  360. @param count: 获取的数量,为空则表示分数区间内的全部数据
  361. ---------
  362. @result:
  363. """
  364. # 使用lua脚本, 保证操作的原子性
  365. lua = """
  366. -- local key = KEYS[1]
  367. local min_score = ARGV[1]
  368. local max_score = ARGV[2]
  369. local increase_score = ARGV[3]
  370. local count = ARGV[4]
  371. -- 取值
  372. local datas = nil
  373. if count then
  374. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
  375. else
  376. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
  377. end
  378. --修改优先级
  379. for i=1, #datas do
  380. redis.call('zincrby', KEYS[1], increase_score, datas[i])
  381. end
  382. return datas
  383. """
  384. cmd = self._redis.register_script(lua)
  385. if count:
  386. res = cmd(
  387. keys=[table], args=[priority_min, priority_max, increase_score, count]
  388. )
  389. else:
  390. res = cmd(keys=[table], args=[priority_min, priority_max, increase_score])
  391. return res
  392. def zrangebyscore_set_score(
  393. self, table, priority_min, priority_max, score, count=None
  394. ):
  395. """
  396. @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
  397. ---------
  398. @param table:
  399. @param priority_min: 最小分数
  400. @param priority_max: 最大分数
  401. @param score: 分数值
  402. @param count: 获取的数量,为空则表示分数区间内的全部数据
  403. ---------
  404. @result:
  405. """
  406. # 使用lua脚本, 保证操作的原子性
  407. lua = """
  408. -- local key = KEYS[1]
  409. local min_score = ARGV[1]
  410. local max_score = ARGV[2]
  411. local set_score = ARGV[3]
  412. local count = ARGV[4]
  413. -- 取值
  414. local datas = nil
  415. if count then
  416. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores','limit', 0, count)
  417. else
  418. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores')
  419. end
  420. local real_datas = {} -- 数据
  421. --修改优先级
  422. for i=1, #datas, 2 do
  423. local data = datas[i]
  424. local score = datas[i+1]
  425. table.insert(real_datas, data) -- 添加数据
  426. redis.call('zincrby', KEYS[1], set_score - score, datas[i])
  427. end
  428. return real_datas
  429. """
  430. cmd = self._redis.register_script(lua)
  431. if count:
  432. res = cmd(keys=[table], args=[priority_min, priority_max, score, count])
  433. else:
  434. res = cmd(keys=[table], args=[priority_min, priority_max, score])
  435. return res
  436. def zincrby(self, table, amount, value):
  437. return self._redis.zincrby(table, amount, value)
  438. def zget_count(self, table, priority_min=None, priority_max=None):
  439. """
  440. @summary: 获取表数据的数量
  441. ---------
  442. @param table:
  443. @param priority_min:优先级范围 最小值(包含)
  444. @param priority_max:优先级范围 最大值(包含)
  445. ---------
  446. @result:
  447. """
  448. if priority_min != None and priority_max != None:
  449. return self._redis.zcount(table, priority_min, priority_max)
  450. else:
  451. return self._redis.zcard(table)
  452. def zrem(self, table, values):
  453. """
  454. @summary: 移除集合中的指定元素
  455. ---------
  456. @param table:
  457. @param values: 一个或者列表
  458. ---------
  459. @result:
  460. """
  461. if isinstance(values, list):
  462. self._redis.zrem(table, *values)
  463. else:
  464. self._redis.zrem(table, values)
  465. def zexists(self, table, values):
  466. """
  467. 利用zscore判断某元素是否存在
  468. @param values:
  469. @return:
  470. """
  471. is_exists = []
  472. if isinstance(values, list):
  473. pipe = self._redis.pipeline()
  474. pipe.multi()
  475. for value in values:
  476. pipe.zscore(table, value)
  477. is_exists_temp = pipe.execute()
  478. for is_exist in is_exists_temp:
  479. if is_exist != None:
  480. is_exists.append(1)
  481. else:
  482. is_exists.append(0)
  483. else:
  484. is_exists = self._redis.zscore(table, values)
  485. is_exists = 1 if is_exists != None else 0
  486. return is_exists
  487. def lpush(self, table, values):
  488. if isinstance(values, list):
  489. pipe = self._redis.pipeline()
  490. if not self._is_redis_cluster:
  491. pipe.multi()
  492. for value in values:
  493. pipe.rpush(table, value)
  494. pipe.execute()
  495. else:
  496. return self._redis.rpush(table, values)
  497. def lpop(self, table, count=1):
  498. """
  499. @summary:
  500. ---------
  501. @param table:
  502. @param count:
  503. ---------
  504. @result: count>1时返回列表
  505. """
  506. datas = None
  507. count = count if count <= self.lget_count(table) else self.lget_count(table)
  508. if count:
  509. if count > 1:
  510. pipe = self._redis.pipeline()
  511. if not self._is_redis_cluster:
  512. pipe.multi()
  513. while count:
  514. pipe.lpop(table)
  515. count -= 1
  516. datas = pipe.execute()
  517. else:
  518. datas = self._redis.lpop(table)
  519. return datas
  520. def rpoplpush(self, from_table, to_table=None):
  521. """
  522. 将列表 from_table 中的最后一个元素(尾元素)弹出,并返回给客户端。
  523. 将 from_table 弹出的元素插入到列表 to_table ,作为 to_table 列表的的头元素。
  524. 如果 from_table 和 to_table 相同,则列表中的表尾元素被移动到表头,并返回该元素,可以把这种特殊情况视作列表的旋转(rotation)操作
  525. @param from_table:
  526. @param to_table:
  527. @return:
  528. """
  529. if not to_table:
  530. to_table = from_table
  531. return self._redis.rpoplpush(from_table, to_table)
  532. def lget_count(self, table):
  533. return self._redis.llen(table)
  534. def lrem(self, table, value, num=0):
  535. """
  536. @summary:
  537. 删除value
  538. ---------
  539. @param table:
  540. @param value:
  541. @param num:
  542. ---------
  543. @result: 删除的条数
  544. """
  545. return self._redis.lrem(table, num, value)
  546. def lrange(self, table, start=0, end=-1):
  547. return self._redis.lrange(table, start, end)
  548. def hset(self, table, key, value):
  549. """
  550. @summary:
  551. 如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。
  552. 如果域 field 已经存在于哈希表中,旧值将被覆盖
  553. ---------
  554. @param table:
  555. @param key:
  556. @param value:
  557. ---------
  558. @result: 1 新插入; 0 覆盖
  559. """
  560. return self._redis.hset(table, key, value)
  561. def hset_batch(self, table, datas):
  562. """
  563. 批量插入
  564. Args:
  565. datas:
  566. [[key, value]]
  567. Returns:
  568. """
  569. pipe = self._redis.pipeline()
  570. if not self._is_redis_cluster:
  571. pipe.multi()
  572. for key, value in datas:
  573. pipe.hset(table, key, value)
  574. return pipe.execute()
  575. def hincrby(self, table, key, increment):
  576. return self._redis.hincrby(table, key, increment)
  577. def hget(self, table, key, is_pop=False):
  578. if not is_pop:
  579. return self._redis.hget(table, key)
  580. else:
  581. lua = """
  582. -- local key = KEYS[1]
  583. local field = ARGV[1]
  584. -- 取值
  585. local datas = redis.call('hget', KEYS[1], field)
  586. -- 删除值
  587. redis.call('hdel', KEYS[1], field)
  588. return datas
  589. """
  590. cmd = self._redis.register_script(lua)
  591. res = cmd(keys=[table], args=[key])
  592. return res
  593. def hgetall(self, table):
  594. return self._redis.hgetall(table)
  595. def hexists(self, table, key):
  596. return self._redis.hexists(table, key)
  597. def hdel(self, table, *keys):
  598. """
  599. @summary: 删除对应的key 可传多个
  600. ---------
  601. @param table:
  602. @param *keys:
  603. ---------
  604. @result:
  605. """
  606. self._redis.hdel(table, *keys)
  607. def hget_count(self, table):
  608. return self._redis.hlen(table)
  609. def setbit(self, table, offsets, values):
  610. """
  611. 设置字符串数组某一位的值, 返回之前的值
  612. @param table:
  613. @param offsets: 支持列表或单个值
  614. @param values: 支持列表或单个值
  615. @return: list / 单个值
  616. """
  617. if isinstance(offsets, list):
  618. if not isinstance(values, list):
  619. values = [values] * len(offsets)
  620. else:
  621. assert len(offsets) == len(values), "offsets值要与values值一一对应"
  622. pipe = self._redis.pipeline()
  623. pipe.multi()
  624. for offset, value in zip(offsets, values):
  625. pipe.setbit(table, offset, value)
  626. return pipe.execute()
  627. else:
  628. return self._redis.setbit(table, offsets, values)
  629. def getbit(self, table, offsets):
  630. """
  631. 取字符串数组某一位的值
  632. @param table:
  633. @param offsets: 支持列表
  634. @return: list / 单个值
  635. """
  636. if isinstance(offsets, list):
  637. pipe = self._redis.pipeline()
  638. pipe.multi()
  639. for offset in offsets:
  640. pipe.getbit(table, offset)
  641. return pipe.execute()
  642. else:
  643. return self._redis.getbit(table, offsets)
  644. def bitcount(self, table):
  645. return self._redis.bitcount(table)
  646. def strset(self, table, value, **kwargs):
  647. return self._redis.set(table, value, **kwargs)
  648. def str_incrby(self, table, value):
  649. return self._redis.incrby(table, value)
  650. def strget(self, table):
  651. return self._redis.get(table)
  652. def strlen(self, table):
  653. return self._redis.strlen(table)
  654. def getkeys(self, regex):
  655. return self._redis.keys(regex)
  656. def exists_key(self, key):
  657. return self._redis.exists(key)
  658. def set_expire(self, key, seconds):
  659. """
  660. @summary: 设置过期时间
  661. ---------
  662. @param key:
  663. @param seconds: 秒
  664. ---------
  665. @result:
  666. """
  667. self._redis.expire(key, seconds)
  668. def get_expire(self, key):
  669. """
  670. @summary: 查询过期时间
  671. ---------
  672. @param key:
  673. @param seconds: 秒
  674. ---------
  675. @result:
  676. """
  677. return self._redis.ttl(key)
  678. def clear(self, table):
  679. try:
  680. self._redis.delete(table)
  681. except Exception as e:
  682. logger.error(e)
  683. def get_redis_obj(self):
  684. return self._redis
  685. def _reconnect(self):
  686. # 检测连接状态, 当数据库重启或设置 timeout 导致断开连接时自动重连
  687. retry_count = 0
  688. while True:
  689. try:
  690. retry_count += 1
  691. logger.error(f"redis 连接断开, 重新连接 {retry_count}")
  692. if self.get_connect():
  693. logger.info(f"redis 连接成功")
  694. return True
  695. except (ConnectionError, TimeoutError) as e:
  696. logger.error(f"连接失败 e: {e}")
  697. time.sleep(2)
  698. def __getattr__(self, name):
  699. return getattr(self._redis, name)