redisdb.py 30 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2016-11-16 16:25
  4. ---------
  5. @summary: 操作redis数据库
  6. ---------
  7. @author: Boris
  8. """
  9. import os
  10. import time
  11. from typing import Union, List
  12. import redis
  13. from redis.connection import Encoder as _Encoder
  14. from redis.exceptions import ConnectionError, TimeoutError
  15. from redis.exceptions import DataError
  16. from redis.sentinel import Sentinel
  17. import setting
  18. from common.log import logger as log
  19. class Encoder(_Encoder):
  20. def encode(self, value):
  21. "Return a bytestring or bytes-like representation of the value"
  22. if isinstance(value, (bytes, memoryview)):
  23. return value
  24. # elif isinstance(value, bool):
  25. # # special case bool since it is a subclass of int
  26. # raise DataError(
  27. # "Invalid input of type: 'bool'. Convert to a "
  28. # "bytes, string, int or float first."
  29. # )
  30. elif isinstance(value, float):
  31. value = repr(value).encode()
  32. elif isinstance(value, int):
  33. # python 2 repr() on longs is '123L', so use str() instead
  34. value = str(value).encode()
  35. elif isinstance(value, (list, dict, tuple)):
  36. value = str(value)
  37. elif not isinstance(value, str):
  38. # a value we don't know how to deal with. throw an error
  39. typename = type(value).__name__
  40. raise DataError(
  41. "Invalid input of type: '%s'. Convert to a "
  42. "bytes, string, int or float first." % typename
  43. )
  44. if isinstance(value, str):
  45. value = value.encode(self.encoding, self.encoding_errors)
  46. return value
  47. redis.connection.Encoder = Encoder
  48. class RedisDB:
  49. def __init__(
  50. self,
  51. ip_ports=None,
  52. db=None,
  53. user_pass=None,
  54. url=None,
  55. decode_responses=True,
  56. service_name=None,
  57. max_connections=1000,
  58. **kwargs,
  59. ):
  60. """
  61. redis的封装
  62. Args:
  63. ip_ports: ip:port 多个可写为列表或者逗号隔开 如 ip1:port1,ip2:port2 或 ["ip1:port1", "ip2:port2"]
  64. db:
  65. user_pass:
  66. url:
  67. decode_responses:
  68. service_name: 适用于redis哨兵模式
  69. max_connections: 同一个redis对象使用的并发数(连接池的最大连接数),超过这个数量会抛出redis.ConnectionError
  70. """
  71. # 可能会改setting中的值,所以此处不能直接赋值为默认值,需要后加载赋值
  72. if ip_ports is None:
  73. ip = setting.redis_conf['host']
  74. port = setting.redis_conf['port']
  75. ip_ports = f'{ip}:{port}'
  76. if db is None:
  77. db = setting.redis_conf['db']
  78. if user_pass is None:
  79. user_pass = setting.redis_conf['pwd']
  80. if service_name is None:
  81. service_name = setting.redis_conf['service_name']
  82. if kwargs is None:
  83. kwargs = setting.redis_conf['redis_kwargs']
  84. self._is_redis_cluster = False
  85. self.__redis = None
  86. self._url = url
  87. self._ip_ports = ip_ports
  88. self._db = db
  89. self._user_pass = user_pass
  90. self._decode_responses = decode_responses
  91. self._service_name = service_name
  92. self._max_connections = max_connections
  93. self._kwargs = kwargs
  94. self.get_connect()
  95. def __repr__(self):
  96. if self._url:
  97. return "<Redisdb url:{}>".format(self._url)
  98. return "<Redisdb ip_ports: {} db:{} user_pass:{}>".format(
  99. self._ip_ports, self._db, self._user_pass
  100. )
  101. @property
  102. def _redis(self):
  103. try:
  104. if not self.__redis.ping():
  105. raise ConnectionError("unable to connect to redis")
  106. except:
  107. self._reconnect()
  108. return self.__redis
  109. @_redis.setter
  110. def _redis(self, val):
  111. self.__redis = val
  112. def get_connect(self):
  113. # 获取数据库连接
  114. try:
  115. if not self._url:
  116. if not self._ip_ports:
  117. raise ConnectionError("未设置 redis 连接信息")
  118. ip_ports = (
  119. self._ip_ports
  120. if isinstance(self._ip_ports, list)
  121. else self._ip_ports.split(",")
  122. )
  123. if len(ip_ports) > 1:
  124. startup_nodes = []
  125. for ip_port in ip_ports:
  126. ip, port = ip_port.split(":")
  127. startup_nodes.append({"host": ip, "port": port})
  128. if self._service_name:
  129. # log.debug("使用redis哨兵模式")
  130. hosts = [(node["host"], node["port"]) for node in startup_nodes]
  131. sentinel = Sentinel(hosts, socket_timeout=3, **self._kwargs)
  132. self._redis = sentinel.master_for(
  133. self._service_name,
  134. password=self._user_pass,
  135. db=self._db,
  136. redis_class=redis.StrictRedis,
  137. decode_responses=self._decode_responses,
  138. max_connections=self._max_connections,
  139. **self._kwargs,
  140. )
  141. else:
  142. try:
  143. from rediscluster import RedisCluster
  144. except ModuleNotFoundError as e:
  145. log.error('请安装 pip install "feapder[all]"')
  146. os._exit(0)
  147. # log.debug("使用redis集群模式")
  148. self._redis = RedisCluster(
  149. startup_nodes=startup_nodes,
  150. decode_responses=self._decode_responses,
  151. password=self._user_pass,
  152. max_connections=self._max_connections,
  153. **self._kwargs,
  154. )
  155. self._is_redis_cluster = True
  156. else:
  157. ip, port = ip_ports[0].split(":")
  158. self._redis = redis.StrictRedis(
  159. host=ip,
  160. port=port,
  161. db=self._db,
  162. password=self._user_pass,
  163. decode_responses=self._decode_responses,
  164. max_connections=self._max_connections,
  165. **self._kwargs,
  166. )
  167. self._is_redis_cluster = False
  168. else:
  169. self._redis = redis.StrictRedis.from_url(
  170. self._url, decode_responses=self._decode_responses, **self._kwargs
  171. )
  172. self._is_redis_cluster = False
  173. except Exception as e:
  174. raise e
  175. # 不要写成self._redis.ping() 否则循环调用了
  176. return self.__redis.ping()
  177. @classmethod
  178. def from_url(cls, url):
  179. """
  180. Args:
  181. url: redis://[[username]:[password]]@[host]:[port]/[db]
  182. Returns:
  183. """
  184. return cls(url=url)
  185. def sadd(self, table, values):
  186. """
  187. @summary: 使用无序set集合存储数据, 去重
  188. ---------
  189. @param table:
  190. @param values: 值; 支持list 或 单个值
  191. ---------
  192. @result: 若库中存在 返回0,否则入库,返回1。 批量添加返回None
  193. """
  194. if isinstance(values, list):
  195. pipe = self._redis.pipeline()
  196. if not self._is_redis_cluster:
  197. pipe.multi()
  198. for value in values:
  199. pipe.sadd(table, value)
  200. pipe.execute()
  201. else:
  202. return self._redis.sadd(table, values)
  203. def sget(self, table, count=1, is_pop=True):
  204. """
  205. 返回 list 如 ['1'] 或 []
  206. @param table:
  207. @param count:
  208. @param is_pop:
  209. @return:
  210. """
  211. datas = []
  212. if is_pop:
  213. count = count if count <= self.sget_count(table) else self.sget_count(table)
  214. if count:
  215. if count > 1:
  216. pipe = self._redis.pipeline()
  217. if not self._is_redis_cluster:
  218. pipe.multi()
  219. while count:
  220. pipe.spop(table)
  221. count -= 1
  222. datas = pipe.execute()
  223. else:
  224. datas.append(self._redis.spop(table))
  225. else:
  226. datas = self._redis.srandmember(table, count)
  227. return datas
  228. def srem(self, table, values):
  229. """
  230. @summary: 移除集合中的指定元素
  231. ---------
  232. @param table:
  233. @param values: 一个或者列表
  234. ---------
  235. @result:
  236. """
  237. if isinstance(values, list):
  238. pipe = self._redis.pipeline()
  239. if not self._is_redis_cluster:
  240. pipe.multi()
  241. for value in values:
  242. pipe.srem(table, value)
  243. pipe.execute()
  244. else:
  245. self._redis.srem(table, values)
  246. def sget_count(self, table):
  247. return self._redis.scard(table)
  248. def sdelete(self, table):
  249. """
  250. @summary: 删除set集合的大键(数据量大的表)
  251. 删除大set键,使用sscan命令,每次扫描集合中500个元素,再用srem命令每次删除一个键
  252. 若直接用delete命令,会导致Redis阻塞,出现故障切换和应用程序崩溃的故障。
  253. ---------
  254. @param table:
  255. ---------
  256. @result:
  257. """
  258. # 当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束
  259. cursor = "0"
  260. while cursor != 0:
  261. cursor, data = self._redis.sscan(table, cursor=cursor, count=500)
  262. for item in data:
  263. # pipe.srem(table, item)
  264. self._redis.srem(table, item)
  265. # pipe.execute()
  266. def sismember(self, table, key):
  267. "Return a boolean indicating if ``value`` is a member of set ``name``"
  268. return self._redis.sismember(table, key)
  269. def zadd(self, table, values, prioritys=0):
  270. """
  271. @summary: 使用有序set集合存储数据, 去重(值存在更新)
  272. ---------
  273. @param table:
  274. @param values: 值; 支持list 或 单个值
  275. @param prioritys: 优先级; double类型,支持list 或 单个值。 根据此字段的值来排序, 值越小越优先。 可不传值,默认value的优先级为0
  276. ---------
  277. @result:若库中存在 返回0,否则入库,返回1。 批量添加返回 [0, 1 ...]
  278. """
  279. if isinstance(values, list):
  280. if not isinstance(prioritys, list):
  281. prioritys = [prioritys] * len(values)
  282. else:
  283. assert len(values) == len(prioritys), "values值要与prioritys值一一对应"
  284. pipe = self._redis.pipeline()
  285. if not self._is_redis_cluster:
  286. pipe.multi()
  287. for value, priority in zip(values, prioritys):
  288. pipe.execute_command(
  289. "ZADD", table, priority, value
  290. ) # 为了兼容2.x与3.x版本的redis
  291. return pipe.execute()
  292. else:
  293. return self._redis.execute_command(
  294. "ZADD", table, prioritys, values
  295. ) # 为了兼容2.x与3.x版本的redis
  296. def zget(self, table, count=1, is_pop=True):
  297. """
  298. @summary: 从有序set集合中获取数据 优先返回分数小的(优先级高的)
  299. ---------
  300. @param table:
  301. @param count: 数量 -1 返回全部数据
  302. @param is_pop:获取数据后,是否在原set集合中删除,默认是
  303. ---------
  304. @result: 列表
  305. """
  306. start_pos = 0 # 包含
  307. end_pos = count - 1 if count > 0 else count
  308. pipe = self._redis.pipeline()
  309. if not self._is_redis_cluster:
  310. pipe.multi() # 标记事务的开始 参考 http://www.runoob.com/redis/redis-transactions.html
  311. pipe.zrange(table, start_pos, end_pos) # 取值
  312. if is_pop:
  313. pipe.zremrangebyrank(table, start_pos, end_pos) # 删除
  314. results, *count = pipe.execute()
  315. return results
  316. def zremrangebyscore(self, table, priority_min, priority_max):
  317. """
  318. 根据分数移除成员 闭区间
  319. @param table:
  320. @param priority_min:
  321. @param priority_max:
  322. @return: 被移除的成员个数
  323. """
  324. return self._redis.zremrangebyscore(table, priority_min, priority_max)
  325. def zrangebyscore(self, table, priority_min, priority_max, count=None, is_pop=True):
  326. """
  327. @summary: 返回指定分数区间的数据 闭区间
  328. ---------
  329. @param table:
  330. @param priority_min: 优先级越小越优先
  331. @param priority_max:
  332. @param count: 获取的数量,为空则表示分数区间内的全部数据
  333. @param is_pop: 是否删除
  334. ---------
  335. @result:
  336. """
  337. # 使用lua脚本, 保证操作的原子性
  338. lua = """
  339. -- local key = KEYS[1]
  340. local min_score = ARGV[2]
  341. local max_score = ARGV[3]
  342. local is_pop = ARGV[4]
  343. local count = ARGV[5]
  344. -- 取值
  345. local datas = nil
  346. if count then
  347. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
  348. else
  349. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
  350. end
  351. -- 删除redis中刚取到的值
  352. if (is_pop=='True' or is_pop=='1') then
  353. for i=1, #datas do
  354. redis.call('zrem', KEYS[1], datas[i])
  355. end
  356. end
  357. return datas
  358. """
  359. cmd = self._redis.register_script(lua)
  360. if count:
  361. res = cmd(
  362. keys=[table], args=[table, priority_min, priority_max, is_pop, count]
  363. )
  364. else:
  365. res = cmd(keys=[table], args=[table, priority_min, priority_max, is_pop])
  366. return res
  367. def zrangebyscore_increase_score(
  368. self, table, priority_min, priority_max, increase_score, count=None
  369. ):
  370. """
  371. @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
  372. ---------
  373. @param table:
  374. @param priority_min: 最小分数
  375. @param priority_max: 最大分数
  376. @param increase_score: 分数值增量 正数则在原有的分数上叠加,负数则相减
  377. @param count: 获取的数量,为空则表示分数区间内的全部数据
  378. ---------
  379. @result:
  380. """
  381. # 使用lua脚本, 保证操作的原子性
  382. lua = """
  383. -- local key = KEYS[1]
  384. local min_score = ARGV[1]
  385. local max_score = ARGV[2]
  386. local increase_score = ARGV[3]
  387. local count = ARGV[4]
  388. -- 取值
  389. local datas = nil
  390. if count then
  391. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
  392. else
  393. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
  394. end
  395. --修改优先级
  396. for i=1, #datas do
  397. redis.call('zincrby', KEYS[1], increase_score, datas[i])
  398. end
  399. return datas
  400. """
  401. cmd = self._redis.register_script(lua)
  402. if count:
  403. res = cmd(
  404. keys=[table], args=[priority_min, priority_max, increase_score, count]
  405. )
  406. else:
  407. res = cmd(keys=[table], args=[priority_min, priority_max, increase_score])
  408. return res
  409. def zrangebyscore_set_score(
  410. self, table, priority_min, priority_max, score, count=None
  411. ):
  412. """
  413. @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
  414. ---------
  415. @param table:
  416. @param priority_min: 最小分数
  417. @param priority_max: 最大分数
  418. @param score: 分数值
  419. @param count: 获取的数量,为空则表示分数区间内的全部数据
  420. ---------
  421. @result:
  422. """
  423. # 使用lua脚本, 保证操作的原子性
  424. lua = """
  425. -- local key = KEYS[1]
  426. local min_score = ARGV[1]
  427. local max_score = ARGV[2]
  428. local set_score = ARGV[3]
  429. local count = ARGV[4]
  430. -- 取值
  431. local datas = nil
  432. if count then
  433. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores','limit', 0, count)
  434. else
  435. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores')
  436. end
  437. local real_datas = {} -- 数据
  438. --修改优先级
  439. for i=1, #datas, 2 do
  440. local data = datas[i]
  441. local score = datas[i+1]
  442. table.insert(real_datas, data) -- 添加数据
  443. redis.call('zincrby', KEYS[1], set_score - score, datas[i])
  444. end
  445. return real_datas
  446. """
  447. cmd = self._redis.register_script(lua)
  448. if count:
  449. res = cmd(keys=[table], args=[priority_min, priority_max, score, count])
  450. else:
  451. res = cmd(keys=[table], args=[priority_min, priority_max, score])
  452. return res
  453. def zincrby(self, table, amount, value):
  454. return self._redis.zincrby(table, amount, value)
  455. def zget_count(self, table, priority_min=None, priority_max=None):
  456. """
  457. @summary: 获取表数据的数量
  458. ---------
  459. @param table:
  460. @param priority_min:优先级范围 最小值(包含)
  461. @param priority_max:优先级范围 最大值(包含)
  462. ---------
  463. @result:
  464. """
  465. if priority_min != None and priority_max != None:
  466. return self._redis.zcount(table, priority_min, priority_max)
  467. else:
  468. return self._redis.zcard(table)
  469. def zrem(self, table, values):
  470. """
  471. @summary: 移除集合中的指定元素
  472. ---------
  473. @param table:
  474. @param values: 一个或者列表
  475. ---------
  476. @result:
  477. """
  478. if isinstance(values, list):
  479. self._redis.zrem(table, *values)
  480. else:
  481. self._redis.zrem(table, values)
  482. def zexists(self, table, values):
  483. """
  484. 利用zscore判断某元素是否存在
  485. @param values:
  486. @return:
  487. """
  488. is_exists = []
  489. if isinstance(values, list):
  490. pipe = self._redis.pipeline()
  491. pipe.multi()
  492. for value in values:
  493. pipe.zscore(table, value)
  494. is_exists_temp = pipe.execute()
  495. for is_exist in is_exists_temp:
  496. if is_exist != None:
  497. is_exists.append(1)
  498. else:
  499. is_exists.append(0)
  500. else:
  501. is_exists = self._redis.zscore(table, values)
  502. is_exists = 1 if is_exists != None else 0
  503. return is_exists
  504. def lpush(self, table, values):
  505. if isinstance(values, list):
  506. pipe = self._redis.pipeline()
  507. if not self._is_redis_cluster:
  508. pipe.multi()
  509. for value in values:
  510. pipe.lpush(table, value)
  511. pipe.execute()
  512. else:
  513. return self._redis.lpush(table, values)
  514. def lpop(self, table, count=1):
  515. """
  516. @summary:
  517. ---------
  518. @param table:
  519. @param count:
  520. ---------
  521. @result: count>1时返回列表
  522. """
  523. datas = None
  524. lcount = self.lget_count(table)
  525. count = count if count <= lcount else lcount
  526. if count:
  527. if count > 1:
  528. pipe = self._redis.pipeline()
  529. if not self._is_redis_cluster:
  530. pipe.multi()
  531. while count:
  532. pipe.lpop(table)
  533. count -= 1
  534. datas = pipe.execute()
  535. else:
  536. datas = self._redis.lpop(table)
  537. return datas
  538. def rpoplpush(self, from_table, to_table=None):
  539. """
  540. 将列表 from_table 中的最后一个元素(尾元素)弹出,并返回给客户端。
  541. 将 from_table 弹出的元素插入到列表 to_table ,作为 to_table 列表的的头元素。
  542. 如果 from_table 和 to_table 相同,则列表中的表尾元素被移动到表头,并返回该元素,可以把这种特殊情况视作列表的旋转(rotation)操作
  543. @param from_table:
  544. @param to_table:
  545. @return:
  546. """
  547. if not to_table:
  548. to_table = from_table
  549. return self._redis.rpoplpush(from_table, to_table)
  550. def lget_count(self, table):
  551. return self._redis.llen(table)
  552. def lrem(self, table, value, num=0):
  553. """
  554. @summary:
  555. 删除value
  556. ---------
  557. @param table:
  558. @param value:
  559. @param num:
  560. ---------
  561. @result: 删除的条数
  562. """
  563. return self._redis.lrem(table, num, value)
  564. def lrange(self, table, start=0, end=-1):
  565. return self._redis.lrange(table, start, end)
  566. def hset(self, table, key, value):
  567. """
  568. @summary:
  569. 如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。
  570. 如果域 field 已经存在于哈希表中,旧值将被覆盖
  571. ---------
  572. @param table:
  573. @param key:
  574. @param value:
  575. ---------
  576. @result: 1 新插入; 0 覆盖
  577. """
  578. return self._redis.hset(table, key, value)
  579. def hset_batch(self, table, datas):
  580. """
  581. 批量插入
  582. Args:
  583. datas:
  584. [[key, value]]
  585. Returns:
  586. """
  587. pipe = self._redis.pipeline()
  588. if not self._is_redis_cluster:
  589. pipe.multi()
  590. for key, value in datas:
  591. pipe.hset(table, key, value)
  592. return pipe.execute()
  593. def hincrby(self, table, key, increment):
  594. return self._redis.hincrby(table, key, increment)
  595. def hget(self, table, key, is_pop=False):
  596. if not is_pop:
  597. return self._redis.hget(table, key)
  598. else:
  599. lua = """
  600. -- local key = KEYS[1]
  601. local field = ARGV[1]
  602. -- 取值
  603. local datas = redis.call('hget', KEYS[1], field)
  604. -- 删除值
  605. redis.call('hdel', KEYS[1], field)
  606. return datas
  607. """
  608. cmd = self._redis.register_script(lua)
  609. res = cmd(keys=[table], args=[key])
  610. return res
  611. def hgetall(self, table):
  612. return self._redis.hgetall(table)
  613. def hexists(self, table, key):
  614. return self._redis.hexists(table, key)
  615. def hdel(self, table, *keys):
  616. """
  617. @summary: 删除对应的key 可传多个
  618. ---------
  619. @param table:
  620. @param *keys:
  621. ---------
  622. @result:
  623. """
  624. self._redis.hdel(table, *keys)
  625. def hget_count(self, table):
  626. return self._redis.hlen(table)
  627. def hkeys(self, table):
  628. return self._redis.hkeys(table)
  629. def hvals(self, key):
  630. return self._redis.hvals(key)
  631. def setbit(
  632. self, table, offsets: Union[int, List[int]], values: Union[int, List[int]]
  633. ):
  634. """
  635. 设置字符串数组某一位的值,返回之前的值
  636. @param table: Redis key
  637. @param offsets: 支持列表或单个值
  638. @param values: 支持列表或单个值
  639. @return: list / 单个值
  640. """
  641. if isinstance(offsets, list):
  642. if isinstance(values, int):
  643. # 使用lua脚本,数据是一起传给redis的,降低了网络开销,但redis会阻塞
  644. script = """
  645. local value = table.remove(ARGV, 1)
  646. local offsets = ARGV
  647. local results = {}
  648. for i, offset in ipairs(offsets) do
  649. results[i] = redis.call('SETBIT', KEYS[1], offset, value)
  650. end
  651. return results
  652. """
  653. return self._redis.eval(script, 1, table, values, *offsets)
  654. else:
  655. assert len(offsets) == len(values), "offsets值要与values值一一对应"
  656. pipe = self._redis.pipeline()
  657. pipe.multi()
  658. for offset, value in zip(offsets, values):
  659. pipe.setbit(table, offset, value)
  660. return pipe.execute()
  661. else:
  662. return self._redis.setbit(table, offsets, values)
  663. def getbit(self, table, offsets):
  664. """
  665. 取字符串数组某一位的值
  666. @param table:
  667. @param offsets: 支持列表
  668. @return: list / 单个值
  669. """
  670. if isinstance(offsets, list):
  671. pipe = self._redis.pipeline()
  672. pipe.multi()
  673. for offset in offsets:
  674. pipe.getbit(table, offset)
  675. return pipe.execute()
  676. else:
  677. return self._redis.getbit(table, offsets)
  678. def bitcount(self, table):
  679. return self._redis.bitcount(table)
  680. def strset(self, table, value, **kwargs):
  681. """
  682. 设置键值
  683. Args:
  684. table:
  685. value:
  686. **kwargs:
  687. ex: Union[None, int, timedelta] = ..., 设置键的过期时间为 second 秒
  688. px: Union[None, int, timedelta] = ..., 设置键的过期时间为 millisecond 毫秒
  689. nx: bool = ..., 只有键不存在时,才对键进行设置操作
  690. xx: bool = ..., 只有键已经存在时,才对键进行设置操作
  691. keepttl: bool = ..., 保留键的过期时间
  692. Returns:
  693. """
  694. return self._redis.set(table, value, **kwargs)
  695. def str_incrby(self, table, value):
  696. return self._redis.incrby(table, value)
  697. def strget(self, table):
  698. return self._redis.get(table)
  699. def strlen(self, table):
  700. return self._redis.strlen(table)
  701. def str_getkeys(self, regex, count=500):
  702. cursor = '0'
  703. while True:
  704. cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
  705. if len(keys) > 0:
  706. yield from keys
  707. if cursor == 0:
  708. break
  709. def str_exists(self, regex, count=500):
  710. """
  711. @summary: 大量键模糊判重
  712. 大量键模糊检索,使用scan命令,每次扫描集合中500个元素,
  713. 若直接用KEYS命令,会导致Redis阻塞,或者返回大量的键,出现数据安全问题等。
  714. ---------
  715. @param regex: redis模糊查询规则
  716. @param count: 每轮扫描条数,默认:500
  717. ---------
  718. @result:
  719. """
  720. cursor = '0'
  721. while True:
  722. cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
  723. if len(keys) > 0:
  724. return True
  725. if cursor == 0:
  726. break
  727. return False
  728. def str_delete(self, regex=None, datas=None, count=500):
  729. """
  730. @summary: 删除大量键
  731. 删除大量键,使用scan命令,每次扫描集合中500个元素,再用unlink命令异步批量删除键
  732. 若直接用delete命令,会导致Redis阻塞,出现故障切换和应用程序崩溃的故障。
  733. ---------
  734. @param regex: redis模糊查询规则
  735. @param count: 每轮扫描条数,默认:500
  736. @param datas: 直接使用unlink命令异步删除数据集合
  737. ---------
  738. @result:
  739. """
  740. if datas is not None:
  741. if isinstance(datas, list) and len(datas) > 0:
  742. self._redis.unlink(*datas)
  743. else:
  744. # 当 SCAN 命令的游标参数被设置为 0 时,服务器将开始一次新的迭代,而当服务器向用户返回值为 0 的游标时,表示迭代已结束
  745. batch_size = 500
  746. cursor = '0'
  747. while True:
  748. cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
  749. # print(cursor, len(keys))
  750. if len(keys) > 0:
  751. self._redis.unlink(*keys[:batch_size])
  752. # 如果游标为 0 且仍有键,则再次遍历
  753. if cursor == 0 and len(keys) > 0:
  754. cursor = '0'
  755. continue
  756. # 如果游标为 0 且没有键,则退出循环
  757. if cursor == 0 and len(keys) == 0:
  758. break
  759. def str_count(self, regex, count=500):
  760. length = 0
  761. for _ in self.str_getkeys(regex, count):
  762. length += 1
  763. # cursor = '0'
  764. # while True:
  765. # cursor, keys = self._redis.scan(cursor, match=regex, count=count, _type='STRING')
  766. # if cursor == 0:
  767. # length += len(keys)
  768. # break
  769. #
  770. # length += len(keys)
  771. return length
  772. def getkeys(self, regex):
  773. return self._redis.keys(regex)
  774. def exists_key(self, key):
  775. return self._redis.exists(key)
  776. def set_expire(self, key, seconds):
  777. """
  778. @summary: 设置过期时间
  779. ---------
  780. @param key:
  781. @param seconds: 秒
  782. ---------
  783. @result:
  784. """
  785. self._redis.expire(key, seconds)
  786. def get_expire(self, key):
  787. """
  788. @summary: 查询过期时间
  789. ---------
  790. @param key:
  791. ---------
  792. @result:
  793. """
  794. return self._redis.ttl(key)
  795. def clear(self, table):
  796. try:
  797. self._redis.delete(table)
  798. except Exception as e:
  799. log.error(e)
  800. def get_redis_obj(self):
  801. return self._redis
  802. def _reconnect(self):
  803. """检测连接状态, 当数据库重启或设置 timeout 导致断开连接时自动重连"""
  804. retry_count = 0
  805. while True:
  806. try:
  807. retry_count += 1
  808. log.error(f"redis 连接断开, 重新连接 {retry_count}")
  809. if self.get_connect():
  810. log.info(f"redis 连接成功")
  811. return True
  812. except (ConnectionError, TimeoutError) as e:
  813. log.error(f"连接失败 e: {e}")
  814. time.sleep(2)
  815. def __getattr__(self, name):
  816. return getattr(self._redis, name)