|
@@ -0,0 +1,156 @@
|
|
|
+import datetime
|
|
|
+import hashlib
|
|
|
+import re
|
|
|
+import time
|
|
|
+from collections import namedtuple
|
|
|
+from urllib.parse import urlparse, urljoin
|
|
|
+
|
|
|
+from lxml.html import HtmlElement, fromstring, tostring
|
|
|
+
|
|
|
# Result record returned by text_search(); ``total`` holds the match count.
SearchText = namedtuple("SearchText", "total")
|
|
|
+
|
|
|
+
|
|
|
def element2html(element: HtmlElement) -> str:
    """Serialize an lxml element to an HTML string.

    :param element: the element to serialize
    :return: the output of ``tostring`` (UTF-8 encoded), decoded to ``str``
    """
    raw_bytes = tostring(element, encoding="utf-8")
    return raw_bytes.decode()
|
|
|
+
|
|
|
+
|
|
|
def html2element(html: str) -> HtmlElement:
    """Parse an HTML string with lxml's ``fromstring``.

    :param html: HTML markup to parse
    :return: whatever ``fromstring`` produces for the given markup
    """
    parsed = fromstring(html)
    return parsed
|
|
|
+
|
|
|
+
|
|
|
def valid_element(node: HtmlElement, feature: str):
    """Report whether an XPath query finds anything on ``node``.

    :param node: element on which the XPath expression is evaluated
    :param feature: XPath expression to evaluate
    :return: True if the query result has at least one item, otherwise False
    """
    matches = node.xpath(feature)
    return len(matches) > 0
|
|
|
+
|
|
|
+
|
|
|
def remove_node(node: HtmlElement):
    """Detach ``node`` from its parent element.

    The tree is modified in place, so nothing is returned. A node that has no
    parent is left untouched.

    :param node: element to remove from its parent
    :return: None
    """
    parent = node.getparent()
    if parent is None:
        return
    parent.remove(node)
|
|
|
+
|
|
|
+
|
|
|
def text_search(text: str) -> SearchText:
    """Count the Chinese characters in a piece of text.

    :param text: text to scan; a falsy value (``None`` or ``""``) counts as zero
    :return: ``SearchText`` whose ``total`` is the number of characters in the
        range U+4E00..U+9FA5 found in ``text``
    """
    if not text:
        return SearchText(0)

    # Each match is exactly one character, so the number of matches is the
    # number of Chinese characters.
    chinese_chars = re.compile('[\u4e00-\u9fa5]', re.S).findall(text)
    return SearchText(len(chinese_chars))
|
|
|
+
|
|
|
+
|
|
|
def verify_text(val: str, min_length: int = 50):
    """Check whether text holds enough digits, letters and Chinese characters.

    HTML tags are stripped first, then every character that is not an ASCII
    digit, an ASCII letter or a Chinese character (U+4E00..U+9FA5) is dropped.
    The length of what remains is compared against ``min_length``.

    :param val: text to check, may contain HTML markup; ``None`` is invalid
    :param min_length: minimum number of remaining characters for the text to
        count as valid (default 50)
    :return: True if at least ``min_length`` characters remain, else False
    """
    if val is None:
        return False

    # Order matters: tags are removed before the character filter, otherwise
    # tag names and attribute text would be counted as content.
    for pattern in ('<[^>]+>', '[^0-9a-zA-Z\u4e00-\u9fa5]+'):
        val = re.sub(pattern, '', val)

    # Text shorter than the threshold means the page has no detail content.
    return len(val) >= min_length
|
|
|
+
|
|
|
+
|
|
|
def sha1(text: str):
    """Return the SHA-1 digest of ``text`` as a hexadecimal string.

    :param text: string to hash; it is encoded as UTF-8 before hashing
    :return: hexadecimal digest string
    """
    encoded = text.encode("utf-8")
    return hashlib.sha1(encoded).hexdigest()
|
|
|
+
|
|
|
+
|
|
|
def get_ms() -> int:
    """Return the current Unix time in milliseconds, rounded to an integer."""
    millis = time.time() * 1000
    return int(round(millis))
|
|
|
+
|
|
|
+
|
|
|
def get_current_date():
    """Return the current local date formatted as ``YYYY-MM-DD``."""
    now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d")
|
|
|
+
|
|
|
+
|
|
|
def ms2date(ms: int, fmt="%Y-%m-%d %H:%M:%S"):
    """Format a millisecond timestamp as a local-time date string.

    :param ms: Unix timestamp in milliseconds
    :param fmt: ``time.strftime`` format used for the result
    :return: the formatted local time
    """
    seconds = float(ms / 1000)
    return time.strftime(fmt, time.localtime(seconds))
