"""Helpers for URL/host extraction, URL and domain validation, and text splitting."""
import re
from urllib.parse import urlencode, urljoin

from urllib3 import get_host

from common.log import logger
from common.tools import html2element

# The patterns below are compiled once at import time instead of on every call.
_URL_PATTERN = re.compile(
    r'^(?:http|ftp)s?://'  # scheme: http, https, ftp or ftps
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'localhost|'  # localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)

_DOMAIN_PATTERN = re.compile(
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
    r'(?::\d+)?', re.IGNORECASE)

# NOTE(review): inside this character class ``|-「`` is parsed as a *range*
# from U+007C to U+300C, not as a literal hyphen. As a result the pattern also
# splits on every character in that span (e.g. ``~``, ``}``, ``·``, ``—``,
# ``。``, the ideographic space, accented Latin letters). The behaviour is kept
# unchanged here because callers may rely on it; confirm whether a literal
# hyphen was intended (the leading ``-`` already matches a hyphen).
_LABEL_SEPARATOR_PATTERN = re.compile(r'[- _,,\\.|-「」【】??!!/、] *')


def err_details(worker):
    """Log the exception reported by ``worker.exception()``, if any.

    :param worker: object exposing an ``exception()`` method
        (presumably a finished future -- confirm against callers)
    :return: the same ``worker`` object, unchanged
    """
    worker_exception = worker.exception()
    if worker_exception:
        # NOTE(review): ``logger.exception`` is called outside an ``except``
        # block, so a stdlib-style logger has no active traceback to attach --
        # confirm how ``common.log.logger`` behaves here.
        logger.exception("Worker return exception: {}".format(worker_exception))
    return worker


def extract_host(url):
    """Return the ``scheme://host[:port]/`` prefix of ``url``.

    The port is included only when ``get_host`` reports one.

    # >>> base_url = extract_host('http://192.168.3.207:8080/')
    """
    _s, _h, _p = get_host(url)
    return f"{_s}://{_h}/" if _p is None else f"{_s}://{_h}:{_p}/"


def split_domain(val: str) -> list:
    """Split ``val`` on backslash, dot or colon unless it starts with a digit.

    A value beginning with a digit is returned unsplit as a one-element list
    (presumably to keep IP addresses intact -- confirm against callers).
    """
    if re.match(r'\d+', val) is None:
        return re.split(r'[\\.:]', val)
    return [val]


def extract_domain(url):
    """Return ``host`` or ``host:port`` for ``url``, without the scheme.

    The port is included only when ``get_host`` reports one.

    # >>> base_url = extract_domain('http://192.168.3.207:8080/')
    """
    _, host, port = get_host(url)
    return f"{host}" if port is None else f"{host}:{port}"


def extract_page_title(html) -> str:
    """Return the text of the ``<title>`` node(s) in ``html``, whitespace removed.

    When several title text nodes are found they are joined with ``;``.
    An empty string is returned when no title text is found.

    :param html: markup handed to ``html2element``
    """
    element = html2element(html)
    nodes = element.xpath('/html/head/title/text()|//title/text()')
    # Joining zero or one node with ";" inserts no separator, so a single
    # expression covers both the single-title and the multi-title case.
    return "".join(";".join(nodes).split())


def is_url(url) -> bool:
    """Return True when ``url`` is a well-formed URL.

    Accepted: an http/https/ftp/ftps scheme, followed by a domain name,
    ``localhost`` or a dotted-quad address, an optional port, and an optional
    path or query that contains no whitespace. Matching is case-insensitive.
    """
    return _URL_PATTERN.match(url) is not None


def is_domain(domain) -> bool:
    """Return True when ``domain`` starts with a domain name or dotted-quad address.

    NOTE(review): the pattern has no end anchor, so trailing characters after
    the matched prefix are not rejected -- confirm this is intended.
    """
    return _DOMAIN_PATTERN.match(domain) is not None


def label_split(val) -> list:
    """Split ``val`` on separator characters and drop empty pieces.

    Each separator may be followed by any number of spaces, which are consumed
    together with it.
    """
    # Character list left by the original author (presumably the candidate
    # separators that were considered):
    # '~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
    parts = _LABEL_SEPARATOR_PATTERN.split(val)
    return [part for part in parts if part]


def get_url(url: str, parameters: dict) -> str:
    """Join ``url`` with URL-encoded query parameters.

    The encoded parameters are resolved against ``url`` as a relative
    reference that starts with ``?``, so a query string already present in
    ``url`` is replaced rather than extended.

    :param url: the link
    :param parameters: the query parameters
    :return: the joined url
    """
    _data = '?' + urlencode(parameters)
    return urljoin(url, _data)