import datetime import hashlib import json import socket import time from datetime import date, timedelta from json import JSONEncoder from bson import ObjectId def delay(wait): time.sleep(wait) def json_loads(s, **kwargs): """ json 文档反序列化为 Python 对象 @param str s: json 字符串 @param kwargs: 字段和指定的特殊类型映射 For example: > s = {'oid': '123123123'} # 把 oid 的 '123456' 转换成 ObjectId('123456') > import bson > result = json_loads(s, dzr_id=bson.ObjectId) """ if not s: return None result = json.loads(s) for key, impl in kwargs.items(): result[key] = impl(result[key]) return result def json_dumps(obj: dict): """Python 对象序列化为 json 文档""" class JsonEncoder(JSONEncoder): def default(self, o): if isinstance(o, ObjectId): return str(o) return super().default(o) return json.dumps(obj, ensure_ascii=False, cls=JsonEncoder) def document2dict(items: dict): """mongo 文档转成 python dict""" return json_loads(json_dumps(items)) def dict2document(items: dict, **kwargs): """python dict 转成 mongo 文档""" if len(kwargs) == 0: kwargs['_id'] = ObjectId return json_loads(json_dumps(items), **kwargs) def ts2dt(ts: float, fmt='%Y-%m-%d %H:%M:%S'): """时间戳转换日期""" return datetime.datetime.fromtimestamp(ts).strftime(fmt) def dt2ts(dt: str, fmt='%Y-%m-%d'): """日期转换时间戳""" # 转换成时间数组 time_array = time.strptime(dt, fmt) # 转换成时间戳 timestamp = time.mktime(time_array) return int(timestamp) def delta_t(days: int): """时间差""" return dt2ts((date.today() - timedelta(days=days)).strftime("%Y-%m-%d")) def now_date(fmt='%Y-%m-%d %H:%M:%S'): return datetime.datetime.today().strftime(fmt) def now_ts(): """当前时间戳""" return int(datetime.datetime.now().timestamp()) def sha1(text: str): """ 十六进制数字字符串形式摘要值 @param text: 字符串文本 @return: 摘要值 """ _sha1 = hashlib.sha1() _sha1.update(text.encode("utf-8")) return _sha1.hexdigest() def get_md5(*args): """ @summary: 获取唯一的32位md5 --------- @param args: 参与联合去重的值 --------- @result: 7c8684bcbdfcea6697650aa53d7b1405 """ m = hashlib.md5() for arg in args: m.update(str(arg).encode()) return m.hexdigest() def get_localhost_ip(): """ 利用 UDP 协议来实现的,生成一个UDP包,把自己的 IP 放如到 UDP 协议头中,然后从UDP包中获取本机的IP。 这个方法并不会真实的向外部发包,所以用抓包工具是看不到的 :return: """ s = None try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] finally: if s: s.close() return ip