import re
from urllib.parse import urlencode, urljoin

from bs4 import BeautifulSoup
from lxml.html import HtmlElement, fromstring, tostring
from urllib3 import get_host

from common.log import logger

def err_details(worker):
    """Log the exception of a finished worker, if any, and pass the worker through."""
    worker_exception = worker.exception()
    if worker_exception:
        logger.exception("Worker raised an exception: {}".format(worker_exception))
    return worker
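
# A plausible use, not shown in this module: err_details has the shape of a
# concurrent.futures done-callback, e.g.
#   future = executor.submit(task)
#   future.add_done_callback(err_details)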

def extract_host(url):
    """
    Extract the scheme://host[:port]/ prefix of a URL.

    >>> extract_host('http://192.168.3.207:8080/')
    'http://192.168.3.207:8080/'
    """
    _s, _h, _p = get_host(url)
    return f"{_s}://{_h}/" if _p is None else f"{_s}://{_h}:{_p}/"

def split_domain(val: str):
    """Split a host name on dots and colons; values starting with a digit (IPs) are kept whole."""
    if re.match(r'\d+', val) is None:
        return re.split(r'[.:]', val)
    return [val]
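
# Expected behaviour (illustrative):
#   split_domain('www.example.com') -> ['www', 'example', 'com']
#   split_domain('192.168.3.207')   -> ['192.168.3.207']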

def extract_domain(url):
    """
    Extract the host[:port] part of a URL.

    >>> extract_domain('http://192.168.3.207:8080/')
    '192.168.3.207:8080'
    """
    _, host, port = get_host(url)
    return host if port is None else f"{host}:{port}"

def extract_page_title(html):
    """Return the <title> text with all whitespace stripped; multiple titles are joined with ';'."""
    element = html2element(html)
    nodes = element.xpath('/html/head/title/text()|//title/text()')
    if len(nodes) > 1:
        return "".join(";".join(nodes).split())
    return "".join("".join(nodes).split())

def is_url(url):
    """Check whether a URL is well-formed."""
    _regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    return re.match(_regex, url) is not None
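
# Expected behaviour (illustrative):
#   is_url('https://example.com/path?q=1') -> True
#   is_url('example.com')                  -> False  # missing scheme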

def is_domain(domain):
    _regex = re.compile(
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?', re.IGNORECASE)
    return re.match(_regex, domain) is not None
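
# Expected behaviour (illustrative); the pattern has no trailing anchor,
# so it only checks that the value *starts* like a domain or IP:
#   is_domain('example.com:8080') -> True
#   is_domain('-foo-')            -> False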

def label_split(val):
    """Split a label string on common ASCII/CJK separators, dropping empty pieces."""
    # separators considered: '~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
    result = re.split(r'[- _,,\\.|-「」【】??!!/、] *', val)
    return [v for v in result if len(v) > 0]
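
# Expected behaviour (illustrative):
#   label_split('foo_bar,baz qux') -> ['foo', 'bar', 'baz', 'qux']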

def get_url(url: str, parameters: dict):
    """
    Join a URL with its query parameters.
    :param url: base URL
    :param parameters: query parameters
    :return: the combined URL
    """
    _data = '?' + urlencode(parameters)
    return urljoin(url, _data)
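
# Expected behaviour (illustrative):
#   get_url('http://example.com/search', {'q': 'python', 'page': 2})
#   -> 'http://example.com/search?q=python&page=2'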

def iter_node(element: HtmlElement, depth=1):
    """Depth-first traversal yielding (node, depth) pairs, with the root at depth 1."""
    yield element, depth
    depth += 1
    for sub_element in element:
        if isinstance(sub_element, HtmlElement):
            yield from iter_node(sub_element, depth)
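
# Illustrative walk over a small tree:
#   [(node.tag, depth) for node, depth in iter_node(html2element('<div><p>hi</p></div>'))]
#   -> [('div', 1), ('p', 2)]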

def element2html(element: HtmlElement) -> str:
    return tostring(element, encoding="utf-8").decode()

def html2element(html_str: str, base_url=None) -> HtmlElement:
    html_str = re.sub('\ufeff|\xa0|\u3000|\x00', '', html_str)  # strip BOM / NBSP / ideographic space / NUL
    html_str = re.sub('</?br.*?>', '', html_str)                # drop <br> tags
    html_str = re.sub(r'<\?xml.*?>', '', html_str)              # drop the XML declaration
    html_str = re.sub(r'<[!]DOCTYPE.*?>', '', html_str)         # drop the DOCTYPE
    return fromstring(html_str, base_url=base_url)
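
# Illustrative round-trip with element2html:
#   element2html(html2element('<div>text</div>')) -> '<div>text</div>'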

def valid_element(node: HtmlElement, feature: str):
    return len(node.xpath(feature)) > 0

def remove_node(node: HtmlElement):
    """
    Remove the node and its whole subtree. This is an in-place operation; nothing is returned.
    :param node:
    :return:
    """
    parent = node.getparent()
    if parent is not None:
        parent.remove(node)
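
# Illustrative: removal drops the subtree, text included:
#   root = html2element('<div><span>x</span></div>')
#   remove_node(root.xpath('//span')[0])
#   element2html(root) -> '<div></div>'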

def drop_tag(node: HtmlElement):
    """
    Delete only the tag itself, merging its text and children into the parent.
    :param node:
    :return:
    """
    parent = node.getparent()
    if parent is not None:
        node.drop_tag()
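
# Illustrative contrast with remove_node: the text survives, the tag does not:
#   root = html2element('<div><span>x</span>y</div>')
#   drop_tag(root.xpath('//span')[0])
#   element2html(root) -> '<div>xy</div>'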

def clean_html(html_str: str):
    html_str = re.sub(r'<!--[\s\S]*?-->', '', html_str)                            # comments
    html_str = re.sub(r'<html>|<html [^>]*>|</html>', '', html_str)                # <html> wrapper
    html_str = re.sub(r'<head>[\s\S]*?</head>', '', html_str)                      # <head> block
    html_str = re.sub(r'<script[^<>]*>[\s\S]*?</script>|</script>', '', html_str)  # scripts
    html_str = re.sub(r'<link[^<>]*>', '', html_str)                               # <link> tags
    html_str = re.sub(r'<style[^<>]*>[\s\S]*?</style>', '', html_str)              # styles
    html_str = re.sub(r'<img[^>]*>', '', html_str)                                 # images
    return html_str
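
# Expected behaviour (illustrative):
#   clean_html('<html><head><title>t</title></head><p>hi<!-- c --></p></html>')
#   -> '<p>hi</p>'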

def extract_text(html_str: str):
    soup = BeautifulSoup(html_str, "lxml")
    return soup.get_text()
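
# Expected behaviour (illustrative); get_text() joins text runs with no separator:
#   extract_text('<div><p>Hello</p><p>World</p></div>') -> 'HelloWorld'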

def verify_text(val: str, length=50):
    """Check that the text contains enough digits, letters, and Chinese characters."""
    if val is None:
        return False
    sub_pattern = ['<[^>]+>', '[^0-9a-zA-Z\u4e00-\u9fa5]+']
    for pattern in sub_pattern:
        val = re.sub(pattern, '', val)
    # if the remaining text is shorter than `length`, the page carries no real detail content
    if len(val) < length:
        # invalid text
        return False
    # valid text
    return True
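
if __name__ == '__main__':
    # Minimal smoke test for the helpers above: a sketch, not exhaustive, and it
    # assumes the project's common.log dependency is importable at module load.
    assert extract_host('http://192.168.3.207:8080/login') == 'http://192.168.3.207:8080/'
    assert extract_domain('http://192.168.3.207:8080/') == '192.168.3.207:8080'
    assert is_url('https://example.com/path') is True
    assert get_url('http://example.com/s', {'q': 'python'}) == 'http://example.com/s?q=python'
    assert extract_text('<div><p>Hello</p></div>').strip() == 'Hello'
    assert verify_text('字' * 50) is True
    assert verify_text('too short') is False
    print('all smoke tests passed')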