123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-05-09
- ---------
- @summary: 管理服务基类
- ---------
- @author: Dzr
- """
- import common.utils as tools
- from common.databases import mongo_table
- from common.redisdb import RedisDB
class BaseServer:
    """Base class for task-management services backed by Redis and MongoDB.

    Two Redis keys are derived from ``label``:

    * ``"{label}_task_fingerprint"`` -- accessed with ``hexists``/``hset``/
      ``hdel`` and used to remember which task fingerprints were already seen;
    * ``"{label}_task_list"`` -- accessed with ``rpush``/``lpop``/``llen`` and
      used as the queue of pending tasks.

    A default MongoDB database/collection may be bound at construction time
    and is used by :meth:`upload_data_to_mongodb` when the caller does not
    name a target explicitly.
    """

    def __init__(self, server: str, label: str, db=None, table=None, **kwargs):
        """Set up the Redis key names and the optional default Mongo target.

        Args:
            server: Name of the service; stored as ``self.server``.
            label: Prefix used to build the two Redis key names.
            db: Default MongoDB database name (only used together with
                ``table``).
            table: Default MongoDB collection name (only used together with
                ``db``).
            **kwargs: ``redis_cfg`` is popped and handed to ``RedisDB``;
                ``None`` is passed when it is absent.
        """
        self.server = server

        # Always define the Mongo attributes so that reading them later never
        # raises AttributeError when the server was created without db/table.
        self.mgo_db_name = None
        self.mgo_table_name = None
        self.mgo_db = None
        if db and table:
            self.mgo_db_name = db
            self.mgo_table_name = table
            self.mgo_db = mongo_table(db, table)

        self.task_fingerprint = f"{label}_task_fingerprint"
        self.request_tasks = f"{label}_task_list"
        self.redis_db = RedisDB(kwargs.pop('redis_cfg', None))

        self._unique_key = ('_id',)

    @property
    def unique_key(self):
        """Names of the keyword arguments that take part in a fingerprint."""
        return self._unique_key

    @unique_key.setter
    def unique_key(self, keys):
        """Store ``keys`` as given if it is a tuple/list, else wrap it in a 1-tuple."""
        if isinstance(keys, (tuple, list)):
            self._unique_key = keys
        else:
            self._unique_key = (keys,)

    def fingerprint(self, **kwargs):
        """Build a fingerprint from the selected keyword values.

        A value is selected when it is truthy and either its key is listed in
        ``unique_key`` or ``unique_key`` is empty (in which case every key
        qualifies). The selected values are converted to ``str``, sorted, and
        passed to ``tools.get_md5``.

        Returns:
            Whatever ``tools.get_md5`` returns for the selected values, or
            ``None`` when no value was selected.
        """
        selected_keys = self.unique_key
        parts = [
            str(value)
            for key, value in kwargs.items()
            # Falsy values (None, "", 0, ...) never contribute.
            if value and (not selected_keys or key in selected_keys)
        ]
        if not parts:
            return None
        # Sorting makes the result independent of keyword-argument order.
        return tools.get_md5(*sorted(parts))

    def exists(self, fingerprint):
        """Return the result of ``hexists`` for ``fingerprint`` in the fingerprint key."""
        return self.redis_db.hexists(self.task_fingerprint, fingerprint)

    def add(self, fingerprint, default=""):
        """Record ``fingerprint`` in the fingerprint key with ``default`` as its value."""
        self.redis_db.hset(self.task_fingerprint, fingerprint, default)

    def remove(self, fingerprint):
        """Delete ``fingerprint`` from the fingerprint key."""
        self.redis_db.hdel(self.task_fingerprint, fingerprint)

    def rpush(self, task: str):
        """Append ``task`` to the tail of the pending-task list."""
        # Goes through the object returned by get_redis_obj() rather than the
        # RedisDB wrapper itself -- presumably the underlying redis client.
        self.redis_db.get_redis_obj().rpush(self.request_tasks, task)

    def lpop(self):
        """Pop and return the task at the head of the pending-task list."""
        return self.redis_db.lpop(self.request_tasks)

    @property
    def task_total(self):
        """Length of the pending-task list as reported by ``llen``."""
        return self.redis_db.get_redis_obj().llen(self.request_tasks)

    def upload_data_to_mongodb(self, data, bulk_size=100, **kwargs):
        """Insert one or many documents into MongoDB in batches.

        Args:
            data: A single document, or a ``list`` of documents. Anything
                that is not a ``list`` is treated as one document.
            bulk_size: Maximum number of documents per ``insert_many`` call;
                must be a positive integer.
            **kwargs: ``db`` / ``table`` override the defaults bound in
                ``__init__``.

        Returns:
            The number of documents handed to MongoDB.

        Raises:
            ValueError: If ``bulk_size`` is smaller than 1, or if no
                database/collection could be resolved from the arguments or
                the instance defaults.
        """
        if bulk_size < 1:
            raise ValueError(
                f"bulk_size must be a positive integer, got {bulk_size!r}"
            )

        db = kwargs.pop('db', None) or self.mgo_db_name
        table = kwargs.pop('table', None) or self.mgo_table_name
        if not db or not table:
            raise ValueError(
                "no MongoDB target: pass db=/table= or construct the server "
                "with db and table"
            )

        documents = data if isinstance(data, list) else [data]
        if not documents:
            # Nothing to write; avoid touching MongoDB at all.
            return 0

        # Resolve the collection once instead of once per batch.
        collection = mongo_table(db, table)
        for start in range(0, len(documents), bulk_size):
            collection.insert_many(documents[start:start + bulk_size])
        return len(documents)
|