123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- import datetime
- import hashlib
- import json
- import socket
- import time
- from datetime import date, timedelta
- from json import JSONEncoder
- from bson import ObjectId
- def delay(wait):
- time.sleep(wait)
- def json_loads(s, **kwargs):
- """
- json 文档反序列化为 Python 对象
- @param str s: json 字符串
- @param kwargs: 字段和指定的特殊类型映射
- For example:
- > s = {'oid': '123123123'} # 把 oid 的 '123456' 转换成 ObjectId('123456')
- > import bson
- > result = json_loads(s, dzr_id=bson.ObjectId)
- """
- if not s:
- return None
- result = json.loads(s)
- for key, impl in kwargs.items():
- result[key] = impl(result[key])
- return result
- def json_dumps(obj: dict):
- """Python 对象序列化为 json 文档"""
- class JsonEncoder(JSONEncoder):
- def default(self, o):
- if isinstance(o, ObjectId):
- return str(o)
- return super().default(o)
- return json.dumps(obj, ensure_ascii=False, cls=JsonEncoder)
- def document2dict(items: dict):
- """mongo 文档转成 python dict"""
- return json_loads(json_dumps(items))
- def dict2document(items: dict, **kwargs):
- """python dict 转成 mongo 文档"""
- if len(kwargs) == 0:
- kwargs['_id'] = ObjectId
- return json_loads(json_dumps(items), **kwargs)
- def ts2dt(ts: float, fmt='%Y-%m-%d %H:%M:%S'):
- """时间戳转换日期"""
- return datetime.datetime.fromtimestamp(ts).strftime(fmt)
- def dt2ts(dt: str, fmt='%Y-%m-%d'):
- """日期转换时间戳"""
- # 转换成时间数组
- time_array = time.strptime(dt, fmt)
- # 转换成时间戳
- timestamp = time.mktime(time_array)
- return int(timestamp)
- def delta_t(days: int):
- """时间差"""
- return dt2ts((date.today() - timedelta(days=days)).strftime("%Y-%m-%d"))
- def now_date(fmt='%Y-%m-%d %H:%M:%S'):
- return datetime.datetime.today().strftime(fmt)
- def now_ts():
- """当前时间戳"""
- return int(datetime.datetime.now().timestamp())
- def sha1(text: str):
- """
- 十六进制数字字符串形式摘要值
- @param text: 字符串文本
- @return: 摘要值
- """
- _sha1 = hashlib.sha1()
- _sha1.update(text.encode("utf-8"))
- return _sha1.hexdigest()
- def get_md5(*args):
- """
- @summary: 获取唯一的32位md5
- ---------
- @param args: 参与联合去重的值
- ---------
- @result: 7c8684bcbdfcea6697650aa53d7b1405
- """
- m = hashlib.md5()
- for arg in args:
- m.update(str(arg).encode())
- return m.hexdigest()
- def get_localhost_ip():
- """
- 利用 UDP 协议来实现的,生成一个UDP包,把自己的 IP 放如到 UDP 协议头中,然后从UDP包中获取本机的IP。
- 这个方法并不会真实的向外部发包,所以用抓包工具是看不到的
- :return:
- """
- s = None
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(("8.8.8.8", 80))
- ip = s.getsockname()[0]
- finally:
- if s:
- s.close()
- return ip
|