import re
from html import unescape
from urllib.parse import urlencode, urljoin

from bs4 import BeautifulSoup
from lxml.html import etree, HtmlElement, fromstring, tostring
from urllib3 import get_host

from common.log import logger
from crawler.defaults import (
    USELESS_TAG,
    USELESS_ATTR,
    TAGS_CAN_BE_REMOVE_IF_EMPTY,
    KEYWORDS,
)


def err_details(worker):
    worker_exception = worker.exception()
    if worker_exception:
        logger.exception("Worker returned exception: {}".format(worker_exception))
    return worker


def extract_host(url):
    """
    # >>> base_url = extract_host('http://192.168.3.207:8080/')
    """
    _s, _h, _p = get_host(url)
    return f"{_s}://{_h}/" if _p is None else f"{_s}://{_h}:{_p}/"


def split_domain(val: str):
    if re.match(r'\d+', val) is None:
        return re.split(r'[\\.:]', val)
    return [val]


def extract_domain(url):
    """
    # >>> base_url = extract_domain('http://192.168.3.207:8080/')
    """
    _, host, port = get_host(url)
    return f"{host}" if port is None else f"{host}:{port}"


def extract_page_title(source):
    element = html2element(source)
    nodes = element.xpath('/html/head/title/text()|//title/text()')
    if len(nodes) > 1:
        return "".join(";".join(nodes).split())
    return "".join("".join(nodes).split())


def is_url(url):
    """Check whether a URL is well-formed."""
    _regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    return re.match(_regex, url) is not None


def is_domain(domain):
    _regex = re.compile(
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?', re.IGNORECASE)
    return re.match(_regex, domain) is not None


def label_split(val):
    # separators: '~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
    result = re.split(r'[- _,,\\.|-「」【】??!!/、] *', val)
    result = [v for v in result if len(v) > 0]
    return result


def get_url(url: str, parameters: dict):
    """
    Join a URL with its query parameters.

    :param url: base URL
    :param parameters: query parameters
    :return: the combined URL
    """
    _data = '?' + urlencode(parameters)
    return urljoin(url, _data)
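# A minimal usage sketch for the URL helpers above. The sample addresses and
# parameters are illustrative only, not values used by the crawler:
#
#   >>> extract_host('http://192.168.3.207:8080/path')
#   'http://192.168.3.207:8080/'
#   >>> extract_domain('http://192.168.3.207:8080/path')
#   '192.168.3.207:8080'
#   >>> get_url('http://example.com/search', {'q': 'news', 'page': 2})
#   'http://example.com/search?q=news&page=2'
#   >>> is_url('http://example.com/search?q=news')
#   True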
def clean_html(source: str):
    # The first four tag names below are reconstructions (the original
    # patterns were corrupted); script/style match the ones in html2element.
    html_str = re.sub(r'<!--[\s\S]*?-->', '', source)                       # comments
    html_str = re.sub(r'<br>|<br[^>]*>|</br>', '', html_str)                # line breaks
    html_str = re.sub(r'<head>[\s\S]*?</head>', '', html_str)               # head block
    html_str = re.sub(r'<noscript[^>]*>[\s\S]*?</noscript>', '', html_str)  # noscript blocks
    html_str = re.sub(r'<script[^>]*>[\s\S]*?</script>', '', html_str)      # scripts
    html_str = re.sub(r'<style[^>]*>[\s\S]*?</style>', '', html_str)        # styles
    html_str = re.sub(r'<[^>]*>', '', html_str)                             # any remaining tags
    return html_str


def extract_text(source: str):
    soup = BeautifulSoup(source, "lxml")
    return soup.get_text()


def verify_text(val: str, length=50):
    """Strip tags and everything except digits, letters and Chinese characters, then check the length."""
    if val is None:
        return False
    sub_pattern = ['<[^>]+>', '[^0-9a-zA-Z\u4e00-\u9fa5]+']
    for pattern in sub_pattern:
        val = re.sub(pattern, '', val)
    # If the remaining text is shorter than `length`, the page has no real detail content.
    if len(val) < length:
        # invalid text
        return False
    # valid text
    return True


def element2html(element: HtmlElement) -> str:
    return unescape(tostring(element, encoding="utf-8").decode())


def html2element(source: str, base_url=None) -> HtmlElement:
    html_str = re.sub('\ufeff|\xa0|\u3000|\x00', '', source)
    html_str = re.sub(r'<!--[\s\S]*?-->', '', html_str)  # strip comments
    html_str = re.sub(r'<style[^>]*>[\s\S]*?</style>', '', html_str)  # strip styles
    html_str = re.sub(r'<script[^>]*>[\s\S]*?</script>', '', html_str)  # strip js
    html_str = re.sub(r'<!\[CDATA\[[\s\S]*?\]\]>', '', html_str)  # strip CDATA (reconstructed, uncertain)
    html_str = re.sub(r'<\?xml.*?>', '', html_str)
    html_str = re.sub(r'<[!]DOCTYPE.*?>', '', html_str)
    return fromstring(html_str, base_url=base_url)


def iter_node(element: HtmlElement, depth=1):
    yield element, depth
    depth += 1
    for sub_element in element:
        if isinstance(sub_element, HtmlElement):
            yield from iter_node(sub_element, depth)
    # print('exit', depth)


def remove_node(node: HtmlElement):
    """
    This is an in-place operation; no return value is needed.
    :param node:
    :return:
    """
    parent = node.getparent()
    if parent is not None:
        node.drop_tree()
        # parent.remove(node)


def drop_tag(node: HtmlElement):
    """
    Only delete the tag itself, merging its text into the parent.
    :param node:
    :return:
    """
    parent = node.getparent()
    if parent is not None:
        node.drop_tag()


def is_empty_element(node: HtmlElement):
    return not node.getchildren() and not node.text


def normalize_node(element: HtmlElement):
    etree.strip_elements(element, *USELESS_TAG, with_tail=False)
    # Node preprocessing. When a node removal and a node update happen in the
    # same loop, the update does not take effect. Reason: ?
    # Merge empty nodes and strip noise nodes.
    for node, _ in iter_node(element):
        if node.tag.lower() in TAGS_CAN_BE_REMOVE_IF_EMPTY and is_empty_element(node):
            remove_node(node)
        if node.tag.lower() == 'p':
            etree.strip_tags(node, 'span')
            etree.strip_tags(node, 'strong')
        # If a div tag does not contain any sub node, it can be converted to a p node.
        if node.tag.lower() == 'div' and not node.getchildren():
            node.tag = 'p'
        if node.tag.lower() == 'span' and not node.getchildren():
            node.tag = 'p'
        # Remove empty p tags.
        if node.tag.lower() == 'p' and not node.xpath('.//img'):
            if not (node.text and node.text.strip()):
                drop_tag(node)
        # Delete inline styles.
        style = node.get('style')
        if style:
            del node.attrib['style']
    # Remove nodes whose id/class exactly matches a noise attribute.
    for node, _ in iter_node(element):
        attr = (node.get('id') or node.get('class'))
        if attr:
            if attr.lower() in USELESS_ATTR:
                remove_node(node)
                break
    # # Remove invalid nodes (fuzzy matching).
    # for node, _ in iter_node(element):
    #     attrib = (node.get('id') or node.get('class'))
    #     if attrib:
    #         for attr in USELESS_ATTR:
    #             if re.match(attr, attrib.lower()) is not None:
    #                 remove_node(node)
    #                 break


def pre_parse(element):
    normalize_node(element)
    return element


def is_title(val: str):
    """Return True if the text matches any keyword in KEYWORDS."""
    for keyword in KEYWORDS:
        search = re.search(keyword, val)
        if search is not None:
            return True
    return False
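
# A small end-to-end sketch of the cleaning pipeline. The sample HTML is made
# up for illustration; running it still requires common.log and
# crawler.defaults to be importable, and the normalized output depends on the
# USELESS_TAG/USELESS_ATTR configuration in crawler.defaults:
if __name__ == '__main__':
    sample = (
        '<!DOCTYPE html><html><head><title>Demo Page</title>'
        '<style>p {color: red}</style><script>var x = 1;</script></head>'
        '<body><div><p> Hello, <span>world</span>! </p></div></body></html>'
    )
    print(extract_page_title(sample))  # 'DemoPage' (all whitespace is removed)
    print(clean_html(sample))          # tag-free text
    element = pre_parse(html2element(sample))
    print(element2html(element))       # normalized markup without style/script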