|
|
|
+
|
|
|
+
|
|
|
def convert2type(ts_str):
    """Convert a millisecond timestamp (string or number) to whole seconds.

    :param ts_str: millisecond timestamp; anything ``float()`` accepts
    :return: the timestamp in seconds as an ``int`` (fraction truncated)
    """
    seconds = float(ts_str) / 1000
    return int(seconds)
|
|
|
+
|
|
|
+
|
|
|
def ts2date(ts_str, fmt="%Y-%m-%d %H:%M:%S") -> str:
    """Convert a millisecond timestamp to a local-time date string.

    :param ts_str: millisecond timestamp (string or number)
    :param fmt: ``time.strftime`` format used for the result
    :return: the formatted local time
    """
    # Sub-second precision is discarded before formatting.
    whole_seconds = int(float(ts_str) / 1000)
    return time.strftime(fmt, time.localtime(whole_seconds))
|
|
|
+
|
|
|
+
|
|
|
def date2ts(date_str: str, fmt="%Y-%m-%d"):
    """Convert a date string to a Unix timestamp in seconds.

    The parsed time is passed to ``time.mktime``, so it is interpreted as
    local time.

    :param date_str: date text to parse
    :param fmt: ``time.strptime`` format describing ``date_str``
    :return: timestamp in seconds as an ``int``
    """
    parsed = time.strptime(date_str, fmt)
    return int(time.mktime(parsed))
|
|
|
+
|
|
|
+
|
|
|
def delay_by_hour(hour, fmt="%Y-%m-%d %H:%M:%S"):
    """Return the local time ``hour`` hours from now as a formatted string.

    :param hour: number of hours to add; converted with ``int()``
    :param fmt: ``strftime`` format used for the result
    :return: the shifted time as a formatted string
    """
    offset = datetime.timedelta(hours=int(hour))
    target = datetime.datetime.now() + offset
    return target.strftime(fmt)
|
|
|
+
|
|
|
+
|
|
|
def delay_by_minutes(minutes, fmt="%Y-%m-%d %H:%M:%S"):
    """Return the local time ``minutes`` minutes from now as a formatted string.

    :param minutes: number of minutes to add; converted with ``int()``
    :param fmt: ``strftime`` format used for the result
    :return: the shifted time as a formatted string
    """
    offset = datetime.timedelta(minutes=int(minutes))
    target = datetime.datetime.now() + offset
    return target.strftime(fmt)
|
|
|
+
|
|
|
+
|
|
|
def delay_by_day(days, fmt="%Y-%m-%d %H:%M:%S"):
    """Return the local time ``days`` days from now as a formatted string.

    :param days: number of days to add; converted with ``int()``
    :param fmt: ``strftime`` format used for the result
    :return: the shifted time as a formatted string
    """
    offset = datetime.timedelta(days=int(days))
    target = datetime.datetime.now() + offset
    return target.strftime(fmt)
|
|
|
+
|
|
|
+
|
|
|
def compliance_href(href: str):
    """Check whether ``href`` starts with one of the accepted link forms.

    Accepted forms:

    * an optional scheme (https, http, ftp, rtsp, mms) followed by ``://``
    * ``/`` plus one run of word characters immediately followed by ``?``,
      e.g. ``/search?q=1``

    :param href: candidate link; ``None`` and ``""`` are rejected
    :return: True if ``href`` begins with an accepted form, otherwise False
    """
    if not href:
        return False
    # Raw string so that \w and \? reach the regex engine unchanged instead of
    # relying on an invalid string escape (a SyntaxWarning on newer Pythons).
    return re.match(r"^((https|http|ftp|rtsp|mms)?://|/\w+\?)", href) is not None
|
|
|
+
|
|
|
+
|
|
|
def is_href(href: str):
    """Report whether ``href`` has a scheme or a network location.

    :param href: URL-like string, parsed with ``urlparse``
    :return: False when both scheme and netloc are empty, otherwise True
    """
    parts = urlparse(href)
    has_scheme = len(parts.scheme) != 0
    has_netloc = len(parts.netloc) != 0
    return has_scheme or has_netloc
|
|
|
+
|
|
|
+
|
|
|
def join_url(base, url):
    """Resolve ``url`` against ``base`` using ``urllib.parse.urljoin``.

    :param base: base URL
    :param url: absolute or relative URL to resolve
    :return: the joined URL
    """
    absolute = urljoin(base, url)
    return absolute
|