redis_lock.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2019/11/5 5:25 PM
  4. ---------
  5. @summary:
  6. ---------
  7. @author: Boris
  8. @email: boris_liu@foxmail.com
  9. """
  10. import threading
  11. import time
  12. from feapder.db.redisdb import RedisDB
  13. from feapder.utils.log import log
  14. class RedisLock:
  15. redis_cli = None
  16. def __init__(self, key, redis_cli=None, wait_timeout=0, lock_timeout=86400):
  17. """
  18. redis超时锁
  19. :param key: 存储锁的key redis_lock:[key]
  20. :param redis_cli: redis客户端对象
  21. :param wait_timeout: 等待加锁超时时间,为0时则不等待加锁,加锁失败
  22. :param lock_timeout: 锁超时时间 为0时则不会超时,直到锁释放或意外退出,默认超时为1天
  23. 用法示例:
  24. with RedisLock(key="test") as _lock:
  25. if _lock.locked:
  26. # 用来判断是否加上了锁
  27. # do somethings
  28. """
  29. self.redis_conn = redis_cli
  30. self.lock_key = "redis_lock:{}".format(key)
  31. # 锁超时时间
  32. self.lock_timeout = lock_timeout
  33. # 等待加锁时间
  34. self.wait_timeout = wait_timeout
  35. self.locked = False
  36. self.stop_prolong_life = False
  37. @property
  38. def redis_conn(self):
  39. if not self.__class__.redis_cli:
  40. self.__class__.redis_cli = RedisDB().get_redis_obj()
  41. return self.__class__.redis_cli
  42. @redis_conn.setter
  43. def redis_conn(self, cli):
  44. self.__class__.redis_cli = cli
  45. def __enter__(self):
  46. if not self.locked:
  47. self.acquire()
  48. # 延长锁的时间
  49. thread = threading.Thread(target=self.prolong_life)
  50. thread.setDaemon(True)
  51. thread.start()
  52. return self
  53. def __exit__(self, exc_type, exc_val, exc_tb):
  54. self.stop_prolong_life = True
  55. self.release()
  56. def __repr__(self):
  57. return "<RedisLock: {} >".format(self.lock_key)
  58. def acquire(self):
  59. start = time.time()
  60. while True:
  61. # 尝试加锁
  62. if self.redis_conn.set(self.lock_key, time.time(), nx=True, ex=5):
  63. self.locked = True
  64. break
  65. if self.wait_timeout > 0:
  66. if time.time() - start > self.wait_timeout:
  67. log.info("加锁失败")
  68. break
  69. else:
  70. break
  71. log.debug("等待加锁: {} wait:{}".format(self, time.time() - start))
  72. if self.wait_timeout > 10:
  73. time.sleep(5)
  74. else:
  75. time.sleep(1)
  76. return
  77. def release(self):
  78. if self.locked:
  79. self.redis_conn.delete(self.lock_key)
  80. self.locked = False
  81. return
  82. def prolong_life(self):
  83. """
  84. 延长锁的过期时间
  85. :return:
  86. """
  87. spend_time = 0
  88. while not self.stop_prolong_life:
  89. expire = self.redis_conn.ttl(self.lock_key)
  90. if expire < 0: # key 不存在
  91. time.sleep(1)
  92. continue
  93. self.redis_conn.expire(self.lock_key, expire + 5) # 延长5秒
  94. time.sleep(expire) # 临过期5秒前,再次延长
  95. spend_time += expire
  96. if self.lock_timeout and spend_time > self.lock_timeout:
  97. log.info("锁超时,释放")
  98. self.redis_conn.delete(self.lock_key)
  99. break