"""Helpers for lxml HTML elements, text checks, hashing, time conversion and URLs."""
import datetime
import hashlib
import re
import time
from collections import namedtuple
from urllib.parse import urlparse, urljoin

from lxml.html import HtmlElement, fromstring, tostring

# Result of text_search(); ``total`` is the number of matched characters.
SearchText = namedtuple('SearchText', ['total'])

# A single character in the U+4E00..U+9FA5 (CJK ideograph) range.
_CJK_CHAR = re.compile('[\u4e00-\u9fa5]')

# Applied in order by verify_text(): first drop markup tags, then drop every
# character that is not an ASCII digit, an ASCII letter or a CJK ideograph.
_NOISE_PATTERNS = (
    re.compile('<[^>]+>'),
    re.compile('[^0-9a-zA-Z\u4e00-\u9fa5]+'),
)

# Raw string on purpose: ``\w`` inside a normal string literal is an invalid
# escape sequence (a SyntaxWarning on recent Python versions).
_HREF_PATTERN = re.compile(r"^((https|http|ftp|rtsp|mms)?://|/\w+\?)")


def element2html(element: HtmlElement) -> str:
    """Serialize an lxml element to an HTML string.

    :param element: element to serialize
    :return: the markup, serialized as UTF-8 and decoded back to ``str``
    """
    return tostring(element, encoding="utf-8").decode()


def html2element(html: str) -> HtmlElement:
    """Parse an HTML string into an lxml element.

    :param html: HTML source text
    :return: the element produced by ``lxml.html.fromstring``
    """
    return fromstring(html)


def valid_element(node: HtmlElement, feature: str) -> bool:
    """Tell whether an XPath expression selects anything under ``node``.

    :param node: element the expression is evaluated against
    :param feature: XPath expression
    :return: True when the XPath result is non-empty, False otherwise
    """
    return len(node.xpath(feature)) > 0


def remove_node(node: HtmlElement) -> None:
    """Detach ``node`` from its parent element, in place.

    Nothing is returned; a node without a parent is left untouched.

    NOTE: in lxml the tail text belongs to the element, so any text that
    directly follows ``node`` is removed together with it.

    :param node: element to remove
    """
    parent = node.getparent()
    if parent is not None:
        parent.remove(node)


def text_search(text: str) -> SearchText:
    """Count the Chinese characters contained in ``text``.

    :param text: text to scan; ``None`` or an empty string counts as zero
    :return: ``SearchText`` whose ``total`` is the number of characters in
        the U+4E00..U+9FA5 range
    """
    if not text:
        return SearchText(0)
    # Every match is exactly one character, so the match count is the total.
    return SearchText(len(_CJK_CHAR.findall(text)))


def verify_text(val: str, min_length: int = 50) -> bool:
    """Check whether ``val`` holds enough digits, letters and Chinese characters.

    Markup tags are stripped first, then every character that is not an ASCII
    digit, an ASCII letter or a Chinese character. What remains is measured.

    :param val: text (possibly containing HTML) to check; ``None`` is invalid
    :param min_length: minimum number of remaining characters for the text to
        count as real page content (default 50)
    :return: True when at least ``min_length`` characters remain
    """
    if val is None:
        return False
    for pattern in _NOISE_PATTERNS:
        val = pattern.sub('', val)
    # Fewer remaining characters than the threshold means there is no usable
    # detail content on the page.
    return len(val) >= min_length


def sha1(text: str) -> str:
    """Return the SHA-1 digest of ``text`` as a hexadecimal string.

    :param text: text to hash; it is encoded as UTF-8 first
    :return: hexadecimal digest
    """
    return hashlib.sha1(text.encode("utf-8")).hexdigest()


def get_ms() -> int:
    """Return the current time as a millisecond timestamp."""
    return int(round(time.time() * 1000))


def get_current_date() -> str:
    """Return today's local date formatted as ``YYYY-MM-DD``."""
    return datetime.datetime.now().strftime("%Y-%m-%d")


def ms2date(ms: int, fmt="%Y-%m-%d %H:%M:%S") -> str:
    """Format a millisecond timestamp as a local date string.

    :param ms: timestamp in milliseconds
    :param fmt: ``time.strftime`` format
    :return: formatted local time
    """
    seconds = float(ms / 1000)
    return time.strftime(fmt, time.localtime(seconds))


def convert2type(ts_str) -> int:
    """Convert a millisecond timestamp given as a string to whole seconds.

    :param ts_str: millisecond timestamp (anything ``float()`` accepts)
    :return: timestamp in seconds, truncated to an integer
    """
    return int(float(ts_str) / 1000)


def ts2date(ts_str, fmt="%Y-%m-%d %H:%M:%S") -> str:
    """Format a millisecond timestamp string as a local date string.

    :param ts_str: millisecond timestamp (anything ``float()`` accepts)
    :param fmt: ``time.strftime`` format
    :return: formatted local time; sub-second precision is dropped
    """
    return time.strftime(fmt, time.localtime(convert2type(ts_str)))


def date2ts(date_str: str, fmt="%Y-%m-%d") -> int:
    """Convert a date string to a timestamp in seconds.

    :param date_str: date text matching ``fmt``
    :param fmt: ``time.strptime`` format
    :return: integer timestamp; the date is interpreted as local time
    """
    return int(time.mktime(time.strptime(date_str, fmt)))


def _delay(fmt: str, **offset) -> str:
    """Format the local time obtained by adding a ``timedelta(**offset)`` to now."""
    return (datetime.datetime.now() + datetime.timedelta(**offset)).strftime(fmt)


def delay_by_hour(hour, fmt="%Y-%m-%d %H:%M:%S") -> str:
    """Return the local time ``hour`` hours from now.

    :param hour: number of hours (converted with ``int()``)
    :param fmt: ``strftime`` format
    :return: formatted time
    """
    return _delay(fmt, hours=int(hour))


def delay_by_minutes(minutes, fmt="%Y-%m-%d %H:%M:%S") -> str:
    """Return the local time ``minutes`` minutes from now.

    :param minutes: number of minutes (converted with ``int()``)
    :param fmt: ``strftime`` format
    :return: formatted time
    """
    return _delay(fmt, minutes=int(minutes))


def delay_by_day(days, fmt="%Y-%m-%d %H:%M:%S") -> str:
    """Return the local time ``days`` days from now.

    :param days: number of days (converted with ``int()``)
    :param fmt: ``strftime`` format
    :return: formatted time
    """
    return _delay(fmt, days=int(days))


def compliance_href(href: str) -> bool:
    """Check whether ``href`` looks like a link this module accepts.

    Accepted forms are an optional ``https``/``http``/``ftp``/``rtsp``/``mms``
    scheme followed by ``://``, or a ``/name?`` style path that carries a
    query string.

    :param href: link text; ``None`` and the empty string are rejected
    :return: True when ``href`` starts with one of the accepted forms
    """
    if not href:
        return False
    return _HREF_PATTERN.match(href) is not None


def is_href(href: str) -> bool:
    """Tell whether ``href`` has a URL scheme or a network location.

    :param href: link text
    :return: False when ``urlparse`` finds neither a scheme nor a netloc
    """
    parts = urlparse(href)
    return bool(parts.scheme or parts.netloc)


def join_url(base, url):
    """Resolve ``url`` against ``base`` using ``urllib.parse.urljoin``.

    :param base: base URL
    :param url: absolute or relative URL
    :return: the combined URL
    """
    return urljoin(base, url)