# -*- coding: utf-8 -*-
"""
Created on 2023-05-09
---------
@summary: Base class for management services
---------
@author: Dzr
"""
import common.utils as tools
from common.databases import mongo_table, redis_client


class BaseServer:
    """Base class for services that track tasks in Redis and store data in MongoDB.

    Each instance works with two Redis keys derived from ``label``:

    * ``{label}_task_fingerprint`` -- a Redis hash used for de-duplication
      (accessed with ``hexists`` / ``hset`` / ``hdel``).
    * ``{label}_task_list`` -- a Redis list used as a task queue
      (accessed with ``rpush`` / ``lpop`` / ``llen``).
    """

    # Class-level defaults so these attributes always exist, even when the
    # instance is created without ``db``/``table``. Subclasses that declare
    # their own class attributes still take precedence over these.
    mgo_db_name = None
    mgo_table_name = None
    mgo_db = None

    def __init__(self, server: str, label: str, db=None, table=None, **kwargs):
        """Set up the Redis keys and, optionally, the default MongoDB target.

        Args:
            server: Name of the service; stored as ``self.server``.
            label: Prefix used to build the Redis key names.
            db: Default MongoDB database name. Only used together with
                ``table``.
            table: Default MongoDB collection name. Only used together with
                ``db``.
            **kwargs: ``redis_cfg`` (optional) is forwarded to
                ``redis_client`` as ``cfg``; other keys are ignored here.
        """
        self.server = server

        # The MongoDB defaults are only configured when both parts are given.
        if db and table:
            self.mgo_db_name = db
            self.mgo_table_name = table
            self.mgo_db = mongo_table(db, table)

        self.task_fingerprint = f"{label}_task_fingerprint"
        self.request_tasks = f"{label}_task_list"
        self.redis_db = redis_client(cfg=kwargs.pop('redis_cfg', None))

        self._unique_key = ('_id',)

    @property
    def unique_key(self):
        """Field names whose values take part in :meth:`fingerprint`."""
        return self._unique_key

    @unique_key.setter
    def unique_key(self, keys):
        # A tuple/list is stored as given; any other value is wrapped in a
        # one-element tuple.
        if isinstance(keys, (tuple, list)):
            self._unique_key = keys
        else:
            self._unique_key = (keys,)

    def fingerprint(self, **kwargs):
        """Build a fingerprint from the given fields.

        Only truthy values are used. When ``unique_key`` is non-empty, only
        fields named in it are used; when it is empty, every field is used.
        The selected values are converted to ``str``, sorted, and passed to
        ``tools.get_md5``.

        Returns:
            Whatever ``tools.get_md5`` returns for the selected values
            (presumably an MD5 hex digest -- confirm against the helper).
        """
        keys = self.unique_key
        args = sorted(
            str(value)
            for key, value in kwargs.items()
            if value and (not keys or key in keys)
        )
        return tools.get_md5(*args)

    def exists(self, fingerprint):
        """Return whether ``fingerprint`` is a field of the Redis fingerprint hash."""
        return self.redis_db.hexists(self.task_fingerprint, fingerprint)

    def add(self, fingerprint, default=""):
        """Store ``fingerprint`` in the Redis fingerprint hash with value ``default``."""
        self.redis_db.hset(self.task_fingerprint, fingerprint, default)

    def remove(self, fingerprint):
        """Delete ``fingerprint`` from the Redis fingerprint hash."""
        self.redis_db.hdel(self.task_fingerprint, fingerprint)

    def rpush(self, task: str):
        """Append ``task`` to the tail of the Redis task list."""
        self.redis_db.rpush(self.request_tasks, task)

    def lpop(self):
        """Pop and return the task at the head of the Redis task list."""
        return self.redis_db.lpop(self.request_tasks)

    @property
    def task_total(self):
        """Current length of the Redis task list."""
        return self.redis_db.llen(self.request_tasks)

    def upload_data_to_mongodb(self, data, bulk_size=100, **kwargs):
        """Insert documents into MongoDB in batches of at most ``bulk_size``.

        Args:
            data: A list of documents, or a single document (any non-list
                value is treated as one document).
            bulk_size: Maximum number of documents per ``insert_many`` call.
            **kwargs: Optional ``db`` / ``table`` overriding the defaults
                given to the constructor.

        Returns:
            The number of documents passed to ``insert_many``.

        Raises:
            ValueError: If ``bulk_size`` is smaller than 1, or if no database
                / collection name is available from either ``kwargs`` or the
                constructor defaults.
        """
        if bulk_size < 1:
            raise ValueError(
                f"bulk_size must be a positive number, got {bulk_size!r}"
            )

        db = kwargs.pop('db', None) or self.mgo_db_name
        table = kwargs.pop('table', None) or self.mgo_table_name
        if not db or not table:
            raise ValueError(
                "MongoDB target is not configured: pass db/table to the "
                "constructor or to upload_data_to_mongodb()"
            )

        documents = data if isinstance(data, list) else [data]
        if not documents:
            return 0

        # Resolve the collection handle once rather than once per batch.
        collection = mongo_table(db, table)

        inserted_count = 0
        batch = []
        for document in documents:
            batch.append(document)
            if len(batch) >= bulk_size:
                collection.insert_many(batch)
                inserted_count += len(batch)
                batch = []

        # Flush the final, partially filled batch.
        if batch:
            collection.insert_many(batch)
            inserted_count += len(batch)

        return inserted_count