import re
from urllib.parse import urlencode, urljoin

from bs4 import BeautifulSoup
from lxml.html import HtmlElement, fromstring, tostring
from urllib3 import get_host

from common.log import logger


def err_details(worker):
    worker_exception = worker.exception()
    if worker_exception:
        logger.exception("Worker raised an exception: {}".format(worker_exception))
    return worker


def extract_host(url):
    """
    >>> extract_host('http://192.168.3.207:8080/')
    'http://192.168.3.207:8080/'
    """
    _s, _h, _p = get_host(url)
    return f"{_s}://{_h}/" if _p is None else f"{_s}://{_h}:{_p}/"


def split_domain(val: str):
    # Hosts that start with a digit (e.g. IP addresses) are returned whole;
    # anything else is split on dots and colons.
    if re.match(r'\d+', val) is None:
        return re.split(r'[\\.:]', val)
    return [val]


def extract_domain(url):
    """
    >>> extract_domain('http://192.168.3.207:8080/')
    '192.168.3.207:8080'
    """
    _, host, port = get_host(url)
    return f"{host}" if port is None else f"{host}:{port}"


def extract_page_title(html):
    element = html2element(html)
    nodes = element.xpath('/html/head/title/text()|//title/text()')
    if len(nodes) > 1:
        return "".join(";".join(nodes).split())
    return "".join("".join(nodes).split())


def is_url(url):
    """Check whether ``url`` is a well-formed URL."""
    _regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    return re.match(_regex, url) is not None


def is_domain(domain):
    _regex = re.compile(
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?', re.IGNORECASE)
    return re.match(_regex, domain) is not None


def label_split(val):
    # '~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
    result = re.split(r'[- _,,\\.|-「」【】??!!/、] *', val)
    result = [v for v in result if len(v) > 0]
    return result


def get_url(url: str, parameters: dict):
    """
    Join a URL with its query-string parameters.

    :param url: the base link
    :param parameters: the query parameters
    :return: the joined URL
    """
    _data = '?' + urlencode(parameters)
    return urljoin(url, _data)


def iter_node(element: HtmlElement, depth=1):
    """Depth-first walk over ``element``, yielding ``(node, depth)`` pairs."""
    yield element, depth
    depth += 1
    for sub_element in element:
        if isinstance(sub_element, HtmlElement):
            yield from iter_node(sub_element, depth)


def element2html(element: HtmlElement) -> str:
    return tostring(element, encoding="utf-8").decode()


def html2element(html_str: str, base_url=None) -> HtmlElement:
    # Strip BOM, non-breaking / ideographic spaces and NUL bytes.
    html_str = re.sub('\ufeff|\xa0|\u3000|\x00', '', html_str)
    # NOTE: this pattern was garbled in the source; removing stray <br> tags
    # before parsing is an assumption about its original intent.
    html_str = re.sub('</?br.*?>', '', html_str)
    html_str = re.sub(r'<\?xml.*?>', '', html_str)
    html_str = re.sub(r'<[!]DOCTYPE.*?>', '', html_str)
    return fromstring(html_str, base_url=base_url)


def valid_element(node: HtmlElement, feature: str):
    return len(node.xpath(feature)) > 0


def remove_node(node: HtmlElement):
    """
    Remove ``node`` from its tree. This is an in-place operation; there is
    nothing to return.
    """
    parent = node.getparent()
    if parent is not None:
        parent.remove(node)


def drop_tag(node: HtmlElement):
    """
    Delete only the tag itself, merging its text into the parent. This is an
    in-place operation; there is nothing to return.
    """
    parent = node.getparent()
    if parent is not None:
        node.drop_tag()
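# --- Usage sketch (added illustration; not referenced elsewhere in the module) ---
# Shows how html2element(), iter_node(), drop_tag() and element2html() compose:
# parse a fragment, unwrap every inline <span> while keeping its text, then
# serialise the tree back to a string. The fragment below is hypothetical.
def _demo_tree_helpers():
    element = html2element('<div>head<span>inline</span>tail</div>')
    # Materialise the walk first: dropping tags while the generator is live
    # would mutate the tree under iteration.
    for node, _depth in list(iter_node(element)):
        if node.tag == 'span':
            drop_tag(node)  # merges 'inline' into the parent's text
    return element2html(element)  # -> '<div>headinlinetail</div>'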
def clean_html(html_str: str):
    # NOTE: the regex literals in this function were garbled in the source (the
    # tag tokens were stripped). The patterns below are reconstructed from the
    # surviving fragments, and the tag names are assumptions: drop comments,
    # <br>, <style>, <script>, <noscript>, <iframe> and <link> noise.
    html_str = re.sub(r'<!--[\s\S]*?-->', '', html_str)
    html_str = re.sub(r'<br>|<br[^>]*>|</br>', '', html_str)
    html_str = re.sub(r'<style>[\s\S]*?</style>', '', html_str)
    html_str = re.sub(r'<script[^>]*>[\s\S]*?</script>|</script>', '', html_str)
    html_str = re.sub(r'<noscript[^>]*>[\s\S]*?</noscript>', '', html_str)
    html_str = re.sub(r'<iframe[^>]*>[\s\S]*?</iframe>', '', html_str)
    html_str = re.sub(r'<link[^>]*>', '', html_str)
    return html_str


def extract_text(html_str: str):
    soup = BeautifulSoup(html_str, "lxml")
    return soup.get_text()


def verify_text(val: str, length=50):
    """Count the digits, letters and CJK characters left after stripping markup."""
    if val is None:
        return False
    sub_pattern = ['<[^>]+>', '[^0-9a-zA-Z\u4e00-\u9fa5]+']
    for pattern in sub_pattern:
        val = re.sub(pattern, '', val)
    # If fewer than ``length`` meaningful characters remain, the page carries
    # no real detail content.
    if len(val) < length:
        # Invalid text.
        return False
    # Valid text.
    return True
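# --- Usage sketch (added illustration; not referenced elsewhere in the module) ---
# The typical pipeline for deciding whether a fetched page carries real body
# content: strip noise tags, flatten the markup to text, then count meaningful
# characters. The HTML string and the threshold of 5 are hypothetical demo values.
def _demo_verify_pipeline():
    raw = '<html><body><script>var x = 1;</script><p>正文内容 body text</p></body></html>'
    cleaned = clean_html(raw)      # drop script/comment noise first
    text = extract_text(cleaned)   # then flatten the remaining markup to text
    return verify_text(text, length=5)  # -> True: enough letters/digits/CJK survive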