# -*- coding: utf-8 -*-
"""
Created on 2024-02-27
---------
@summary: Redis-based deduplication filter
---------
@author: Lzz
"""
import hashlib

import redis

import setting


class RedisFilter:
    """
    Deduplication filter that stores one Redis key per data fingerprint.

    Every value is reduced to a SHA-256 fingerprint, prefixed with
    ``_KEY_PREFIX`` and written to Redis with the configured expiry.
    """

    # Prefix shared by every key this filter reads or writes.
    _KEY_PREFIX = "pylist_"

    def __init__(self, url=None, expire_time=None):
        """
        @param url: Redis connection URL; falls back to setting.REDIS_URL when falsy
        @param expire_time: value passed as ``ex`` when writing keys;
                            falls back to setting.REDIS_EXPIRE_TIME when falsy
        """
        if not url:
            url = setting.REDIS_URL

        self.redis_db = redis.StrictRedis.from_url(url)
        self._ex = expire_time or setting.REDIS_EXPIRE_TIME

    def __repr__(self):
        # The original template was an empty string, so repr() was always "".
        return "<RedisFilter: {}>".format(self.redis_db)

    def _redis_key(self, key):
        """Return the Redis key (prefix + fingerprint) used to store ``key``."""
        return self._KEY_PREFIX + self.fingerprint(key)

    def exists(self, key):
        """
        Check whether a raw Redis key exists.
        @param key: the literal Redis key (it is NOT fingerprinted here)
        @return: True if the key exists, otherwise False
        """
        return self.redis_db.exists(key) > 0

    def add(self, keys):
        """
        Add data to the filter.
        @param keys: a single value, or a list of values for batch insertion
        @return: list / single value
                 (True if the value was newly added, False if it already existed)
        """
        is_list = isinstance(keys, list)
        keys = keys if is_list else [keys]

        # SET with nx=True checks and writes in one atomic command, so two
        # concurrent callers cannot both be told they added the same key.
        # redis-py returns None when the NX condition is not met, hence bool().
        is_added = [
            bool(self.redis_db.set(self._redis_key(key), 1, ex=self._ex, nx=True))
            for key in keys
        ]

        return is_added if is_list else is_added[0]

    def delete(self, keys):
        """
        Remove data from the filter.
        @param keys: a single value, or a list of values
        @return: always True
        """
        is_list = isinstance(keys, list)
        keys = keys if is_list else [keys]

        for key in keys:
            self.redis_db.delete(self._redis_key(key))

        return True

    def get(self, keys):
        """
        Check whether data already exists.
        @param keys: list / single value
        @return: list / single value (True if it exists, False otherwise).
                 Within one batch, a repeated value is reported as existing
                 from its second occurrence onwards.
        """
        is_list = isinstance(keys, list)
        keys = keys if is_list else [keys]

        is_exist = []
        # Track fingerprinted keys rather than raw values: this matches how
        # the data is keyed in Redis and also works for unhashable inputs.
        seen = set()
        for key in keys:
            pkey = self._redis_key(key)
            if pkey in seen:
                # Duplicate inside the batch itself; no Redis lookup needed.
                is_exist.append(True)
            else:
                seen.add(pkey)
                is_exist.append(self.exists(pkey))

        return is_exist if is_list else is_exist[0]

    def fingerprint(self, *args):
        """
        @summary: Build a unique 64-character fingerprint for the given data
        ---------
        @param args: values that together identify one piece of data;
                     their order does not matter
        ---------
        @result: 5580c91ea29bf5bd963f4c08dfcacd983566e44ecea1735102bc380576fd6f30
        """
        try:
            ordered = sorted(args)
        except TypeError:
            # Mutually unorderable arguments (e.g. int mixed with str) cannot
            # be sorted naturally; fall back to ordering by their text form.
            # Inputs that sorted fine before still produce the same digest.
            ordered = sorted(args, key=str)

        sha256 = hashlib.sha256()
        # NOTE: parts are concatenated without a separator, so ("ab", "c")
        # and ("a", "bc") share a digest. Left as-is to keep existing
        # fingerprints stable.
        for arg in ordered:
            sha256.update(str(arg).encode())
        return sha256.hexdigest()