base_server.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-05-09
  4. ---------
  5. @summary: 管理服务基类
  6. ---------
  7. @author: Dzr
  8. """
  9. import common.utils as tools
  10. from common.databases import mongo_table, redis_client
  11. class BaseServer:
  def __init__(self, server: str, label: str, db=None, table=None, **kwargs):
      """Set up the Redis key names and client and, optionally, a MongoDB handle.

      :param server: stored unchanged as ``self.server``
      :param label: prefix for the two Redis key names
          (``<label>_task_fingerprint`` and ``<label>_task_list``)
      :param db: name passed to ``mongo_table`` as its first argument (optional)
      :param table: name passed to ``mongo_table`` as its second argument (optional)
      :param kwargs: only ``redis_cfg`` is read; it is forwarded to
          ``redis_client`` as ``cfg`` (``None`` when absent)
      """
      self.server = server

      # NOTE: the three mgo_* attributes exist only when BOTH names are truthy;
      # otherwise they are never created on the instance.
      if db and table:
          self.mgo_db_name, self.mgo_table_name = db, table
          self.mgo_db = mongo_table(db, table)

      # Redis key names derived from the label.
      self.task_fingerprint = f"{label}_task_fingerprint"
      self.request_tasks = f"{label}_task_list"

      redis_cfg = kwargs.pop('redis_cfg', None)
      self.redis_db = redis_client(cfg=redis_cfg)

      # Default key set consulted by fingerprint().
      self._unique_key = ('_id',)
  @property
  def unique_key(self):
      """Return the key names currently stored in ``self._unique_key``."""
      return self._unique_key

  @unique_key.setter
  def unique_key(self, keys):
      """Store ``keys`` as the unique-key collection.

      A tuple or list is stored as given (same object); any other value is
      wrapped in a one-element tuple.
      """
      self._unique_key = keys if isinstance(keys, (tuple, list)) else (keys,)
  def fingerprint(self, **kwargs):
      """Build a fingerprint from the keyword values selected by ``unique_key``.

      A keyword contributes ``str(value)`` when its value is truthy and either
      ``self.unique_key`` is empty or the keyword name is listed in it. The
      collected strings are sorted and passed to ``tools.get_md5``.

      :return: the result of ``tools.get_md5`` or ``None`` when no value was
          selected
      """
      selected = []
      for name, item in kwargs.items():
          if not item:
              # Falsy values never take part in the fingerprint.
              continue
          if self.unique_key and name not in self.unique_key:
              # A non-empty unique_key acts as a whitelist of keyword names.
              continue
          selected.append(str(item))

      if not selected:
          return None

      # Sort so the result does not depend on keyword order.
      selected.sort()
      return tools.get_md5(*selected)
  def exists(self, fingerprint):
      """Return whether ``fingerprint`` is a field of the Redis hash ``self.task_fingerprint``."""
      hash_name = self.task_fingerprint
      return self.redis_db.hexists(hash_name, fingerprint)
  def add(self, fingerprint, default=""):
      """Set field ``fingerprint`` to ``default`` in the Redis hash ``self.task_fingerprint``."""
      hash_name = self.task_fingerprint
      self.redis_db.hset(hash_name, fingerprint, default)
  def remove(self, fingerprint):
      """Delete field ``fingerprint`` from the Redis hash ``self.task_fingerprint``."""
      hash_name = self.task_fingerprint
      self.redis_db.hdel(hash_name, fingerprint)
  def rpush(self, task: str):
      """Push ``task`` onto the Redis list ``self.request_tasks`` with ``rpush``."""
      queue_name = self.request_tasks
      self.redis_db.rpush(queue_name, task)
  def lpop(self):
      """Return the result of ``lpop`` on the Redis list ``self.request_tasks``."""
      queue_name = self.request_tasks
      return self.redis_db.lpop(queue_name)
  @property
  def task_total(self):
      """Return the result of ``llen`` on the Redis list ``self.request_tasks``."""
      queue_name = self.request_tasks
      return self.redis_db.llen(queue_name)
  def upload_data_to_mongodb(self, data, bulk_size=100, **kwargs):
      """Insert one document, or a list of documents, into MongoDB in batches.

      :param data: a single document or a ``list`` of documents; any non-list
          value is treated as one document
      :param bulk_size: maximum number of documents per ``insert_many`` call;
          must be at least 1
      :param kwargs: optional ``db`` / ``table`` overrides; when absent or
          falsy, ``self.mgo_db_name`` / ``self.mgo_table_name`` are used
      :return: number of documents handed to ``insert_many``
      :raises ValueError: if ``bulk_size`` is below 1, or if no database or
          table name is available
      """
      if bulk_size < 1:
          # A zero batch size used to surface as ZeroDivisionError mid-loop.
          raise ValueError(f"bulk_size must be >= 1, got {bulk_size!r}")

      # __init__ creates mgo_db_name / mgo_table_name only when both db and
      # table were supplied, so read them defensively instead of letting a
      # bare AttributeError escape.
      db = kwargs.pop('db', None) or getattr(self, 'mgo_db_name', None)
      table = kwargs.pop('table', None) or getattr(self, 'mgo_table_name', None)
      if not db or not table:
          raise ValueError(
              "MongoDB db/table not configured: pass db= and table= "
              "or construct the server with both"
          )

      documents = data if isinstance(data, list) else [data]
      if not documents:
          return 0

      # Resolve the collection once; it does not change between batches.
      collection = mongo_table(db, table)

      inserted_count = 0
      batch = []
      for document in documents:
          batch.append(document)
          if len(batch) >= bulk_size:
              collection.insert_many(batch)
              inserted_count += len(batch)
              batch = []

      # Flush the final, partially filled batch.
      if batch:
          collection.insert_many(batch)
          inserted_count += len(batch)

      return inserted_count