base_server.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-05-09
  4. ---------
  5. @summary: 管理服务基类
  6. ---------
  7. @author: Dzr
  8. """
  9. import common.utils as tools
  10. from common.databases import mongo_table
  11. from common.redisdb import RedisDB
  12. class BaseServer:
  13. def __init__(self, server: str, label: str, db=None, table=None, **kwargs):
  14. self.server = server
  15. if db and table:
  16. self.mgo_db_name = db
  17. self.mgo_table_name = table
  18. self.mgo_db = mongo_table(db, table)
  19. self.task_fingerprint = f"{label}_task_fingerprint"
  20. self.request_tasks = f"{label}_task_list"
  21. self.redis_db = RedisDB(kwargs.pop('redis_cfg', None))
  22. self._unique_key = ('_id',)
  23. @property
  24. def unique_key(self):
  25. return self._unique_key
  26. @unique_key.setter
  27. def unique_key(self, keys):
  28. if isinstance(keys, (tuple, list)):
  29. self._unique_key = keys
  30. else:
  31. self._unique_key = (keys,)
  32. def fingerprint(self, **kwargs):
  33. args = []
  34. for key, value in kwargs.items():
  35. if value:
  36. if (self.unique_key and key in self.unique_key) or not self.unique_key:
  37. args.append(str(value))
  38. if args:
  39. args = sorted(args)
  40. return tools.get_md5(*args)
  41. def exists(self, fingerprint):
  42. """判断元素是否存在redis集合"""
  43. return self.redis_db.hexists(self.task_fingerprint, fingerprint)
  44. def add(self, fingerprint, default=""):
  45. """添加元素到redis集合"""
  46. self.redis_db.hset(self.task_fingerprint, fingerprint, default)
  47. def remove(self, fingerprint):
  48. """从redis集合中移除元素"""
  49. self.redis_db.hdel(self.task_fingerprint, fingerprint)
  50. def rpush(self, task: str):
  51. self.redis_db.get_redis_obj().rpush(self.request_tasks, task)
  52. def lpop(self):
  53. return self.redis_db.lpop(self.request_tasks)
  54. @property
  55. def task_total(self):
  56. return self.redis_db.get_redis_obj().llen(self.request_tasks)
  57. def upload_data_to_mongodb(self, data, bulk_size=100, **kwargs):
  58. db = kwargs.pop('db', None) or self.mgo_db_name
  59. table = kwargs.pop('table', None) or self.mgo_table_name
  60. inserted_count = 0
  61. caches = []
  62. documents = data if isinstance(data, list) else [data]
  63. for document in documents:
  64. caches.append(document)
  65. inserted_count += 1
  66. if len(caches) % bulk_size == 0:
  67. mongo_table(db, table).insert_many(caches)
  68. caches.clear()
  69. if len(caches) > 0:
  70. mongo_table(db, table).insert_many(caches)
  71. return inserted_count