utils.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import datetime
  2. import hashlib
  3. import json
  4. import socket
  5. import time
  6. from datetime import date, timedelta
  7. from json import JSONEncoder
  8. from bson import ObjectId
  9. def delay(wait):
  10. time.sleep(wait)
  11. def json_loads(s, **kwargs):
  12. """
  13. json 文档反序列化为 Python 对象
  14. @param str s: json 字符串
  15. @param kwargs: 字段和指定的特殊类型映射
  16. For example:
  17. > s = {'oid': '123123123'} # 把 oid 的 '123456' 转换成 ObjectId('123456')
  18. > import bson
  19. > result = json_loads(s, dzr_id=bson.ObjectId)
  20. """
  21. if not s:
  22. return None
  23. result = json.loads(s)
  24. for key, impl in kwargs.items():
  25. result[key] = impl(result[key])
  26. return result
  27. def json_dumps(obj: dict):
  28. """Python 对象序列化为 json 文档"""
  29. class JsonEncoder(JSONEncoder):
  30. def default(self, o):
  31. if isinstance(o, ObjectId):
  32. return str(o)
  33. return super().default(o)
  34. return json.dumps(obj, ensure_ascii=False, cls=JsonEncoder)
  35. def document2dict(items: dict):
  36. """mongo 文档转成 python dict"""
  37. return json_loads(json_dumps(items))
  38. def dict2document(items: dict, **kwargs):
  39. """python dict 转成 mongo 文档"""
  40. if len(kwargs) == 0:
  41. kwargs['_id'] = ObjectId
  42. return json_loads(json_dumps(items), **kwargs)
  43. def ts2dt(ts: float, fmt='%Y-%m-%d %H:%M:%S'):
  44. """时间戳转换日期"""
  45. return datetime.datetime.fromtimestamp(ts).strftime(fmt)
  46. def dt2ts(dt: str, fmt='%Y-%m-%d'):
  47. """日期转换时间戳"""
  48. # 转换成时间数组
  49. time_array = time.strptime(dt, fmt)
  50. # 转换成时间戳
  51. timestamp = time.mktime(time_array)
  52. return int(timestamp)
  53. def delta_t(days: int):
  54. """时间差"""
  55. return dt2ts((date.today() - timedelta(days=days)).strftime("%Y-%m-%d"))
  56. def now_date(fmt='%Y-%m-%d %H:%M:%S'):
  57. return datetime.datetime.today().strftime(fmt)
  58. def now_ts():
  59. """当前时间戳"""
  60. return int(datetime.datetime.now().timestamp())
  61. def sha1(text: str):
  62. """
  63. 十六进制数字字符串形式摘要值
  64. @param text: 字符串文本
  65. @return: 摘要值
  66. """
  67. _sha1 = hashlib.sha1()
  68. _sha1.update(text.encode("utf-8"))
  69. return _sha1.hexdigest()
  70. def get_md5(*args):
  71. """
  72. @summary: 获取唯一的32位md5
  73. ---------
  74. @param args: 参与联合去重的值
  75. ---------
  76. @result: 7c8684bcbdfcea6697650aa53d7b1405
  77. """
  78. m = hashlib.md5()
  79. for arg in args:
  80. m.update(str(arg).encode())
  81. return m.hexdigest()
  82. def get_localhost_ip():
  83. """
  84. 利用 UDP 协议来实现的,生成一个UDP包,把自己的 IP 放如到 UDP 协议头中,然后从UDP包中获取本机的IP。
  85. 这个方法并不会真实的向外部发包,所以用抓包工具是看不到的
  86. :return:
  87. """
  88. s = None
  89. try:
  90. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  91. s.connect(("8.8.8.8", 80))
  92. ip = s.getsockname()[0]
  93. finally:
  94. if s:
  95. s.close()
  96. return ip