
Use an optimistic lock for config updates

dzr 11 months ago
commit 8154544c67
3 changed files with 51 additions and 8 deletions
  1. common/redis_lock.py (+47 -0)
  2. services/proxy.py (+2 -4)
  3. services/site_monitor.py (+2 -4)

+ 47 - 0
common/redis_lock.py

@@ -52,3 +52,50 @@ def release_lock(conn, lockname, identifier):
                 pass
 
         return False
+
+
+class OptimisticLock:
+
+    def __init__(self, conn, lockname, acquire_timeout=60, lock_timeout=10):
+        self.conn = conn
+        self.lockname = 'lock:' + lockname
+        self.acquire_timeout = acquire_timeout
+        self.lock_timeout = int(math.ceil(lock_timeout))  # make sure expire receives an integer
+        self.identifier = None
+
+    def __enter__(self):
+        # 128-bit random identifier marking this holder
+        self.identifier = str(uuid.uuid4())
+
+        end = time.time() + self.acquire_timeout
+        while time.time() < end:
+            if self.conn.setnx(self.lockname, self.identifier):
+                self.conn.expire(self.lockname, self.lock_timeout)
+                return self
+
+            elif self.conn.ttl(self.lockname) < 0:  # set a timeout on locks missing one (redis-py 3+: ttl is -1 when unset)
+                self.conn.expire(self.lockname, self.lock_timeout)
+
+            time.sleep(0.001)
+        return self  # NOTE: on acquire timeout the body still runs without holding the lock
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        with self.conn.pipeline(True) as pipe:
+            while True:
+                try:
+                    pipe.watch(self.lockname)
+                    value = pipe.get(self.lockname)
+                    # still our identifier? (assumes decode_responses=True so GET returns str)
+                    if value is not None and value == self.identifier:
+                        pipe.multi()
+                        pipe.delete(self.lockname)
+                        pipe.execute()
+                        break  # lock released; fall through to the falsy return
+
+                    # not ours (or expired): leave it alone and exit, returning False
+                    pipe.unwatch()
+                    break
+                except redis.exceptions.WatchError:
+                    pass
+
+            return False  # falsy, so exceptions raised inside the with block propagate
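
For reference, a minimal sketch of how the new context manager is meant to be called. The client setup is an assumption, not part of this commit: the identifier check in __exit__ compares a str against GET, which only matches when the connection is created with decode_responses=True, and the class relies on math, time, uuid and redis already being imported at the top of common/redis_lock.py.

    # Hypothetical caller; connection parameters are assumptions, not from this repo.
    import redis

    from common.redis_lock import OptimisticLock

    redis_db = redis.Redis(host='localhost', port=6379, db=0,
                           decode_responses=True)  # GET returns str, so it can equal the identifier

    with OptimisticLock(redis_db, 'proxy_pool', acquire_timeout=5, lock_timeout=10):
        # at most one holder of 'lock:proxy_pool' runs this block at a time
        print('lock held')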

+ 2 - 4
services/proxy.py

@@ -20,7 +20,7 @@ import requests
 import setting as settings
 from base_server import BaseServer, tools
 from common.log import logger
-from common.redis_lock import acquire_lock_with_timeout, release_lock
+from common.redis_lock import OptimisticLock
 
 DEFAULT_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36'
 
@@ -288,8 +288,7 @@ class ProxyPoolClient(BaseProxyPool):
         ]
 
     def proxies(self, **kwargs):
-        lock = acquire_lock_with_timeout(self.redis_db, self.lock_label)
-        if lock:
+        with OptimisticLock(self.redis_db, self.lock_label):
             proxy = {}
             if self.proxy_total > 0:
                 proxy_lst = self.get_all_proxy(**kwargs)
@@ -298,5 +297,4 @@ class ProxyPoolClient(BaseProxyPool):
                 self.redis_db.hset(name, 'usage', proxy['usage'] + 1)
                 self.redis_db.hset(name, 'last_time', tools.now_ts())
 
-            release_lock(self.redis_db, self.lock_label, lock)
             return proxy.get('proxies')
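
One behavioral note on this switch: the old code only reached release_lock when the body completed, so an exception inside proxies() left the key to die by TTL alone, whereas a with statement always calls __exit__. Roughly, and only as an illustrative desugaring rather than code from the commit, the new form behaves like:

    # Approximate expansion of the with statement above (illustration only).
    def proxies(self, **kwargs):
        lock = OptimisticLock(self.redis_db, self.lock_label)
        lock.__enter__()
        try:
            ...  # the lock-protected body shown in the hunk above
        finally:
            # runs even when the body raises; Python actually passes the
            # exception triple here rather than three Nones
            lock.__exit__(None, None, None)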

+ 2 - 4
services/site_monitor.py

@@ -16,7 +16,7 @@ from urllib3 import get_host
 import setting as settings
 from base_server import BaseServer, tools, mongo_table
 from common.log import logger
-from common.redis_lock import acquire_lock_with_timeout, release_lock
+from common.redis_lock import OptimisticLock
 
 
 def extract_domain(url):
@@ -203,14 +203,12 @@ class SiteMonitorClient(BaseServer):
         self.lock_label = f'{redis_label}:{current_process.pid}'
 
     def get_crawl_task(self):
-        lock = acquire_lock_with_timeout(self.redis_db, self.lock_label)
-        if lock:
+        with OptimisticLock(self.redis_db, self.lock_label):
             convert_fields = dict(luaconfig_id=tools.ObjectId, _id=tools.ObjectId)
             task = tools.json_loads(self.lpop(), **convert_fields)
             if task is not None:
                 task = tools.document2dict(task)
                 del task['luaconfig_id']
-            release_lock(self.redis_db, self.lock_label, lock)
             return task
 
     def save_data(self, table, documents):
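
A hypothetical consumer of get_crawl_task, sketched under the assumption that SiteMonitorClient can be constructed without arguments and that the method returns None when the queue is empty; handle() is a made-up placeholder:

    # Hypothetical polling worker; handle() and the constructor call are assumptions.
    import time

    client = SiteMonitorClient()

    while True:
        task = client.get_crawl_task()
        if task is None:
            time.sleep(1)  # nothing queued: back off briefly
            continue
        handle(task)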