1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-05-09
- ---------
- @summary: 管理服务基类
- ---------
- @author: Dzr
- """
- import common.utils as tools
- from common.databases import mongo_table, redis_client
class BaseServer:
    """Base class for task-management services backed by Redis and MongoDB.

    Redis holds two structures whose key names are derived from ``label``:
    a hash of task fingerprints (``<label>_task_fingerprint``) and a list
    used as a task queue (``<label>_task_list``). MongoDB is optional and
    is only bound when both ``db`` and ``table`` are supplied.
    """

    # Class-level defaults so the attributes always exist, even when the
    # instance was built without a MongoDB target.
    mgo_db_name = None
    mgo_table_name = None
    mgo_db = None

    def __init__(self, server: str, label: str, db=None, table=None, **kwargs):
        """Bind the Redis client and, optionally, a MongoDB table.

        Args:
            server: Name of the service; stored on the instance as-is.
            label: Prefix used to build the Redis key names.
            db: MongoDB database name (optional).
            table: MongoDB table name (optional).
            **kwargs: ``redis_cfg`` is forwarded to ``redis_client`` as
                ``cfg``; any other keyword is ignored.
        """
        self.server = server
        if db and table:
            self.mgo_db_name = db
            self.mgo_table_name = table
            self.mgo_db = mongo_table(db, table)

        self.task_fingerprint = f"{label}_task_fingerprint"
        self.request_tasks = f"{label}_task_list"
        self.redis_db = redis_client(cfg=kwargs.pop('redis_cfg', None))
        self._unique_key = ('_id',)

    @property
    def unique_key(self):
        """Field names that take part in ``fingerprint``."""
        return self._unique_key

    @unique_key.setter
    def unique_key(self, keys):
        # A single key is wrapped in a tuple; a tuple or list is kept as given.
        if isinstance(keys, (tuple, list)):
            self._unique_key = keys
        else:
            self._unique_key = (keys,)

    def fingerprint(self, **kwargs):
        """Return the MD5 fingerprint of the selected keyword values.

        Falsy values are skipped. When ``unique_key`` is non-empty only the
        keywords named in it are used; otherwise every keyword is used.
        The stringified values are sorted before being passed to
        ``tools.get_md5`` so the keyword order does not matter.
        """
        keys = self.unique_key
        parts = sorted(
            str(value)
            for key, value in kwargs.items()
            if value and (not keys or key in keys)
        )
        return tools.get_md5(*parts)

    def exists(self, fingerprint):
        """Return the ``hexists`` result for ``fingerprint`` in the fingerprint hash."""
        return self.redis_db.hexists(self.task_fingerprint, fingerprint)

    def add(self, fingerprint, default=""):
        """Store ``fingerprint`` in the fingerprint hash with value ``default``."""
        self.redis_db.hset(self.task_fingerprint, fingerprint, default)

    def remove(self, fingerprint):
        """Delete ``fingerprint`` from the fingerprint hash."""
        self.redis_db.hdel(self.task_fingerprint, fingerprint)

    def rpush(self, task: str):
        """Append ``task`` to the tail of the task list."""
        self.redis_db.rpush(self.request_tasks, task)

    def lpop(self):
        """Pop and return the task at the head of the task list."""
        return self.redis_db.lpop(self.request_tasks)

    @property
    def task_total(self):
        """Current length of the task list (``llen``)."""
        return self.redis_db.llen(self.request_tasks)

    def upload_data_to_mongodb(self, data, bulk_size=100, **kwargs):
        """Insert ``data`` into MongoDB in batches and return the document count.

        Args:
            data: A single document, or a list/tuple of documents.
            bulk_size: Number of documents sent per ``insert_many`` call.
            **kwargs: ``db`` / ``table`` override the names bound in
                ``__init__`` for this call only.

        Returns:
            The number of documents handed to ``insert_many``.

        Raises:
            ValueError: If no database/table was given here or in ``__init__``.
        """
        db = kwargs.pop('db', None) or self.mgo_db_name
        table = kwargs.pop('table', None) or self.mgo_table_name
        if not db or not table:
            raise ValueError(
                "MongoDB target is not configured: pass db/table to "
                "__init__ or to upload_data_to_mongodb"
            )

        documents = data if isinstance(data, (list, tuple)) else [data]
        if not documents:
            return 0

        # The target does not change between batches, so resolve it once.
        collection = mongo_table(db, table)
        inserted_count = 0
        batch = []
        for document in documents:
            batch.append(document)
            if len(batch) % bulk_size == 0:
                collection.insert_many(batch)
                inserted_count += len(batch)
                batch = []

        # Flush the final, partially filled batch.
        if batch:
            collection.insert_many(batch)
            inserted_count += len(batch)
        return inserted_count
|