redisdb.py 26 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2016-11-16 16:25
  4. ---------
  5. @summary: 操作redis数据库
  6. ---------
  7. @author: Boris
  8. """
  9. import time
  10. import redis
  11. from redis._compat import unicode, long, basestring
  12. from redis.connection import Encoder as _Encoder
  13. from redis.exceptions import ConnectionError, TimeoutError
  14. from redis.exceptions import DataError
  15. from redis.sentinel import Sentinel
  16. from rediscluster import RedisCluster
  17. import feapder.setting as setting
  18. from feapder.utils.log import log
  19. class Encoder(_Encoder):
  20. def encode(self, value):
  21. "Return a bytestring or bytes-like representation of the value"
  22. if isinstance(value, (bytes, memoryview)):
  23. return value
  24. # elif isinstance(value, bool):
  25. # # special case bool since it is a subclass of int
  26. # raise DataError(
  27. # "Invalid input of type: 'bool'. Convert to a "
  28. # "bytes, string, int or float first."
  29. # )
  30. elif isinstance(value, float):
  31. value = repr(value).encode()
  32. elif isinstance(value, (int, long)):
  33. # python 2 repr() on longs is '123L', so use str() instead
  34. value = str(value).encode()
  35. elif isinstance(value, (list, dict, tuple)):
  36. value = unicode(value)
  37. elif not isinstance(value, basestring):
  38. # a value we don't know how to deal with. throw an error
  39. typename = type(value).__name__
  40. raise DataError(
  41. "Invalid input of type: '%s'. Convert to a "
  42. "bytes, string, int or float first." % typename
  43. )
  44. if isinstance(value, unicode):
  45. value = value.encode(self.encoding, self.encoding_errors)
  46. return value
  47. redis.connection.Encoder = Encoder
  48. class RedisDB:
  49. def __init__(
  50. self,
  51. ip_ports=None,
  52. db=None,
  53. user_pass=None,
  54. url=None,
  55. decode_responses=True,
  56. service_name=None,
  57. max_connections=32,
  58. **kwargs,
  59. ):
  60. """
  61. redis的封装
  62. Args:
  63. ip_ports: ip:port 多个可写为列表或者逗号隔开 如 ip1:port1,ip2:port2 或 ["ip1:port1", "ip2:port2"]
  64. db:
  65. user_pass:
  66. url:
  67. decode_responses:
  68. service_name: 适用于redis哨兵模式
  69. """
  70. # 可能会改setting中的值,所以此处不能直接赋值为默认值,需要后加载赋值
  71. if ip_ports is None:
  72. ip_ports = setting.REDISDB_IP_PORTS
  73. if db is None:
  74. db = setting.REDISDB_DB
  75. if user_pass is None:
  76. user_pass = setting.REDISDB_USER_PASS
  77. if service_name is None:
  78. service_name = setting.REDISDB_SERVICE_NAME
  79. self._is_redis_cluster = False
  80. self.__redis = None
  81. self._url = url
  82. self._ip_ports = ip_ports
  83. self._db = db
  84. self._user_pass = user_pass
  85. self._decode_responses = decode_responses
  86. self._service_name = service_name
  87. self._max_connections = max_connections
  88. self._kwargs = kwargs
  89. self.get_connect()
  90. def __repr__(self):
  91. if self._url:
  92. return "<Redisdb url:{}>".format(self._url)
  93. return "<Redisdb ip_ports: {} db:{} user_pass:{}>".format(
  94. self._ip_ports, self._db, self._user_pass
  95. )
  96. @property
  97. def _redis(self):
  98. try:
  99. if not self.__redis.ping():
  100. raise ConnectionError("unable to connect to redis")
  101. except:
  102. self._reconnect()
  103. return self.__redis
  104. @_redis.setter
  105. def _redis(self, val):
  106. self.__redis = val
  107. def get_connect(self):
  108. # 获取数据库连接
  109. try:
  110. if not self._url:
  111. if not self._ip_ports:
  112. raise Exception("未设置 redis 连接信息")
  113. ip_ports = (
  114. self._ip_ports
  115. if isinstance(self._ip_ports, list)
  116. else self._ip_ports.split(",")
  117. )
  118. if len(ip_ports) > 1:
  119. startup_nodes = []
  120. for ip_port in ip_ports:
  121. ip, port = ip_port.split(":")
  122. startup_nodes.append({"host": ip, "port": port})
  123. if self._service_name:
  124. # log.debug("使用redis哨兵模式")
  125. hosts = [(node["host"], node["port"]) for node in startup_nodes]
  126. sentinel = Sentinel(hosts, socket_timeout=3, **self._kwargs)
  127. self._redis = sentinel.master_for(
  128. self._service_name,
  129. password=self._user_pass,
  130. db=self._db,
  131. redis_class=redis.StrictRedis,
  132. decode_responses=self._decode_responses,
  133. max_connections=self._max_connections,
  134. **self._kwargs,
  135. )
  136. else:
  137. # log.debug("使用redis集群模式")
  138. self._redis = RedisCluster(
  139. startup_nodes=startup_nodes,
  140. decode_responses=self._decode_responses,
  141. password=self._user_pass,
  142. max_connections=self._max_connections,
  143. **self._kwargs,
  144. )
  145. self._is_redis_cluster = True
  146. else:
  147. ip, port = ip_ports[0].split(":")
  148. self._redis = redis.StrictRedis(
  149. host=ip,
  150. port=port,
  151. db=self._db,
  152. password=self._user_pass,
  153. decode_responses=self._decode_responses,
  154. max_connections=self._max_connections,
  155. **self._kwargs,
  156. )
  157. self._is_redis_cluster = False
  158. else:
  159. self._redis = redis.StrictRedis.from_url(
  160. self._url, decode_responses=self._decode_responses
  161. )
  162. self._is_redis_cluster = False
  163. except Exception as e:
  164. raise
  165. # 不要写成self._redis.ping() 否则循环调用了
  166. return self.__redis.ping()
  167. @classmethod
  168. def from_url(cls, url):
  169. """
  170. Args:
  171. url: redis://[[username]:[password]]@[host]:[port]/[db]
  172. Returns:
  173. """
  174. return cls(url=url)
  175. def sadd(self, table, values):
  176. """
  177. @summary: 使用无序set集合存储数据, 去重
  178. ---------
  179. @param table:
  180. @param values: 值; 支持list 或 单个值
  181. ---------
  182. @result: 若库中存在 返回0,否则入库,返回1。 批量添加返回None
  183. """
  184. if isinstance(values, list):
  185. pipe = self._redis.pipeline()
  186. if not self._is_redis_cluster:
  187. pipe.multi()
  188. for value in values:
  189. pipe.sadd(table, value)
  190. pipe.execute()
  191. else:
  192. return self._redis.sadd(table, values)
  193. def sget(self, table, count=1, is_pop=True):
  194. """
  195. 返回 list 如 ['1'] 或 []
  196. @param table:
  197. @param count:
  198. @param is_pop:
  199. @return:
  200. """
  201. datas = []
  202. if is_pop:
  203. count = count if count <= self.sget_count(table) else self.sget_count(table)
  204. if count:
  205. if count > 1:
  206. pipe = self._redis.pipeline()
  207. if not self._is_redis_cluster:
  208. pipe.multi()
  209. while count:
  210. pipe.spop(table)
  211. count -= 1
  212. datas = pipe.execute()
  213. else:
  214. datas.append(self._redis.spop(table))
  215. else:
  216. datas = self._redis.srandmember(table, count)
  217. return datas
  218. def srem(self, table, values):
  219. """
  220. @summary: 移除集合中的指定元素
  221. ---------
  222. @param table:
  223. @param values: 一个或者列表
  224. ---------
  225. @result:
  226. """
  227. if isinstance(values, list):
  228. pipe = self._redis.pipeline()
  229. if not self._is_redis_cluster:
  230. pipe.multi()
  231. for value in values:
  232. pipe.srem(table, value)
  233. pipe.execute()
  234. else:
  235. self._redis.srem(table, values)
  236. def sget_count(self, table):
  237. return self._redis.scard(table)
  238. def sdelete(self, table):
  239. """
  240. @summary: 删除set集合的大键(数据量大的表)
  241. 删除大set键,使用sscan命令,每次扫描集合中500个元素,再用srem命令每次删除一个键
  242. 若直接用delete命令,会导致Redis阻塞,出现故障切换和应用程序崩溃的故障。
  243. ---------
  244. @param table:
  245. ---------
  246. @result:
  247. """
  248. # 当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束
  249. cursor = "0"
  250. while cursor != 0:
  251. cursor, data = self._redis.sscan(table, cursor=cursor, count=500)
  252. for item in data:
  253. # pipe.srem(table, item)
  254. self._redis.srem(table, item)
  255. # pipe.execute()
  256. def sismember(self, table, key):
  257. "Return a boolean indicating if ``value`` is a member of set ``name``"
  258. return self._redis.sismember(table, key)
  259. def zadd(self, table, values, prioritys=0):
  260. """
  261. @summary: 使用有序set集合存储数据, 去重(值存在更新)
  262. ---------
  263. @param table:
  264. @param values: 值; 支持list 或 单个值
  265. @param prioritys: 优先级; double类型,支持list 或 单个值。 根据此字段的值来排序, 值越小越优先。 可不传值,默认value的优先级为0
  266. ---------
  267. @result:若库中存在 返回0,否则入库,返回1。 批量添加返回 [0, 1 ...]
  268. """
  269. if isinstance(values, list):
  270. if not isinstance(prioritys, list):
  271. prioritys = [prioritys] * len(values)
  272. else:
  273. assert len(values) == len(prioritys), "values值要与prioritys值一一对应"
  274. pipe = self._redis.pipeline()
  275. if not self._is_redis_cluster:
  276. pipe.multi()
  277. for value, priority in zip(values, prioritys):
  278. pipe.execute_command(
  279. "ZADD", table, priority, value
  280. ) # 为了兼容2.x与3.x版本的redis
  281. return pipe.execute()
  282. else:
  283. return self._redis.execute_command(
  284. "ZADD", table, prioritys, values
  285. ) # 为了兼容2.x与3.x版本的redis
  286. def zget(self, table, count=1, is_pop=True):
  287. """
  288. @summary: 从有序set集合中获取数据 优先返回分数小的(优先级高的)
  289. ---------
  290. @param table:
  291. @param count: 数量 -1 返回全部数据
  292. @param is_pop:获取数据后,是否在原set集合中删除,默认是
  293. ---------
  294. @result: 列表
  295. """
  296. start_pos = 0 # 包含
  297. end_pos = count - 1 if count > 0 else count
  298. pipe = self._redis.pipeline()
  299. if not self._is_redis_cluster:
  300. pipe.multi() # 标记事务的开始 参考 http://www.runoob.com/redis/redis-transactions.html
  301. pipe.zrange(table, start_pos, end_pos) # 取值
  302. if is_pop:
  303. pipe.zremrangebyrank(table, start_pos, end_pos) # 删除
  304. results, *count = pipe.execute()
  305. return results
  306. def zremrangebyscore(self, table, priority_min, priority_max):
  307. """
  308. 根据分数移除成员 闭区间
  309. @param table:
  310. @param priority_min:
  311. @param priority_max:
  312. @return: 被移除的成员个数
  313. """
  314. return self._redis.zremrangebyscore(table, priority_min, priority_max)
  315. def zrangebyscore(self, table, priority_min, priority_max, count=None, is_pop=True):
  316. """
  317. @summary: 返回指定分数区间的数据 闭区间
  318. ---------
  319. @param table:
  320. @param priority_min: 优先级越小越优先
  321. @param priority_max:
  322. @param count: 获取的数量,为空则表示分数区间内的全部数据
  323. @param is_pop: 是否删除
  324. ---------
  325. @result:
  326. """
  327. # 使用lua脚本, 保证操作的原子性
  328. lua = """
  329. -- local key = KEYS[1]
  330. local min_score = ARGV[2]
  331. local max_score = ARGV[3]
  332. local is_pop = ARGV[4]
  333. local count = ARGV[5]
  334. -- 取值
  335. local datas = nil
  336. if count then
  337. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
  338. else
  339. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
  340. end
  341. -- 删除redis中刚取到的值
  342. if (is_pop=='True' or is_pop=='1') then
  343. for i=1, #datas do
  344. redis.call('zrem', KEYS[1], datas[i])
  345. end
  346. end
  347. return datas
  348. """
  349. cmd = self._redis.register_script(lua)
  350. if count:
  351. res = cmd(
  352. keys=[table], args=[table, priority_min, priority_max, is_pop, count]
  353. )
  354. else:
  355. res = cmd(keys=[table], args=[table, priority_min, priority_max, is_pop])
  356. return res
  357. def zrangebyscore_increase_score(
  358. self, table, priority_min, priority_max, increase_score, count=None
  359. ):
  360. """
  361. @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
  362. ---------
  363. @param table:
  364. @param priority_min: 最小分数
  365. @param priority_max: 最大分数
  366. @param increase_score: 分数值增量 正数则在原有的分数上叠加,负数则相减
  367. @param count: 获取的数量,为空则表示分数区间内的全部数据
  368. ---------
  369. @result:
  370. """
  371. # 使用lua脚本, 保证操作的原子性
  372. lua = """
  373. -- local key = KEYS[1]
  374. local min_score = ARGV[1]
  375. local max_score = ARGV[2]
  376. local increase_score = ARGV[3]
  377. local count = ARGV[4]
  378. -- 取值
  379. local datas = nil
  380. if count then
  381. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'limit', 0, count)
  382. else
  383. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score)
  384. end
  385. --修改优先级
  386. for i=1, #datas do
  387. redis.call('zincrby', KEYS[1], increase_score, datas[i])
  388. end
  389. return datas
  390. """
  391. cmd = self._redis.register_script(lua)
  392. if count:
  393. res = cmd(
  394. keys=[table], args=[priority_min, priority_max, increase_score, count]
  395. )
  396. else:
  397. res = cmd(keys=[table], args=[priority_min, priority_max, increase_score])
  398. return res
  399. def zrangebyscore_set_score(
  400. self, table, priority_min, priority_max, score, count=None
  401. ):
  402. """
  403. @summary: 返回指定分数区间的数据 闭区间, 同时修改分数
  404. ---------
  405. @param table:
  406. @param priority_min: 最小分数
  407. @param priority_max: 最大分数
  408. @param score: 分数值
  409. @param count: 获取的数量,为空则表示分数区间内的全部数据
  410. ---------
  411. @result:
  412. """
  413. # 使用lua脚本, 保证操作的原子性
  414. lua = """
  415. -- local key = KEYS[1]
  416. local min_score = ARGV[1]
  417. local max_score = ARGV[2]
  418. local set_score = ARGV[3]
  419. local count = ARGV[4]
  420. -- 取值
  421. local datas = nil
  422. if count then
  423. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores','limit', 0, count)
  424. else
  425. datas = redis.call('zrangebyscore', KEYS[1], min_score, max_score, 'withscores')
  426. end
  427. local real_datas = {} -- 数据
  428. --修改优先级
  429. for i=1, #datas, 2 do
  430. local data = datas[i]
  431. local score = datas[i+1]
  432. table.insert(real_datas, data) -- 添加数据
  433. redis.call('zincrby', KEYS[1], set_score - score, datas[i])
  434. end
  435. return real_datas
  436. """
  437. cmd = self._redis.register_script(lua)
  438. if count:
  439. res = cmd(keys=[table], args=[priority_min, priority_max, score, count])
  440. else:
  441. res = cmd(keys=[table], args=[priority_min, priority_max, score])
  442. return res
  443. def zincrby(self, table, amount, value):
  444. return self._redis.zincrby(table, amount, value)
  445. def zget_count(self, table, priority_min=None, priority_max=None):
  446. """
  447. @summary: 获取表数据的数量
  448. ---------
  449. @param table:
  450. @param priority_min:优先级范围 最小值(包含)
  451. @param priority_max:优先级范围 最大值(包含)
  452. ---------
  453. @result:
  454. """
  455. if priority_min != None and priority_max != None:
  456. return self._redis.zcount(table, priority_min, priority_max)
  457. else:
  458. return self._redis.zcard(table)
  459. def zrem(self, table, values):
  460. """
  461. @summary: 移除集合中的指定元素
  462. ---------
  463. @param table:
  464. @param values: 一个或者列表
  465. ---------
  466. @result:
  467. """
  468. if isinstance(values, list):
  469. self._redis.zrem(table, *values)
  470. else:
  471. self._redis.zrem(table, values)
  472. def zexists(self, table, values):
  473. """
  474. 利用zscore判断某元素是否存在
  475. @param values:
  476. @return:
  477. """
  478. is_exists = []
  479. if isinstance(values, list):
  480. pipe = self._redis.pipeline()
  481. pipe.multi()
  482. for value in values:
  483. pipe.zscore(table, value)
  484. is_exists_temp = pipe.execute()
  485. for is_exist in is_exists_temp:
  486. if is_exist != None:
  487. is_exists.append(1)
  488. else:
  489. is_exists.append(0)
  490. else:
  491. is_exists = self._redis.zscore(table, values)
  492. is_exists = 1 if is_exists != None else 0
  493. return is_exists
  494. def lpush(self, table, values):
  495. if isinstance(values, list):
  496. pipe = self._redis.pipeline()
  497. if not self._is_redis_cluster:
  498. pipe.multi()
  499. for value in values:
  500. pipe.rpush(table, value)
  501. pipe.execute()
  502. else:
  503. return self._redis.rpush(table, values)
  504. def lpop(self, table, count=1):
  505. """
  506. @summary:
  507. ---------
  508. @param table:
  509. @param count:
  510. ---------
  511. @result: count>1时返回列表
  512. """
  513. datas = None
  514. count = count if count <= self.lget_count(table) else self.lget_count(table)
  515. if count:
  516. if count > 1:
  517. pipe = self._redis.pipeline()
  518. if not self._is_redis_cluster:
  519. pipe.multi()
  520. while count:
  521. pipe.lpop(table)
  522. count -= 1
  523. datas = pipe.execute()
  524. else:
  525. datas = self._redis.lpop(table)
  526. return datas
  527. def rpoplpush(self, from_table, to_table=None):
  528. """
  529. 将列表 from_table 中的最后一个元素(尾元素)弹出,并返回给客户端。
  530. 将 from_table 弹出的元素插入到列表 to_table ,作为 to_table 列表的的头元素。
  531. 如果 from_table 和 to_table 相同,则列表中的表尾元素被移动到表头,并返回该元素,可以把这种特殊情况视作列表的旋转(rotation)操作
  532. @param from_table:
  533. @param to_table:
  534. @return:
  535. """
  536. if not to_table:
  537. to_table = from_table
  538. return self._redis.rpoplpush(from_table, to_table)
  539. def lget_count(self, table):
  540. return self._redis.llen(table)
  541. def lrem(self, table, value, num=0):
  542. """
  543. @summary:
  544. 删除value
  545. ---------
  546. @param table:
  547. @param value:
  548. @param num:
  549. ---------
  550. @result: 删除的条数
  551. """
  552. return self._redis.lrem(table, num, value)
  553. def lrange(self, table, start=0, end=-1):
  554. return self._redis.lrange(table, start, end)
  555. def hset(self, table, key, value):
  556. """
  557. @summary:
  558. 如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。
  559. 如果域 field 已经存在于哈希表中,旧值将被覆盖
  560. ---------
  561. @param table:
  562. @param key:
  563. @param value:
  564. ---------
  565. @result: 1 新插入; 0 覆盖
  566. """
  567. return self._redis.hset(table, key, value)
  568. def hset_batch(self, table, datas):
  569. """
  570. 批量插入
  571. Args:
  572. datas:
  573. [[key, value]]
  574. Returns:
  575. """
  576. pipe = self._redis.pipeline()
  577. if not self._is_redis_cluster:
  578. pipe.multi()
  579. for key, value in datas:
  580. pipe.hset(table, key, value)
  581. return pipe.execute()
  582. def hincrby(self, table, key, increment):
  583. return self._redis.hincrby(table, key, increment)
  584. def hget(self, table, key, is_pop=False):
  585. if not is_pop:
  586. return self._redis.hget(table, key)
  587. else:
  588. lua = """
  589. -- local key = KEYS[1]
  590. local field = ARGV[1]
  591. -- 取值
  592. local datas = redis.call('hget', KEYS[1], field)
  593. -- 删除值
  594. redis.call('hdel', KEYS[1], field)
  595. return datas
  596. """
  597. cmd = self._redis.register_script(lua)
  598. res = cmd(keys=[table], args=[key])
  599. return res
  600. def hgetall(self, table):
  601. return self._redis.hgetall(table)
  602. def hexists(self, table, key):
  603. return self._redis.hexists(table, key)
  604. def hdel(self, table, *keys):
  605. """
  606. @summary: 删除对应的key 可传多个
  607. ---------
  608. @param table:
  609. @param *keys:
  610. ---------
  611. @result:
  612. """
  613. self._redis.hdel(table, *keys)
  614. def hget_count(self, table):
  615. return self._redis.hlen(table)
  616. def setbit(self, table, offsets, values):
  617. """
  618. 设置字符串数组某一位的值, 返回之前的值
  619. @param table:
  620. @param offsets: 支持列表或单个值
  621. @param values: 支持列表或单个值
  622. @return: list / 单个值
  623. """
  624. if isinstance(offsets, list):
  625. if not isinstance(values, list):
  626. values = [values] * len(offsets)
  627. else:
  628. assert len(offsets) == len(values), "offsets值要与values值一一对应"
  629. pipe = self._redis.pipeline()
  630. pipe.multi()
  631. for offset, value in zip(offsets, values):
  632. pipe.setbit(table, offset, value)
  633. return pipe.execute()
  634. else:
  635. return self._redis.setbit(table, offsets, values)
  636. def getbit(self, table, offsets):
  637. """
  638. 取字符串数组某一位的值
  639. @param table:
  640. @param offsets: 支持列表
  641. @return: list / 单个值
  642. """
  643. if isinstance(offsets, list):
  644. pipe = self._redis.pipeline()
  645. pipe.multi()
  646. for offset in offsets:
  647. pipe.getbit(table, offset)
  648. return pipe.execute()
  649. else:
  650. return self._redis.getbit(table, offsets)
  651. def bitcount(self, table):
  652. return self._redis.bitcount(table)
  653. def strset(self, table, value, **kwargs):
  654. return self._redis.set(table, value, **kwargs)
  655. def str_incrby(self, table, value):
  656. return self._redis.incrby(table, value)
  657. def strget(self, table):
  658. return self._redis.get(table)
  659. def strlen(self, table):
  660. return self._redis.strlen(table)
  661. def getkeys(self, regex):
  662. return self._redis.keys(regex)
  663. def exists_key(self, key):
  664. return self._redis.exists(key)
  665. def set_expire(self, key, seconds):
  666. """
  667. @summary: 设置过期时间
  668. ---------
  669. @param key:
  670. @param seconds: 秒
  671. ---------
  672. @result:
  673. """
  674. self._redis.expire(key, seconds)
  675. def get_expire(self, key):
  676. """
  677. @summary: 查询过期时间
  678. ---------
  679. @param key:
  680. @param seconds: 秒
  681. ---------
  682. @result:
  683. """
  684. return self._redis.ttl(key)
  685. def clear(self, table):
  686. try:
  687. self._redis.delete(table)
  688. except Exception as e:
  689. log.error(e)
  690. def get_redis_obj(self):
  691. return self._redis
  692. def _reconnect(self):
  693. # 检测连接状态, 当数据库重启或设置 timeout 导致断开连接时自动重连
  694. retry_count = 0
  695. while True:
  696. try:
  697. retry_count += 1
  698. log.error(f"redis 连接断开, 重新连接 {retry_count}")
  699. if self.get_connect():
  700. log.info(f"redis 连接成功")
  701. return True
  702. except (ConnectionError, TimeoutError) as e:
  703. log.error(f"连接失败 e: {e}")
  704. time.sleep(2)
  705. def __getattr__(self, name):
  706. return getattr(self._redis, name)